diff options
Diffstat (limited to 'security/nss/automation/taskcluster/graph/src/queue.js')
-rw-r--r-- | security/nss/automation/taskcluster/graph/src/queue.js | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/security/nss/automation/taskcluster/graph/src/queue.js b/security/nss/automation/taskcluster/graph/src/queue.js new file mode 100644 index 000000000..2a4a7b3fe --- /dev/null +++ b/security/nss/automation/taskcluster/graph/src/queue.js @@ -0,0 +1,242 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import {clone} from "merge"; +import merge from "./merge"; +import slugid from "slugid"; +import taskcluster from "taskcluster-client"; +import * as image_builder from "./image_builder"; + +let maps = []; +let filters = []; + +let tasks = new Map(); +let image_tasks = new Map(); + +let queue = new taskcluster.Queue({ + baseUrl: "http://taskcluster/queue/v1" +}); + +function fromNow(hours) { + let d = new Date(); + d.setHours(d.getHours() + (hours|0)); + return d.toJSON(); +} + +function parseRoutes(routes) { + return [ + `tc-treeherder.v2.${process.env.TC_PROJECT}.${process.env.NSS_HEAD_REVISION}.${process.env.NSS_PUSHLOG_ID}`, + ...routes + ]; +} + +function parseFeatures(list) { + return list.reduce((map, feature) => { + map[feature] = true; + return map; + }, {}); +} + +function parseArtifacts(artifacts) { + let copy = clone(artifacts); + Object.keys(copy).forEach(key => { + copy[key].expires = fromNow(copy[key].expires); + }); + return copy; +} + +function parseCollection(name) { + let collection = {}; + collection[name] = true; + return collection; +} + +function parseTreeherder(def) { + let treeherder = { + build: { + platform: def.platform + }, + machine: { + platform: def.platform + }, + symbol: def.symbol, + jobKind: def.kind + }; + + if (def.group) { + treeherder.groupSymbol = def.group; + } + + if (def.collection) { + treeherder.collection = parseCollection(def.collection); + } + + if (def.tier) { + treeherder.tier = def.tier; + } + + return treeherder; +} + +function convertTask(def) { + let dependencies = []; + + let env = merge({ + NSS_HEAD_REPOSITORY: process.env.NSS_HEAD_REPOSITORY, + NSS_HEAD_REVISION: process.env.NSS_HEAD_REVISION + }, def.env || {}); + + if (def.parent) { + dependencies.push(def.parent); + env.TC_PARENT_TASK_ID = def.parent; + } + + if (def.tests) { + env.NSS_TESTS = def.tests; + } + + if (def.cycle) { + env.NSS_CYCLES = def.cycle; + } + + let payload = { + env, + command: def.command, + maxRunTime: def.maxRunTime || 3600 + }; + + if (def.image) { + payload.image = def.image; + } + + if (def.features) { + payload.features = parseFeatures(def.features); + } + + if (def.artifacts) { + payload.artifacts = parseArtifacts(def.artifacts); + } + + return { + provisionerId: def.provisioner || "aws-provisioner-v1", + workerType: def.workerType || "hg-worker", + schedulerId: "task-graph-scheduler", + + created: fromNow(0), + deadline: fromNow(24), + + dependencies, + routes: parseRoutes(def.routes || []), + + metadata: { + name: def.name, + description: def.name, + owner: process.env.TC_OWNER, + source: process.env.TC_SOURCE + }, + + payload, + + extra: { + treeherder: parseTreeherder(def) + } + }; +} + +export function map(fun) { + maps.push(fun); +} + +export function filter(fun) { + filters.push(fun); +} + +export function scheduleTask(def) { + let taskId = slugid.v4(); + tasks.set(taskId, merge({}, def)); + return taskId; +} + +export async function submit() { + let promises = new Map(); + + for (let [taskId, task] of tasks) { + // Allow filtering tasks before we schedule them. + if (!filters.every(filter => filter(task))) { + continue; + } + + // Allow changing tasks before we schedule them. + maps.forEach(map => { task = map(merge({}, task)) }); + + let log_id = `${task.name} @ ${task.platform}[${task.collection || "opt"}]`; + console.log(`+ Submitting ${log_id}.`); + + let parent = task.parent; + + // Convert the task definition. + task = await convertTask(task); + + // Convert the docker image definition. + let image_def = task.payload.image; + if (image_def && image_def.hasOwnProperty("path")) { + let key = `${image_def.name}:${image_def.path}`; + let data = {}; + + // Check the cache first. + if (image_tasks.has(key)) { + data = image_tasks.get(key); + } else { + data.taskId = await image_builder.findTask(image_def); + data.isPending = !data.taskId; + + // No task found. + if (data.isPending) { + let image_task = await image_builder.buildTask(image_def); + + // Schedule a new image builder task immediately. + data.taskId = slugid.v4(); + + try { + await queue.createTask(data.taskId, convertTask(image_task)); + } catch (e) { + console.error("! FAIL: Scheduling image builder task failed."); + continue; /* Skip this task on failure. */ + } + } + + // Store in cache. + image_tasks.set(key, data); + } + + if (data.isPending) { + task.dependencies.push(data.taskId); + } + + task.payload.image = { + path: "public/image.tar", + taskId: data.taskId, + type: "task-image" + }; + } + + // Wait for the parent task to be created before scheduling dependants. + let predecessor = parent ? promises.get(parent) : Promise.resolve(); + + promises.set(taskId, predecessor.then(() => { + // Schedule the task. + return queue.createTask(taskId, task).catch(err => { + console.error(`! FAIL: Scheduling ${log_id} failed.`, err); + }); + })); + } + + // Wait for all requests to finish. + if (promises.length) { + await Promise.all([...promises.values()]); + console.log("=== Total:", promises.length, "tasks. ==="); + } + + tasks.clear(); +} |