diff options
Diffstat (limited to 'toolkit/components/promiseworker/PromiseWorker.jsm')
-rw-r--r-- | toolkit/components/promiseworker/PromiseWorker.jsm | 390 |
1 files changed, 390 insertions, 0 deletions
diff --git a/toolkit/components/promiseworker/PromiseWorker.jsm b/toolkit/components/promiseworker/PromiseWorker.jsm new file mode 100644 index 000000000..0c6e054a2 --- /dev/null +++ b/toolkit/components/promiseworker/PromiseWorker.jsm @@ -0,0 +1,390 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * A wrapper around ChromeWorker with extended capabilities designed + * to simplify main thread-to-worker thread asynchronous function calls. + * + * This wrapper: + * - groups requests and responses as a method `post` that returns a `Promise`; + * - ensures that exceptions thrown on the worker thread are correctly deserialized; + * - provides some utilities for benchmarking various operations. + * + * Generally, you should use PromiseWorker.jsm along with its worker-side + * counterpart PromiseWorker.js. + */ + +"use strict"; + +this.EXPORTED_SYMBOLS = ["BasePromiseWorker"]; + +const Cu = Components.utils; +const Ci = Components.interfaces; + +Cu.import("resource://gre/modules/XPCOMUtils.jsm", this); + +XPCOMUtils.defineLazyModuleGetter(this, "Promise", + "resource://gre/modules/Promise.jsm"); +XPCOMUtils.defineLazyModuleGetter(this, "Task", + "resource://gre/modules/Task.jsm"); + +/** + * An implementation of queues (FIFO). + * + * The current implementation uses one array, runs in O(n ^ 2), and is optimized + * for the case in which queues are generally short. + */ +function Queue() { + this._array = []; +} +Queue.prototype = { + pop: function pop() { + return this._array.shift(); + }, + push: function push(x) { + return this._array.push(x); + }, + isEmpty: function isEmpty() { + return this._array.length == 0; + } +}; + +/** + * Constructors for decoding standard exceptions received from the + * worker. + */ +const EXCEPTION_CONSTRUCTORS = { + EvalError: function(error) { + let result = new EvalError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + InternalError: function(error) { + let result = new InternalError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + RangeError: function(error) { + let result = new RangeError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + ReferenceError: function(error) { + let result = new ReferenceError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + SyntaxError: function(error) { + let result = new SyntaxError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + TypeError: function(error) { + let result = new TypeError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + URIError: function(error) { + let result = new URIError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + StopIteration: function() { + return StopIteration; + } +}; + +/** + * An object responsible for dispatching messages to a chrome worker + * and routing the responses. + * + * Instances of this constructor who need logging may provide a method + * `log: function(...args) { ... }` in charge of printing out (or + * discarding) logs. + * + * Instances of this constructor may add exception handlers to + * `this.ExceptionHandlers`, if they need to handle custom exceptions. + * + * @param {string} url The url containing the source code for this worker, + * as in constructor ChromeWorker. + * + * @constructor + */ +this.BasePromiseWorker = function(url) { + if (typeof url != "string") { + throw new TypeError("Expecting a string"); + } + this._url = url; + + /** + * A set of methods, with the following + * + * ConstructorName: function({message, fileName, lineNumber}) { + * // Construct a new instance of ConstructorName based on + * // `message`, `fileName`, `lineNumber` + * } + * + * By default, this covers EvalError, InternalError, RangeError, + * ReferenceError, SyntaxError, TypeError, URIError, StopIteration. + */ + this.ExceptionHandlers = Object.create(EXCEPTION_CONSTRUCTORS); + + /** + * The queue of deferred, waiting for the completion of their + * respective job by the worker. + * + * Each item in the list may contain an additional field |closure|, + * used to store strong references to value that must not be + * garbage-collected before the reply has been received (e.g. + * arrays). + * + * @type {Queue<{deferred:deferred, closure:*=}>} + */ + this._queue = new Queue(); + + /** + * The number of the current message. + * + * Used for debugging purposes. + */ + this._id = 0; + + /** + * The instant at which the worker was launched. + */ + this.launchTimeStamp = null; + + /** + * Timestamps provided by the worker for statistics purposes. + */ + this.workerTimeStamps = null; +}; +this.BasePromiseWorker.prototype = { + log: function() { + // By Default, ignore all logs. + }, + + /** + * Instantiate the worker lazily. + */ + get _worker() { + delete this._worker; + let worker = new ChromeWorker(this._url); + Object.defineProperty(this, "_worker", {value: + worker + }); + + // We assume that we call to _worker for the purpose of calling + // postMessage(). + this.launchTimeStamp = Date.now(); + + /** + * Receive errors that have been serialized by the built-in mechanism + * of DOM/Chrome Workers. + * + * PromiseWorker.js knows how to serialize a number of errors + * without losing information. These are treated by + * |worker.onmessage|. However, for other errors, we rely on + * DOM's mechanism for serializing errors, which transmits these + * errors through |worker.onerror|. + * + * @param {Error} error Some JS error. + */ + worker.onerror = error => { + this.log("Received uncaught error from worker", error.message, error.filename, error.lineno); + error.preventDefault(); + let {deferred} = this._queue.pop(); + deferred.reject(error); + }; + + /** + * Receive messages from the worker, propagate them to the listeners. + * + * Messages must have one of the following shapes: + * - {ok: some_value} in case of success + * - {fail: some_error} in case of error, where + * some_error is an instance of |PromiseWorker.WorkerError| + * + * Messages may also contain a field |id| to help + * with debugging. + * + * Messages may also optionally contain a field |durationMs|, holding + * the duration of the function call in milliseconds. + * + * @param {*} msg The message received from the worker. + */ + worker.onmessage = msg => { + this.log("Received message from worker", msg.data); + let handler = this._queue.pop(); + let deferred = handler.deferred; + let data = msg.data; + if (data.id != handler.id) { + throw new Error("Internal error: expecting msg " + handler.id + ", " + + " got " + data.id + ": " + JSON.stringify(msg.data)); + } + if ("timeStamps" in data) { + this.workerTimeStamps = data.timeStamps; + } + if ("ok" in data) { + // Pass the data to the listeners. + deferred.resolve(data); + } else if ("fail" in data) { + // We have received an error that was serialized by the + // worker. + deferred.reject(new WorkerError(data.fail)); + } + }; + return worker; + }, + + /** + * Post a message to a worker. + * + * @param {string} fun The name of the function to call. + * @param {Array} args The arguments to pass to `fun`. If any + * of the arguments is a Promise, it is resolved before posting the + * message. If any of the arguments needs to be transfered instead + * of copied, this may be specified by making the argument an instance + * of `BasePromiseWorker.Meta` or by using the `transfers` argument. + * By convention, the last argument may be an object `options` + * with some of the following fields: + * - {number|null} outExecutionDuration A parameter to be filled with the + * duration of the off main thread execution for this call. + * @param {*=} closure An object holding references that should not be + * garbage-collected before the message treatment is complete. + * @param {Array=} transfers An array of objects that should be transfered + * to the worker instead of being copied. If any of the objects is a Promise, + * it is resolved before posting the message. + * + * @return {promise} + */ + post: function(fun, args, closure, transfers) { + return Task.spawn(function* postMessage() { + // Normalize in case any of the arguments is a promise + if (args) { + args = yield Promise.resolve(Promise.all(args)); + } + if (transfers) { + transfers = yield Promise.resolve(Promise.all(transfers)); + } else { + transfers = []; + } + + if (args) { + // Extract `Meta` data + args = args.map(arg => { + if (arg instanceof BasePromiseWorker.Meta) { + if (arg.meta && "transfers" in arg.meta) { + transfers.push(...arg.meta.transfers); + } + return arg.data; + } + return arg; + }); + } + + let id = ++this._id; + let message = {fun: fun, args: args, id: id}; + this.log("Posting message", message); + try { + this._worker.postMessage(message, ...[transfers]); + } catch (ex) { + if (typeof ex == "number") { + this.log("Could not post message", message, "due to xpcom error", ex); + // handle raw xpcom errors (see eg bug 961317) + throw new Components.Exception("Error in postMessage", ex); + } + + this.log("Could not post message", message, "due to error", ex); + throw ex; + } + + let deferred = Promise.defer(); + this._queue.push({deferred:deferred, closure: closure, id: id}); + this.log("Message posted"); + + let reply; + try { + this.log("Expecting reply"); + reply = yield deferred.promise; + } catch (error) { + this.log("Got error", error); + reply = error; + + if (error instanceof WorkerError) { + // We know how to deserialize most well-known errors + throw this.ExceptionHandlers[error.data.exn](error.data); + } + + if (error instanceof ErrorEvent) { + // Other errors get propagated as instances of ErrorEvent + this.log("Error serialized by DOM", error.message, error.filename, error.lineno); + throw new Error(error.message, error.filename, error.lineno); + } + + // We don't know about this kind of error + throw error; + } + + // By convention, the last argument may be an object `options`. + let options = null; + if (args) { + options = args[args.length - 1]; + } + + // Check for duration and return result. + if (!options || + typeof options !== "object" || + !("outExecutionDuration" in options)) { + return reply.ok; + } + // If reply.durationMs is not present, just return the result, + // without updating durations (there was an error in the method + // dispatch). + if (!("durationMs" in reply)) { + return reply.ok; + } + // Bug 874425 demonstrates that two successive calls to Date.now() + // can actually produce an interval with negative duration. + // We assume that this is due to an operation that is so short + // that Date.now() is not monotonic, so we round this up to 0. + let durationMs = Math.max(0, reply.durationMs); + // Accumulate (or initialize) outExecutionDuration + if (typeof options.outExecutionDuration == "number") { + options.outExecutionDuration += durationMs; + } else { + options.outExecutionDuration = durationMs; + } + return reply.ok; + + }.bind(this)); + } +}; + +/** + * An error that has been serialized by the worker. + * + * @constructor + */ +function WorkerError(data) { + this.data = data; +} + +/** + * A constructor used to send data to the worker thread while + * with special treatment (e.g. transmitting data instead of + * copying it). + * + * @param {object=} data The data to send to the caller thread. + * @param {object=} meta Additional instructions, as an object + * that may contain the following fields: + * - {Array} transfers An array of objects that should be transferred + * instead of being copied. + * + * @constructor + */ +this.BasePromiseWorker.Meta = function(data, meta) { + this.data = data; + this.meta = meta; +}; |