summaryrefslogtreecommitdiffstats
path: root/toolkit/modules/subprocess/subprocess_common.jsm
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/modules/subprocess/subprocess_common.jsm')
-rw-r--r--toolkit/modules/subprocess/subprocess_common.jsm703
1 files changed, 703 insertions, 0 deletions
diff --git a/toolkit/modules/subprocess/subprocess_common.jsm b/toolkit/modules/subprocess/subprocess_common.jsm
new file mode 100644
index 000000000..a899fcc49
--- /dev/null
+++ b/toolkit/modules/subprocess/subprocess_common.jsm
@@ -0,0 +1,703 @@
+/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set sts=2 sw=2 et tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+"use strict";
+
+/* eslint-disable mozilla/balanced-listeners */
+
+/* exported BaseProcess, PromiseWorker */
+
+var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
+
+Cu.import("resource://gre/modules/Services.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+Cu.importGlobalProperties(["TextDecoder"]);
+
+XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
+ "resource://gre/modules/AsyncShutdown.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "setTimeout",
+ "resource://gre/modules/Timer.jsm");
+
+Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
+
+var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker", "SubprocessConstants"];
+
+const BUFFER_SIZE = 4096;
+
+let nextResponseId = 0;
+
+/**
+ * Wraps a ChromeWorker so that messages sent to it return a promise which
+ * resolves when the message has been received and the operation it triggers is
+ * complete.
+ */
+class PromiseWorker extends ChromeWorker {
+ constructor(url) {
+ super(url);
+
+ this.listeners = new Map();
+ this.pendingResponses = new Map();
+
+ this.addListener("close", this.onClose.bind(this));
+ this.addListener("failure", this.onFailure.bind(this));
+ this.addListener("success", this.onSuccess.bind(this));
+ this.addListener("debug", this.onDebug.bind(this));
+
+ this.addEventListener("message", this.onmessage);
+
+ this.shutdown = this.shutdown.bind(this);
+ AsyncShutdown.webWorkersShutdown.addBlocker(
+ "Subprocess.jsm: Shut down IO worker",
+ this.shutdown);
+ }
+
+ onClose() {
+ AsyncShutdown.webWorkersShutdown.removeBlocker(this.shutdown);
+ }
+
+ shutdown() {
+ return this.call("shutdown", []);
+ }
+
+ /**
+ * Adds a listener for the given message from the worker. Any message received
+ * from the worker with a `data.msg` property matching the given `msg`
+ * parameter are passed to the given listener.
+ *
+ * @param {string} msg
+ * The message to listen for.
+ * @param {function(Event)} listener
+ * The listener to call when matching messages are received.
+ */
+ addListener(msg, listener) {
+ if (!this.listeners.has(msg)) {
+ this.listeners.set(msg, new Set());
+ }
+ this.listeners.get(msg).add(listener);
+ }
+
+ /**
+ * Removes the given message listener.
+ *
+ * @param {string} msg
+ * The message to stop listening for.
+ * @param {function(Event)} listener
+ * The listener to remove.
+ */
+ removeListener(msg, listener) {
+ let listeners = this.listeners.get(msg);
+ if (listeners) {
+ listeners.delete(listener);
+
+ if (!listeners.size) {
+ this.listeners.delete(msg);
+ }
+ }
+ }
+
+ onmessage(event) {
+ let {msg} = event.data;
+ let listeners = this.listeners.get(msg) || new Set();
+
+ for (let listener of listeners) {
+ try {
+ listener(event.data);
+ } catch (e) {
+ Cu.reportError(e);
+ }
+ }
+ }
+
+ /**
+ * Called when a message sent to the worker has failed, and rejects its
+ * corresponding promise.
+ *
+ * @private
+ */
+ onFailure({msgId, error}) {
+ this.pendingResponses.get(msgId).reject(error);
+ this.pendingResponses.delete(msgId);
+ }
+
+ /**
+ * Called when a message sent to the worker has succeeded, and resolves its
+ * corresponding promise.
+ *
+ * @private
+ */
+ onSuccess({msgId, data}) {
+ this.pendingResponses.get(msgId).resolve(data);
+ this.pendingResponses.delete(msgId);
+ }
+
+ onDebug({message}) {
+ dump(`Worker debug: ${message}\n`);
+ }
+
+ /**
+ * Calls the given method in the worker, and returns a promise which resolves
+ * or rejects when the method has completed.
+ *
+ * @param {string} method
+ * The name of the method to call.
+ * @param {Array} args
+ * The arguments to pass to the method.
+ * @param {Array} [transferList]
+ * A list of objects to transfer to the worker, rather than cloning.
+ * @returns {Promise}
+ */
+ call(method, args, transferList = []) {
+ let msgId = nextResponseId++;
+
+ return new Promise((resolve, reject) => {
+ this.pendingResponses.set(msgId, {resolve, reject});
+
+ let message = {
+ msg: method,
+ msgId,
+ args,
+ };
+
+ this.postMessage(message, transferList);
+ });
+ }
+}
+
+/**
+ * Represents an input or output pipe connected to a subprocess.
+ *
+ * @property {integer} fd
+ * The file descriptor number of the pipe on the child process's side.
+ * @readonly
+ */
+class Pipe {
+ /**
+ * @param {Process} process
+ * The child process that this pipe is connected to.
+ * @param {integer} fd
+ * The file descriptor number of the pipe on the child process's side.
+ * @param {integer} id
+ * The internal ID of the pipe, which ties it to the corresponding Pipe
+ * object on the Worker side.
+ */
+ constructor(process, fd, id) {
+ this.id = id;
+ this.fd = fd;
+ this.processId = process.id;
+ this.worker = process.worker;
+
+ /**
+ * @property {boolean} closed
+ * True if the file descriptor has been closed, and can no longer
+ * be read from or written to. Pending IO operations may still
+ * complete, but new operations may not be initiated.
+ * @readonly
+ */
+ this.closed = false;
+ }
+
+ /**
+ * Closes the end of the pipe which belongs to this process.
+ *
+ * @param {boolean} force
+ * If true, the pipe is closed immediately, regardless of any pending
+ * IO operations. If false, the pipe is closed after any existing
+ * pending IO operations have completed.
+ * @returns {Promise<object>}
+ * Resolves to an object with no properties once the pipe has been
+ * closed.
+ */
+ close(force = false) {
+ this.closed = true;
+ return this.worker.call("close", [this.id, force]);
+ }
+}
+
+/**
+ * Represents an output-only pipe, to which data may be written.
+ */
+class OutputPipe extends Pipe {
+ constructor(...args) {
+ super(...args);
+
+ this.encoder = new TextEncoder();
+ }
+
+ /**
+ * Writes the given data to the stream.
+ *
+ * When given an array buffer or typed array, ownership of the buffer is
+ * transferred to the IO worker, and it may no longer be used from this
+ * thread.
+ *
+ * @param {ArrayBuffer|TypedArray|string} buffer
+ * Data to write to the stream.
+ * @returns {Promise<object>}
+ * Resolves to an object with a `bytesWritten` property, containing
+ * the number of bytes successfully written, once the operation has
+ * completed.
+ *
+ * @rejects {object}
+ * May be rejected with an Error object, or an object with similar
+ * properties. The object will include an `errorCode` property with
+ * one of the following values if it was rejected for the
+ * corresponding reason:
+ *
+ * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before
+ * all of the data in `buffer` could be written to it.
+ */
+ write(buffer) {
+ if (typeof buffer === "string") {
+ buffer = this.encoder.encode(buffer);
+ }
+
+ if (Cu.getClassName(buffer, true) !== "ArrayBuffer") {
+ if (buffer.byteLength === buffer.buffer.byteLength) {
+ buffer = buffer.buffer;
+ } else {
+ buffer = buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
+ }
+ }
+
+ let args = [this.id, buffer];
+
+ return this.worker.call("write", args, [buffer]);
+ }
+}
+
+/**
+ * Represents an input-only pipe, from which data may be read.
+ */
+class InputPipe extends Pipe {
+ constructor(...args) {
+ super(...args);
+
+ this.buffers = [];
+
+ /**
+ * @property {integer} dataAvailable
+ * The number of readable bytes currently stored in the input
+ * buffer.
+ * @readonly
+ */
+ this.dataAvailable = 0;
+
+ this.decoder = new TextDecoder();
+
+ this.pendingReads = [];
+
+ this._pendingBufferRead = null;
+
+ this.fillBuffer();
+ }
+
+ /**
+ * @property {integer} bufferSize
+ * The current size of the input buffer. This varies depending on
+ * the size of pending read operations.
+ * @readonly
+ */
+ get bufferSize() {
+ if (this.pendingReads.length) {
+ return Math.max(this.pendingReads[0].length, BUFFER_SIZE);
+ }
+ return BUFFER_SIZE;
+ }
+
+ /**
+ * Attempts to fill the input buffer.
+ *
+ * @private
+ */
+ fillBuffer() {
+ let dataWanted = this.bufferSize - this.dataAvailable;
+
+ if (!this._pendingBufferRead && dataWanted > 0) {
+ this._pendingBufferRead = this._read(dataWanted);
+
+ this._pendingBufferRead.then((result) => {
+ this._pendingBufferRead = null;
+
+ if (result) {
+ this.onInput(result.buffer);
+
+ this.fillBuffer();
+ }
+ });
+ }
+ }
+
+ _read(size) {
+ let args = [this.id, size];
+
+ return this.worker.call("read", args).catch(e => {
+ this.closed = true;
+
+ for (let {length, resolve, reject} of this.pendingReads.splice(0)) {
+ if (length === null && e.errorCode === SubprocessConstants.ERROR_END_OF_FILE) {
+ resolve(new ArrayBuffer(0));
+ } else {
+ reject(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Adds the given data to the end of the input buffer.
+ *
+ * @param {ArrayBuffer} buffer
+ * An input buffer to append to the current buffered input.
+ * @private
+ */
+ onInput(buffer) {
+ this.buffers.push(buffer);
+ this.dataAvailable += buffer.byteLength;
+ this.checkPendingReads();
+ }
+
+ /**
+ * Checks the topmost pending read operations and fulfills as many as can be
+ * filled from the current input buffer.
+ *
+ * @private
+ */
+ checkPendingReads() {
+ this.fillBuffer();
+
+ let reads = this.pendingReads;
+ while (reads.length && this.dataAvailable &&
+ reads[0].length <= this.dataAvailable) {
+ let pending = this.pendingReads.shift();
+
+ let length = pending.length || this.dataAvailable;
+
+ let result;
+ let byteLength = this.buffers[0].byteLength;
+ if (byteLength == length) {
+ result = this.buffers.shift();
+ } else if (byteLength > length) {
+ let buffer = this.buffers[0];
+
+ this.buffers[0] = buffer.slice(length);
+ result = ArrayBuffer.transfer(buffer, length);
+ } else {
+ result = ArrayBuffer.transfer(this.buffers.shift(), length);
+ let u8result = new Uint8Array(result);
+
+ while (byteLength < length) {
+ let buffer = this.buffers[0];
+ let u8buffer = new Uint8Array(buffer);
+
+ let remaining = length - byteLength;
+
+ if (buffer.byteLength <= remaining) {
+ this.buffers.shift();
+
+ u8result.set(u8buffer, byteLength);
+ } else {
+ this.buffers[0] = buffer.slice(remaining);
+
+ u8result.set(u8buffer.subarray(0, remaining), byteLength);
+ }
+
+ byteLength += Math.min(buffer.byteLength, remaining);
+ }
+ }
+
+ this.dataAvailable -= result.byteLength;
+ pending.resolve(result);
+ }
+ }
+
+ /**
+ * Reads exactly `length` bytes of binary data from the input stream, or, if
+ * length is not provided, reads the first chunk of data to become available.
+ * In the latter case, returns an empty array buffer on end of file.
+ *
+ * The read operation will not complete until enough data is available to
+ * fulfill the request. If the pipe closes without enough available data to
+ * fulfill the read, the operation fails, and any remaining buffered data is
+ * lost.
+ *
+ * @param {integer} [length]
+ * The number of bytes to read.
+ * @returns {Promise<ArrayBuffer>}
+ *
+ * @rejects {object}
+ * May be rejected with an Error object, or an object with similar
+ * properties. The object will include an `errorCode` property with
+ * one of the following values if it was rejected for the
+ * corresponding reason:
+ *
+ * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before
+ * enough input could be read to satisfy the request.
+ */
+ read(length = null) {
+ if (length !== null && !(Number.isInteger(length) && length >= 0)) {
+ throw new RangeError("Length must be a non-negative integer");
+ }
+
+ if (length == 0) {
+ return Promise.resolve(new ArrayBuffer(0));
+ }
+
+ return new Promise((resolve, reject) => {
+ this.pendingReads.push({length, resolve, reject});
+ this.checkPendingReads();
+ });
+ }
+
+ /**
+ * Reads exactly `length` bytes from the input stream, and parses them as
+ * UTF-8 JSON data.
+ *
+ * @param {integer} length
+ * The number of bytes to read.
+ * @returns {Promise<object>}
+ *
+ * @rejects {object}
+ * May be rejected with an Error object, or an object with similar
+ * properties. The object will include an `errorCode` property with
+ * one of the following values if it was rejected for the
+ * corresponding reason:
+ *
+ * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before
+ * enough input could be read to satisfy the request.
+ * - Subprocess.ERROR_INVALID_JSON: The data read from the pipe
+ * could not be parsed as a valid JSON string.
+ */
+ readJSON(length) {
+ if (!Number.isInteger(length) || length <= 0) {
+ throw new RangeError("Length must be a positive integer");
+ }
+
+ return this.readString(length).then(string => {
+ try {
+ return JSON.parse(string);
+ } catch (e) {
+ e.errorCode = SubprocessConstants.ERROR_INVALID_JSON;
+ throw e;
+ }
+ });
+ }
+
+ /**
+ * Reads a chunk of UTF-8 data from the input stream, and converts it to a
+ * JavaScript string.
+ *
+ * If `length` is provided, reads exactly `length` bytes. Otherwise, reads the
+ * first chunk of data to become available, and returns an empty string on end
+ * of file. In the latter case, the chunk is decoded in streaming mode, and
+ * any incomplete UTF-8 sequences at the end of a chunk are returned at the
+ * start of a subsequent read operation.
+ *
+ * @param {integer} [length]
+ * The number of bytes to read.
+ * @param {object} [options]
+ * An options object as expected by TextDecoder.decode.
+ * @returns {Promise<string>}
+ *
+ * @rejects {object}
+ * May be rejected with an Error object, or an object with similar
+ * properties. The object will include an `errorCode` property with
+ * one of the following values if it was rejected for the
+ * corresponding reason:
+ *
+ * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before
+ * enough input could be read to satisfy the request.
+ */
+ readString(length = null, options = {stream: length === null}) {
+ if (length !== null && !(Number.isInteger(length) && length >= 0)) {
+ throw new RangeError("Length must be a non-negative integer");
+ }
+
+ return this.read(length).then(buffer => {
+ return this.decoder.decode(buffer, options);
+ });
+ }
+
+ /**
+ * Reads 4 bytes from the input stream, and parses them as an unsigned
+ * integer, in native byte order.
+ *
+ * @returns {Promise<integer>}
+ *
+ * @rejects {object}
+ * May be rejected with an Error object, or an object with similar
+ * properties. The object will include an `errorCode` property with
+ * one of the following values if it was rejected for the
+ * corresponding reason:
+ *
+ * - Subprocess.ERROR_END_OF_FILE: The pipe was closed before
+ * enough input could be read to satisfy the request.
+ */
+ readUint32() {
+ return this.read(4).then(buffer => {
+ return new Uint32Array(buffer)[0];
+ });
+ }
+}
+
+/**
+ * @class Process
+ * @extends BaseProcess
+ */
+
+/**
+ * Represents a currently-running process, and allows interaction with it.
+ */
+class BaseProcess {
+ /**
+ * @param {PromiseWorker} worker
+ * The worker instance which owns the process.
+ * @param {integer} processId
+ * The internal ID of the Process object, which ties it to the
+ * corresponding process on the Worker side.
+ * @param {integer[]} fds
+ * An array of internal Pipe IDs, one for each standard file descriptor
+ * in the child process.
+ * @param {integer} pid
+ * The operating system process ID of the process.
+ */
+ constructor(worker, processId, fds, pid) {
+ this.id = processId;
+ this.worker = worker;
+
+ /**
+ * @property {integer} pid
+ * The process ID of the process, assigned by the operating system.
+ * @readonly
+ */
+ this.pid = pid;
+
+ this.exitCode = null;
+
+ this.exitPromise = new Promise(resolve => {
+ this.worker.call("wait", [this.id]).then(({exitCode}) => {
+ resolve(Object.freeze({exitCode}));
+ this.exitCode = exitCode;
+ });
+ });
+
+ if (fds[0] !== undefined) {
+ /**
+ * @property {OutputPipe} stdin
+ * A Pipe object which allows writing to the process's standard
+ * input.
+ * @readonly
+ */
+ this.stdin = new OutputPipe(this, 0, fds[0]);
+ }
+ if (fds[1] !== undefined) {
+ /**
+ * @property {InputPipe} stdout
+ * A Pipe object which allows reading from the process's standard
+ * output.
+ * @readonly
+ */
+ this.stdout = new InputPipe(this, 1, fds[1]);
+ }
+ if (fds[2] !== undefined) {
+ /**
+ * @property {InputPipe} [stderr]
+ * An optional Pipe object which allows reading from the
+ * process's standard error output.
+ * @readonly
+ */
+ this.stderr = new InputPipe(this, 2, fds[2]);
+ }
+ }
+
+ /**
+ * Spawns a process, and resolves to a BaseProcess instance on success.
+ *
+ * @param {object} options
+ * An options object as passed to `Subprocess.call`.
+ *
+ * @returns {Promise<BaseProcess>}
+ */
+ static create(options) {
+ let worker = this.getWorker();
+
+ return worker.call("spawn", [options]).then(({processId, fds, pid}) => {
+ return new this(worker, processId, fds, pid);
+ });
+ }
+
+ static get WORKER_URL() {
+ throw new Error("Not implemented");
+ }
+
+ static get WorkerClass() {
+ return PromiseWorker;
+ }
+
+ /**
+ * Gets the current subprocess worker, or spawns a new one if it does not
+ * currently exist.
+ *
+ * @returns {PromiseWorker}
+ */
+ static getWorker() {
+ if (!this._worker) {
+ this._worker = new this.WorkerClass(this.WORKER_URL);
+ }
+ return this._worker;
+ }
+
+ /**
+ * Kills the process.
+ *
+ * @param {integer} [timeout=300]
+ * A timeout, in milliseconds, after which the process will be forcibly
+ * killed. On platforms which support it, the process will be sent
+ * a `SIGTERM` signal immediately, so that it has a chance to terminate
+ * gracefully, and a `SIGKILL` signal if it hasn't exited within
+ * `timeout` milliseconds. On other platforms (namely Windows), the
+ * process will be forcibly terminated immediately.
+ *
+ * @returns {Promise<object>}
+ * Resolves to an object with an `exitCode` property when the process
+ * has exited.
+ */
+ kill(timeout = 300) {
+ // If the process has already exited, don't bother sending a signal.
+ if (this.exitCode != null) {
+ return this.wait();
+ }
+
+ let force = timeout <= 0;
+ this.worker.call("kill", [this.id, force]);
+
+ if (!force) {
+ setTimeout(() => {
+ if (this.exitCode == null) {
+ this.worker.call("kill", [this.id, true]);
+ }
+ }, timeout);
+ }
+
+ return this.wait();
+ }
+
+ /**
+ * Returns a promise which resolves to the process's exit code, once it has
+ * exited.
+ *
+ * @returns {Promise<object>}
+ * Resolves to an object with an `exitCode` property, containing the
+ * process's exit code, once the process has exited.
+ *
+ * On Unix-like systems, a negative exit code indicates that the
+ * process was killed by a signal whose signal number is the absolute
+ * value of the error code. On Windows, an exit code of -9 indicates
+ * that the process was killed via the {@linkcode BaseProcess#kill kill()}
+ * method.
+ */
+ wait() {
+ return this.exitPromise;
+ }
+}