diff options
Diffstat (limited to 'toolkit/modules/subprocess/subprocess_common.jsm')
-rw-r--r-- | toolkit/modules/subprocess/subprocess_common.jsm | 703 |
1 files changed, 703 insertions, 0 deletions
diff --git a/toolkit/modules/subprocess/subprocess_common.jsm b/toolkit/modules/subprocess/subprocess_common.jsm new file mode 100644 index 000000000..a899fcc49 --- /dev/null +++ b/toolkit/modules/subprocess/subprocess_common.jsm @@ -0,0 +1,703 @@ +/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */ +/* vim: set sts=2 sw=2 et tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +/* eslint-disable mozilla/balanced-listeners */ + +/* exported BaseProcess, PromiseWorker */ + +var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components; + +Cu.import("resource://gre/modules/Services.jsm"); +Cu.import("resource://gre/modules/XPCOMUtils.jsm"); +Cu.importGlobalProperties(["TextDecoder"]); + +XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown", + "resource://gre/modules/AsyncShutdown.jsm"); +XPCOMUtils.defineLazyModuleGetter(this, "setTimeout", + "resource://gre/modules/Timer.jsm"); + +Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this); + +var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker", "SubprocessConstants"]; + +const BUFFER_SIZE = 4096; + +let nextResponseId = 0; + +/** + * Wraps a ChromeWorker so that messages sent to it return a promise which + * resolves when the message has been received and the operation it triggers is + * complete. + */ +class PromiseWorker extends ChromeWorker { + constructor(url) { + super(url); + + this.listeners = new Map(); + this.pendingResponses = new Map(); + + this.addListener("close", this.onClose.bind(this)); + this.addListener("failure", this.onFailure.bind(this)); + this.addListener("success", this.onSuccess.bind(this)); + this.addListener("debug", this.onDebug.bind(this)); + + this.addEventListener("message", this.onmessage); + + this.shutdown = this.shutdown.bind(this); + AsyncShutdown.webWorkersShutdown.addBlocker( + "Subprocess.jsm: Shut down IO worker", + this.shutdown); + } + + onClose() { + AsyncShutdown.webWorkersShutdown.removeBlocker(this.shutdown); + } + + shutdown() { + return this.call("shutdown", []); + } + + /** + * Adds a listener for the given message from the worker. Any message received + * from the worker with a `data.msg` property matching the given `msg` + * parameter are passed to the given listener. + * + * @param {string} msg + * The message to listen for. + * @param {function(Event)} listener + * The listener to call when matching messages are received. + */ + addListener(msg, listener) { + if (!this.listeners.has(msg)) { + this.listeners.set(msg, new Set()); + } + this.listeners.get(msg).add(listener); + } + + /** + * Removes the given message listener. + * + * @param {string} msg + * The message to stop listening for. + * @param {function(Event)} listener + * The listener to remove. + */ + removeListener(msg, listener) { + let listeners = this.listeners.get(msg); + if (listeners) { + listeners.delete(listener); + + if (!listeners.size) { + this.listeners.delete(msg); + } + } + } + + onmessage(event) { + let {msg} = event.data; + let listeners = this.listeners.get(msg) || new Set(); + + for (let listener of listeners) { + try { + listener(event.data); + } catch (e) { + Cu.reportError(e); + } + } + } + + /** + * Called when a message sent to the worker has failed, and rejects its + * corresponding promise. + * + * @private + */ + onFailure({msgId, error}) { + this.pendingResponses.get(msgId).reject(error); + this.pendingResponses.delete(msgId); + } + + /** + * Called when a message sent to the worker has succeeded, and resolves its + * corresponding promise. + * + * @private + */ + onSuccess({msgId, data}) { + this.pendingResponses.get(msgId).resolve(data); + this.pendingResponses.delete(msgId); + } + + onDebug({message}) { + dump(`Worker debug: ${message}\n`); + } + + /** + * Calls the given method in the worker, and returns a promise which resolves + * or rejects when the method has completed. + * + * @param {string} method + * The name of the method to call. + * @param {Array} args + * The arguments to pass to the method. + * @param {Array} [transferList] + * A list of objects to transfer to the worker, rather than cloning. + * @returns {Promise} + */ + call(method, args, transferList = []) { + let msgId = nextResponseId++; + + return new Promise((resolve, reject) => { + this.pendingResponses.set(msgId, {resolve, reject}); + + let message = { + msg: method, + msgId, + args, + }; + + this.postMessage(message, transferList); + }); + } +} + +/** + * Represents an input or output pipe connected to a subprocess. + * + * @property {integer} fd + * The file descriptor number of the pipe on the child process's side. + * @readonly + */ +class Pipe { + /** + * @param {Process} process + * The child process that this pipe is connected to. + * @param {integer} fd + * The file descriptor number of the pipe on the child process's side. + * @param {integer} id + * The internal ID of the pipe, which ties it to the corresponding Pipe + * object on the Worker side. + */ + constructor(process, fd, id) { + this.id = id; + this.fd = fd; + this.processId = process.id; + this.worker = process.worker; + + /** + * @property {boolean} closed + * True if the file descriptor has been closed, and can no longer + * be read from or written to. Pending IO operations may still + * complete, but new operations may not be initiated. + * @readonly + */ + this.closed = false; + } + + /** + * Closes the end of the pipe which belongs to this process. + * + * @param {boolean} force + * If true, the pipe is closed immediately, regardless of any pending + * IO operations. If false, the pipe is closed after any existing + * pending IO operations have completed. + * @returns {Promise<object>} + * Resolves to an object with no properties once the pipe has been + * closed. + */ + close(force = false) { + this.closed = true; + return this.worker.call("close", [this.id, force]); + } +} + +/** + * Represents an output-only pipe, to which data may be written. + */ +class OutputPipe extends Pipe { + constructor(...args) { + super(...args); + + this.encoder = new TextEncoder(); + } + + /** + * Writes the given data to the stream. + * + * When given an array buffer or typed array, ownership of the buffer is + * transferred to the IO worker, and it may no longer be used from this + * thread. + * + * @param {ArrayBuffer|TypedArray|string} buffer + * Data to write to the stream. + * @returns {Promise<object>} + * Resolves to an object with a `bytesWritten` property, containing + * the number of bytes successfully written, once the operation has + * completed. + * + * @rejects {object} + * May be rejected with an Error object, or an object with similar + * properties. The object will include an `errorCode` property with + * one of the following values if it was rejected for the + * corresponding reason: + * + * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before + * all of the data in `buffer` could be written to it. + */ + write(buffer) { + if (typeof buffer === "string") { + buffer = this.encoder.encode(buffer); + } + + if (Cu.getClassName(buffer, true) !== "ArrayBuffer") { + if (buffer.byteLength === buffer.buffer.byteLength) { + buffer = buffer.buffer; + } else { + buffer = buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength); + } + } + + let args = [this.id, buffer]; + + return this.worker.call("write", args, [buffer]); + } +} + +/** + * Represents an input-only pipe, from which data may be read. + */ +class InputPipe extends Pipe { + constructor(...args) { + super(...args); + + this.buffers = []; + + /** + * @property {integer} dataAvailable + * The number of readable bytes currently stored in the input + * buffer. + * @readonly + */ + this.dataAvailable = 0; + + this.decoder = new TextDecoder(); + + this.pendingReads = []; + + this._pendingBufferRead = null; + + this.fillBuffer(); + } + + /** + * @property {integer} bufferSize + * The current size of the input buffer. This varies depending on + * the size of pending read operations. + * @readonly + */ + get bufferSize() { + if (this.pendingReads.length) { + return Math.max(this.pendingReads[0].length, BUFFER_SIZE); + } + return BUFFER_SIZE; + } + + /** + * Attempts to fill the input buffer. + * + * @private + */ + fillBuffer() { + let dataWanted = this.bufferSize - this.dataAvailable; + + if (!this._pendingBufferRead && dataWanted > 0) { + this._pendingBufferRead = this._read(dataWanted); + + this._pendingBufferRead.then((result) => { + this._pendingBufferRead = null; + + if (result) { + this.onInput(result.buffer); + + this.fillBuffer(); + } + }); + } + } + + _read(size) { + let args = [this.id, size]; + + return this.worker.call("read", args).catch(e => { + this.closed = true; + + for (let {length, resolve, reject} of this.pendingReads.splice(0)) { + if (length === null && e.errorCode === SubprocessConstants.ERROR_END_OF_FILE) { + resolve(new ArrayBuffer(0)); + } else { + reject(e); + } + } + }); + } + + /** + * Adds the given data to the end of the input buffer. + * + * @param {ArrayBuffer} buffer + * An input buffer to append to the current buffered input. + * @private + */ + onInput(buffer) { + this.buffers.push(buffer); + this.dataAvailable += buffer.byteLength; + this.checkPendingReads(); + } + + /** + * Checks the topmost pending read operations and fulfills as many as can be + * filled from the current input buffer. + * + * @private + */ + checkPendingReads() { + this.fillBuffer(); + + let reads = this.pendingReads; + while (reads.length && this.dataAvailable && + reads[0].length <= this.dataAvailable) { + let pending = this.pendingReads.shift(); + + let length = pending.length || this.dataAvailable; + + let result; + let byteLength = this.buffers[0].byteLength; + if (byteLength == length) { + result = this.buffers.shift(); + } else if (byteLength > length) { + let buffer = this.buffers[0]; + + this.buffers[0] = buffer.slice(length); + result = ArrayBuffer.transfer(buffer, length); + } else { + result = ArrayBuffer.transfer(this.buffers.shift(), length); + let u8result = new Uint8Array(result); + + while (byteLength < length) { + let buffer = this.buffers[0]; + let u8buffer = new Uint8Array(buffer); + + let remaining = length - byteLength; + + if (buffer.byteLength <= remaining) { + this.buffers.shift(); + + u8result.set(u8buffer, byteLength); + } else { + this.buffers[0] = buffer.slice(remaining); + + u8result.set(u8buffer.subarray(0, remaining), byteLength); + } + + byteLength += Math.min(buffer.byteLength, remaining); + } + } + + this.dataAvailable -= result.byteLength; + pending.resolve(result); + } + } + + /** + * Reads exactly `length` bytes of binary data from the input stream, or, if + * length is not provided, reads the first chunk of data to become available. + * In the latter case, returns an empty array buffer on end of file. + * + * The read operation will not complete until enough data is available to + * fulfill the request. If the pipe closes without enough available data to + * fulfill the read, the operation fails, and any remaining buffered data is + * lost. + * + * @param {integer} [length] + * The number of bytes to read. + * @returns {Promise<ArrayBuffer>} + * + * @rejects {object} + * May be rejected with an Error object, or an object with similar + * properties. The object will include an `errorCode` property with + * one of the following values if it was rejected for the + * corresponding reason: + * + * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before + * enough input could be read to satisfy the request. + */ + read(length = null) { + if (length !== null && !(Number.isInteger(length) && length >= 0)) { + throw new RangeError("Length must be a non-negative integer"); + } + + if (length == 0) { + return Promise.resolve(new ArrayBuffer(0)); + } + + return new Promise((resolve, reject) => { + this.pendingReads.push({length, resolve, reject}); + this.checkPendingReads(); + }); + } + + /** + * Reads exactly `length` bytes from the input stream, and parses them as + * UTF-8 JSON data. + * + * @param {integer} length + * The number of bytes to read. + * @returns {Promise<object>} + * + * @rejects {object} + * May be rejected with an Error object, or an object with similar + * properties. The object will include an `errorCode` property with + * one of the following values if it was rejected for the + * corresponding reason: + * + * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before + * enough input could be read to satisfy the request. + * - Subprocess.ERROR_INVALID_JSON: The data read from the pipe + * could not be parsed as a valid JSON string. + */ + readJSON(length) { + if (!Number.isInteger(length) || length <= 0) { + throw new RangeError("Length must be a positive integer"); + } + + return this.readString(length).then(string => { + try { + return JSON.parse(string); + } catch (e) { + e.errorCode = SubprocessConstants.ERROR_INVALID_JSON; + throw e; + } + }); + } + + /** + * Reads a chunk of UTF-8 data from the input stream, and converts it to a + * JavaScript string. + * + * If `length` is provided, reads exactly `length` bytes. Otherwise, reads the + * first chunk of data to become available, and returns an empty string on end + * of file. In the latter case, the chunk is decoded in streaming mode, and + * any incomplete UTF-8 sequences at the end of a chunk are returned at the + * start of a subsequent read operation. + * + * @param {integer} [length] + * The number of bytes to read. + * @param {object} [options] + * An options object as expected by TextDecoder.decode. + * @returns {Promise<string>} + * + * @rejects {object} + * May be rejected with an Error object, or an object with similar + * properties. The object will include an `errorCode` property with + * one of the following values if it was rejected for the + * corresponding reason: + * + * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before + * enough input could be read to satisfy the request. + */ + readString(length = null, options = {stream: length === null}) { + if (length !== null && !(Number.isInteger(length) && length >= 0)) { + throw new RangeError("Length must be a non-negative integer"); + } + + return this.read(length).then(buffer => { + return this.decoder.decode(buffer, options); + }); + } + + /** + * Reads 4 bytes from the input stream, and parses them as an unsigned + * integer, in native byte order. + * + * @returns {Promise<integer>} + * + * @rejects {object} + * May be rejected with an Error object, or an object with similar + * properties. The object will include an `errorCode` property with + * one of the following values if it was rejected for the + * corresponding reason: + * + * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before + * enough input could be read to satisfy the request. + */ + readUint32() { + return this.read(4).then(buffer => { + return new Uint32Array(buffer)[0]; + }); + } +} + +/** + * @class Process + * @extends BaseProcess + */ + +/** + * Represents a currently-running process, and allows interaction with it. + */ +class BaseProcess { + /** + * @param {PromiseWorker} worker + * The worker instance which owns the process. + * @param {integer} processId + * The internal ID of the Process object, which ties it to the + * corresponding process on the Worker side. + * @param {integer[]} fds + * An array of internal Pipe IDs, one for each standard file descriptor + * in the child process. + * @param {integer} pid + * The operating system process ID of the process. + */ + constructor(worker, processId, fds, pid) { + this.id = processId; + this.worker = worker; + + /** + * @property {integer} pid + * The process ID of the process, assigned by the operating system. + * @readonly + */ + this.pid = pid; + + this.exitCode = null; + + this.exitPromise = new Promise(resolve => { + this.worker.call("wait", [this.id]).then(({exitCode}) => { + resolve(Object.freeze({exitCode})); + this.exitCode = exitCode; + }); + }); + + if (fds[0] !== undefined) { + /** + * @property {OutputPipe} stdin + * A Pipe object which allows writing to the process's standard + * input. + * @readonly + */ + this.stdin = new OutputPipe(this, 0, fds[0]); + } + if (fds[1] !== undefined) { + /** + * @property {InputPipe} stdout + * A Pipe object which allows reading from the process's standard + * output. + * @readonly + */ + this.stdout = new InputPipe(this, 1, fds[1]); + } + if (fds[2] !== undefined) { + /** + * @property {InputPipe} [stderr] + * An optional Pipe object which allows reading from the + * process's standard error output. + * @readonly + */ + this.stderr = new InputPipe(this, 2, fds[2]); + } + } + + /** + * Spawns a process, and resolves to a BaseProcess instance on success. + * + * @param {object} options + * An options object as passed to `Subprocess.call`. + * + * @returns {Promise<BaseProcess>} + */ + static create(options) { + let worker = this.getWorker(); + + return worker.call("spawn", [options]).then(({processId, fds, pid}) => { + return new this(worker, processId, fds, pid); + }); + } + + static get WORKER_URL() { + throw new Error("Not implemented"); + } + + static get WorkerClass() { + return PromiseWorker; + } + + /** + * Gets the current subprocess worker, or spawns a new one if it does not + * currently exist. + * + * @returns {PromiseWorker} + */ + static getWorker() { + if (!this._worker) { + this._worker = new this.WorkerClass(this.WORKER_URL); + } + return this._worker; + } + + /** + * Kills the process. + * + * @param {integer} [timeout=300] + * A timeout, in milliseconds, after which the process will be forcibly + * killed. On platforms which support it, the process will be sent + * a `SIGTERM` signal immediately, so that it has a chance to terminate + * gracefully, and a `SIGKILL` signal if it hasn't exited within + * `timeout` milliseconds. On other platforms (namely Windows), the + * process will be forcibly terminated immediately. + * + * @returns {Promise<object>} + * Resolves to an object with an `exitCode` property when the process + * has exited. + */ + kill(timeout = 300) { + // If the process has already exited, don't bother sending a signal. + if (this.exitCode != null) { + return this.wait(); + } + + let force = timeout <= 0; + this.worker.call("kill", [this.id, force]); + + if (!force) { + setTimeout(() => { + if (this.exitCode == null) { + this.worker.call("kill", [this.id, true]); + } + }, timeout); + } + + return this.wait(); + } + + /** + * Returns a promise which resolves to the process's exit code, once it has + * exited. + * + * @returns {Promise<object>} + * Resolves to an object with an `exitCode` property, containing the + * process's exit code, once the process has exited. + * + * On Unix-like systems, a negative exit code indicates that the + * process was killed by a signal whose signal number is the absolute + * value of the error code. On Windows, an exit code of -9 indicates + * that the process was killed via the {@linkcode BaseProcess#kill kill()} + * method. + */ + wait() { + return this.exitPromise; + } +} |