diff options
Diffstat (limited to 'toolkit/jetpack/sdk/event/utils.js')
-rw-r--r-- | toolkit/jetpack/sdk/event/utils.js | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/toolkit/jetpack/sdk/event/utils.js b/toolkit/jetpack/sdk/event/utils.js new file mode 100644 index 000000000..f193b6785 --- /dev/null +++ b/toolkit/jetpack/sdk/event/utils.js @@ -0,0 +1,328 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +module.metadata = { + "stability": "unstable" +}; + +var { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core"); +const { Cu } = require("chrome"); + +// This module provides set of high order function for working with event +// streams (streams in a NodeJS style that dispatch data, end and error +// events). + +// Function takes a `target` object and returns set of implicit references +// (non property references) it keeps. This basically allows defining +// references between objects without storing the explicitly. See transform for +// more details. +var refs = (function() { + let refSets = new WeakMap(); + return function refs(target) { + if (!refSets.has(target)) refSets.set(target, new Set()); + return refSets.get(target); + }; +})(); + +function transform(input, f) { + let output = new Output(); + + // Since event listeners don't prevent `input` to be GC-ed we wanna presrve + // it until `output` can be GC-ed. There for we add implicit reference which + // is removed once `input` ends. + refs(output).add(input); + + const next = data => receive(output, data); + once(output, "start", () => start(input)); + on(input, "error", error => emit(output, "error", error)); + on(input, "end", function() { + refs(output).delete(input); + end(output); + }); + on(input, "data", data => f(data, next)); + return output; +} + +// High order event transformation function that takes `input` event channel +// and returns transformation containing only events on which `p` predicate +// returns `true`. +function filter(input, predicate) { + return transform(input, function(data, next) { + if (predicate(data)) + next(data); + }); +} +exports.filter = filter; + +// High order function that takes `input` and returns input of it's values +// mapped via given `f` function. +const map = (input, f) => transform(input, (data, next) => next(f(data))); +exports.map = map; + +// High order function that takes `input` stream of streams and merges them +// into single event stream. Like flatten but time based rather than order +// based. +function merge(inputs) { + let output = new Output(); + let open = 1; + let state = []; + output.state = state; + refs(output).add(inputs); + + function end(input) { + open = open - 1; + refs(output).delete(input); + if (open === 0) emit(output, "end"); + } + const error = e => emit(output, "error", e); + function forward(input) { + state.push(input); + open = open + 1; + on(input, "end", () => end(input)); + on(input, "error", error); + on(input, "data", data => emit(output, "data", data)); + } + + // If `inputs` is an array treat it as a stream. + if (Array.isArray(inputs)) { + inputs.forEach(forward); + end(inputs); + } + else { + on(inputs, "end", () => end(inputs)); + on(inputs, "error", error); + on(inputs, "data", forward); + } + + return output; +} +exports.merge = merge; + +const expand = (inputs, f) => merge(map(inputs, f)); +exports.expand = expand; + +const pipe = (from, to) => on(from, "*", emit.bind(emit, to)); +exports.pipe = pipe; + + +// Shim signal APIs so other modules can be used as is. +const receive = (input, message) => { + if (input[receive]) + input[receive](input, message); + else + emit(input, "data", message); + + // Ideally our input will extend Input and already provide a weak value + // getter. If not, opportunistically shim the weak value getter on + // other types passed as the input. + if (!("value" in input)) { + Object.defineProperty(input, "value", WeakValueGetterSetter); + } + input.value = message; +}; +receive.toString = () => "@@receive"; +exports.receive = receive; +exports.send = receive; + +const end = input => { + if (input[end]) + input[end](input); + else + emit(input, "end", input); +}; +end.toString = () => "@@end"; +exports.end = end; + +const stop = input => { + if (input[stop]) + input[stop](input); + else + emit(input, "stop", input); +}; +stop.toString = () => "@@stop"; +exports.stop = stop; + +const start = input => { + if (input[start]) + input[start](input); + else + emit(input, "start", input); +}; +start.toString = () => "@@start"; +exports.start = start; + +const lift = (step, ...inputs) => { + let args = null; + let opened = inputs.length; + let started = false; + const output = new Output(); + const init = () => { + args = [...inputs.map(input => input.value)]; + output.value = step(...args); + }; + + inputs.forEach((input, index) => { + on(input, "data", data => { + args[index] = data; + receive(output, step(...args)); + }); + on(input, "end", () => { + opened = opened - 1; + if (opened <= 0) + end(output); + }); + }); + + once(output, "start", () => { + inputs.forEach(start); + init(); + }); + + init(); + + return output; +}; +exports.lift = lift; + +const merges = inputs => { + let opened = inputs.length; + let output = new Output(); + output.value = inputs[0].value; + inputs.forEach((input, index) => { + on(input, "data", data => receive(output, data)); + on(input, "end", () => { + opened = opened - 1; + if (opened <= 0) + end(output); + }); + }); + + once(output, "start", () => { + inputs.forEach(start); + output.value = inputs[0].value; + }); + + return output; +}; +exports.merges = merges; + +const foldp = (step, initial, input) => { + let output = map(input, x => step(output.value, x)); + output.value = initial; + return output; +}; +exports.foldp = foldp; + +const keepIf = (p, base, input) => { + let output = filter(input, p); + output.value = base; + return output; +}; +exports.keepIf = keepIf; + +function Input() {} +Input.start = input => emit(input, "start", input); +Input.prototype.start = Input.start; + +Input.end = input => { + emit(input, "end", input); + stop(input); +}; +Input.prototype[end] = Input.end; + +// The event channel system caches the last event seen as input.value. +// Unfortunately, if the last event is a DOM object this is a great way +// leak windows. Mitigate this by storing input.value using a weak +// reference. This allows the system to work for normal event processing +// while also allowing the objects to be reclaimed. It means, however, +// input.value cannot be accessed long after the event was dispatched. +const WeakValueGetterSetter = { + get: function() { + return this._weakValue ? this._weakValue.get() : this._simpleValue + }, + set: function(v) { + if (v && typeof v === "object") { + try { + // Try to set a weak reference. This can throw for some values. + // For example, if the value is a native object that does not + // implement nsISupportsWeakReference. + this._weakValue = Cu.getWeakReference(v) + this._simpleValue = undefined; + return; + } catch (e) { + // Do nothing. Fall through to setting _simpleValue below. + } + } + this._simpleValue = v; + this._weakValue = undefined; + }, +} +Object.defineProperty(Input.prototype, "value", WeakValueGetterSetter); + +exports.Input = Input; + +// Define an Output type with a weak value getter for the transformation +// functions that produce new channels. +function Output() { } +Object.defineProperty(Output.prototype, "value", WeakValueGetterSetter); +exports.Output = Output; + +const $source = "@@source"; +const $outputs = "@@outputs"; +exports.outputs = $outputs; + +// NOTE: Passing DOM objects through a Reactor can cause them to leak +// when they get cached in this.value. We cannot use a weak reference +// in this case because the Reactor design expects to always have both the +// past and present value. If we allow past values to be collected the +// system breaks. + +function Reactor(options={}) { + const {onStep, onStart, onEnd} = options; + if (onStep) + this.onStep = onStep; + if (onStart) + this.onStart = onStart; + if (onEnd) + this.onEnd = onEnd; +} +Reactor.prototype.onStep = _ => void(0); +Reactor.prototype.onStart = _ => void(0); +Reactor.prototype.onEnd = _ => void(0); +Reactor.prototype.onNext = function(present, past) { + this.value = present; + this.onStep(present, past); +}; +Reactor.prototype.run = function(input) { + on(input, "data", message => this.onNext(message, input.value)); + on(input, "end", () => this.onEnd(input.value)); + start(input); + this.value = input.value; + this.onStart(input.value); +}; +exports.Reactor = Reactor; + +/** + * Takes an object used as options with potential keys like 'onMessage', + * used to be called `require('sdk/event/core').setListeners` on. + * This strips all keys that would trigger a listener to be set. + * + * @params {Object} object + * @return {Object} + */ + +function stripListeners (object) { + return Object.keys(object || {}).reduce((agg, key) => { + if (!EVENT_TYPE_PATTERN.test(key)) + agg[key] = object[key]; + return agg; + }, {}); +} +exports.stripListeners = stripListeners; + +const when = (target, type) => new Promise(resolve => { + once(target, type, resolve); +}); +exports.when = when; |