summaryrefslogtreecommitdiffstats
path: root/addon-sdk/source/lib/sdk/event/utils.js
diff options
context:
space:
mode:
Diffstat (limited to 'addon-sdk/source/lib/sdk/event/utils.js')
-rw-r--r--addon-sdk/source/lib/sdk/event/utils.js328
1 files changed, 328 insertions, 0 deletions
diff --git a/addon-sdk/source/lib/sdk/event/utils.js b/addon-sdk/source/lib/sdk/event/utils.js
new file mode 100644
index 000000000..f193b6785
--- /dev/null
+++ b/addon-sdk/source/lib/sdk/event/utils.js
@@ -0,0 +1,328 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+"use strict";
+
+module.metadata = {
+ "stability": "unstable"
+};
+
+var { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");
+const { Cu } = require("chrome");
+
+// This module provides set of high order function for working with event
+// streams (streams in a NodeJS style that dispatch data, end and error
+// events).
+
+// Function takes a `target` object and returns set of implicit references
+// (non property references) it keeps. This basically allows defining
+// references between objects without storing the explicitly. See transform for
+// more details.
+var refs = (function() {
+ let refSets = new WeakMap();
+ return function refs(target) {
+ if (!refSets.has(target)) refSets.set(target, new Set());
+ return refSets.get(target);
+ };
+})();
+
+function transform(input, f) {
+ let output = new Output();
+
+ // Since event listeners don't prevent `input` to be GC-ed we wanna presrve
+ // it until `output` can be GC-ed. There for we add implicit reference which
+ // is removed once `input` ends.
+ refs(output).add(input);
+
+ const next = data => receive(output, data);
+ once(output, "start", () => start(input));
+ on(input, "error", error => emit(output, "error", error));
+ on(input, "end", function() {
+ refs(output).delete(input);
+ end(output);
+ });
+ on(input, "data", data => f(data, next));
+ return output;
+}
+
+// High order event transformation function that takes `input` event channel
+// and returns transformation containing only events on which `p` predicate
+// returns `true`.
+function filter(input, predicate) {
+ return transform(input, function(data, next) {
+ if (predicate(data))
+ next(data);
+ });
+}
+exports.filter = filter;
+
+// High order function that takes `input` and returns input of it's values
+// mapped via given `f` function.
+const map = (input, f) => transform(input, (data, next) => next(f(data)));
+exports.map = map;
+
+// High order function that takes `input` stream of streams and merges them
+// into single event stream. Like flatten but time based rather than order
+// based.
+function merge(inputs) {
+ let output = new Output();
+ let open = 1;
+ let state = [];
+ output.state = state;
+ refs(output).add(inputs);
+
+ function end(input) {
+ open = open - 1;
+ refs(output).delete(input);
+ if (open === 0) emit(output, "end");
+ }
+ const error = e => emit(output, "error", e);
+ function forward(input) {
+ state.push(input);
+ open = open + 1;
+ on(input, "end", () => end(input));
+ on(input, "error", error);
+ on(input, "data", data => emit(output, "data", data));
+ }
+
+ // If `inputs` is an array treat it as a stream.
+ if (Array.isArray(inputs)) {
+ inputs.forEach(forward);
+ end(inputs);
+ }
+ else {
+ on(inputs, "end", () => end(inputs));
+ on(inputs, "error", error);
+ on(inputs, "data", forward);
+ }
+
+ return output;
+}
+exports.merge = merge;
+
+const expand = (inputs, f) => merge(map(inputs, f));
+exports.expand = expand;
+
+const pipe = (from, to) => on(from, "*", emit.bind(emit, to));
+exports.pipe = pipe;
+
+
+// Shim signal APIs so other modules can be used as is.
+const receive = (input, message) => {
+ if (input[receive])
+ input[receive](input, message);
+ else
+ emit(input, "data", message);
+
+ // Ideally our input will extend Input and already provide a weak value
+ // getter. If not, opportunistically shim the weak value getter on
+ // other types passed as the input.
+ if (!("value" in input)) {
+ Object.defineProperty(input, "value", WeakValueGetterSetter);
+ }
+ input.value = message;
+};
+receive.toString = () => "@@receive";
+exports.receive = receive;
+exports.send = receive;
+
+const end = input => {
+ if (input[end])
+ input[end](input);
+ else
+ emit(input, "end", input);
+};
+end.toString = () => "@@end";
+exports.end = end;
+
+const stop = input => {
+ if (input[stop])
+ input[stop](input);
+ else
+ emit(input, "stop", input);
+};
+stop.toString = () => "@@stop";
+exports.stop = stop;
+
+const start = input => {
+ if (input[start])
+ input[start](input);
+ else
+ emit(input, "start", input);
+};
+start.toString = () => "@@start";
+exports.start = start;
+
+const lift = (step, ...inputs) => {
+ let args = null;
+ let opened = inputs.length;
+ let started = false;
+ const output = new Output();
+ const init = () => {
+ args = [...inputs.map(input => input.value)];
+ output.value = step(...args);
+ };
+
+ inputs.forEach((input, index) => {
+ on(input, "data", data => {
+ args[index] = data;
+ receive(output, step(...args));
+ });
+ on(input, "end", () => {
+ opened = opened - 1;
+ if (opened <= 0)
+ end(output);
+ });
+ });
+
+ once(output, "start", () => {
+ inputs.forEach(start);
+ init();
+ });
+
+ init();
+
+ return output;
+};
+exports.lift = lift;
+
+const merges = inputs => {
+ let opened = inputs.length;
+ let output = new Output();
+ output.value = inputs[0].value;
+ inputs.forEach((input, index) => {
+ on(input, "data", data => receive(output, data));
+ on(input, "end", () => {
+ opened = opened - 1;
+ if (opened <= 0)
+ end(output);
+ });
+ });
+
+ once(output, "start", () => {
+ inputs.forEach(start);
+ output.value = inputs[0].value;
+ });
+
+ return output;
+};
+exports.merges = merges;
+
+const foldp = (step, initial, input) => {
+ let output = map(input, x => step(output.value, x));
+ output.value = initial;
+ return output;
+};
+exports.foldp = foldp;
+
+const keepIf = (p, base, input) => {
+ let output = filter(input, p);
+ output.value = base;
+ return output;
+};
+exports.keepIf = keepIf;
+
+function Input() {}
+Input.start = input => emit(input, "start", input);
+Input.prototype.start = Input.start;
+
+Input.end = input => {
+ emit(input, "end", input);
+ stop(input);
+};
+Input.prototype[end] = Input.end;
+
+// The event channel system caches the last event seen as input.value.
+// Unfortunately, if the last event is a DOM object this is a great way
+// leak windows. Mitigate this by storing input.value using a weak
+// reference. This allows the system to work for normal event processing
+// while also allowing the objects to be reclaimed. It means, however,
+// input.value cannot be accessed long after the event was dispatched.
+const WeakValueGetterSetter = {
+ get: function() {
+ return this._weakValue ? this._weakValue.get() : this._simpleValue
+ },
+ set: function(v) {
+ if (v && typeof v === "object") {
+ try {
+ // Try to set a weak reference. This can throw for some values.
+ // For example, if the value is a native object that does not
+ // implement nsISupportsWeakReference.
+ this._weakValue = Cu.getWeakReference(v)
+ this._simpleValue = undefined;
+ return;
+ } catch (e) {
+ // Do nothing. Fall through to setting _simpleValue below.
+ }
+ }
+ this._simpleValue = v;
+ this._weakValue = undefined;
+ },
+}
+Object.defineProperty(Input.prototype, "value", WeakValueGetterSetter);
+
+exports.Input = Input;
+
+// Define an Output type with a weak value getter for the transformation
+// functions that produce new channels.
+function Output() { }
+Object.defineProperty(Output.prototype, "value", WeakValueGetterSetter);
+exports.Output = Output;
+
+const $source = "@@source";
+const $outputs = "@@outputs";
+exports.outputs = $outputs;
+
+// NOTE: Passing DOM objects through a Reactor can cause them to leak
+// when they get cached in this.value. We cannot use a weak reference
+// in this case because the Reactor design expects to always have both the
+// past and present value. If we allow past values to be collected the
+// system breaks.
+
+function Reactor(options={}) {
+ const {onStep, onStart, onEnd} = options;
+ if (onStep)
+ this.onStep = onStep;
+ if (onStart)
+ this.onStart = onStart;
+ if (onEnd)
+ this.onEnd = onEnd;
+}
+Reactor.prototype.onStep = _ => void(0);
+Reactor.prototype.onStart = _ => void(0);
+Reactor.prototype.onEnd = _ => void(0);
+Reactor.prototype.onNext = function(present, past) {
+ this.value = present;
+ this.onStep(present, past);
+};
+Reactor.prototype.run = function(input) {
+ on(input, "data", message => this.onNext(message, input.value));
+ on(input, "end", () => this.onEnd(input.value));
+ start(input);
+ this.value = input.value;
+ this.onStart(input.value);
+};
+exports.Reactor = Reactor;
+
+/**
+ * Takes an object used as options with potential keys like 'onMessage',
+ * used to be called `require('sdk/event/core').setListeners` on.
+ * This strips all keys that would trigger a listener to be set.
+ *
+ * @params {Object} object
+ * @return {Object}
+ */
+
+function stripListeners (object) {
+ return Object.keys(object || {}).reduce((agg, key) => {
+ if (!EVENT_TYPE_PATTERN.test(key))
+ agg[key] = object[key];
+ return agg;
+ }, {});
+}
+exports.stripListeners = stripListeners;
+
+const when = (target, type) => new Promise(resolve => {
+ once(target, type, resolve);
+});
+exports.when = when;