1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
|
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
module.metadata = {
"stability": "unstable"
};
var { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");
const { Cu } = require("chrome");
// This module provides a set of higher-order functions for working with event
// streams (streams in the NodeJS style that dispatch `data`, `end` and `error`
// events).
// Given a `target` object, returns the `Set` of implicit (non-property)
// references associated with it, creating an empty set the first time a
// target is seen. This lets one object hold on to another without storing it
// explicitly as a property. See `transform` for how it is used.
var refs = (function() {
  // Sets are kept in a WeakMap keyed by `target`, so this bookkeeping does
  // not itself keep the target alive.
  const registry = new WeakMap();
  return function refs(target) {
    let held = registry.get(target);
    if (held === undefined) {
      held = new Set();
      registry.set(target, held);
    }
    return held;
  };
})();
// Builds a new `Output` channel driven by `input`.
//
// For every "data" event on `input`, `f(data, next)` is called; `f` decides
// what (if anything) reaches the output by calling `next(value)`, which hands
// the value to `receive(output, value)`. "error" events on `input` are
// re-emitted on the output, an "end" on `input` ends the output, and the first
// "start" on the output starts `input`.
function transform(input, f) {
  const output = new Output();
  // Event listeners do not prevent `input` from being GC-ed, so we want to
  // preserve it until `output` can be GC-ed. Therefore we add an implicit
  // reference, which is removed once `input` ends.
  const held = refs(output);
  held.add(input);
  const forward = value => receive(output, value);
  once(output, "start", () => start(input));
  on(input, "error", reason => emit(output, "error", reason));
  on(input, "end", () => {
    held.delete(input);
    end(output);
  });
  on(input, "data", value => f(value, forward));
  return output;
}
// Higher-order event transformation function that takes an `input` event
// channel and returns a transformation containing only the values for which
// the `predicate` function returns a truthy result.
function filter(input, predicate) {
  return transform(input, (data, next) => {
    // Values that fail the predicate are simply not forwarded.
    if (!predicate(data)) {
      return;
    }
    next(data);
  });
}
exports.filter = filter;
// Higher-order function that takes an `input` channel and returns a channel
// of its values mapped through the given `f` function.
const map = (input, f) => transform(input, (data, next) => {
  const mapped = f(data);
  return next(mapped);
});
exports.map = map;
// Higher-order function that takes `inputs`, a stream of streams (or an array
// of streams), and merges them into a single event stream. Like flatten, but
// time based rather than order based.
//
// The list of streams forwarded so far is exposed as `output.state`.
function merge(inputs) {
  const output = new Output();
  // Number of sources that have not ended yet. It starts at 1 to account for
  // `inputs` itself; every forwarded stream adds one more. The output emits
  // "end" only when the count drops back to zero.
  let pending = 1;
  const forwarded = [];
  output.state = forwarded;
  refs(output).add(inputs);

  // Marks `source` as finished and ends the output once nothing is pending.
  function close(source) {
    pending -= 1;
    refs(output).delete(source);
    if (pending === 0) {
      emit(output, "end");
    }
  }
  // One shared listener re-emits errors from every source on the output.
  const fail = reason => emit(output, "error", reason);
  // Starts relaying "data" from `source` to the output and tracks its "end".
  function forward(source) {
    forwarded.push(source);
    pending += 1;
    on(source, "end", () => close(source));
    on(source, "error", fail);
    on(source, "data", value => emit(output, "data", value));
  }

  if (!Array.isArray(inputs)) {
    // `inputs` is itself a stream: every value it dispatches is forwarded as
    // a stream of its own.
    on(inputs, "end", () => close(inputs));
    on(inputs, "error", fail);
    on(inputs, "data", forward);
    return output;
  }
  // If `inputs` is an array, treat it as a stream that has already delivered
  // all of its values and ended.
  inputs.forEach(forward);
  close(inputs);
  return output;
}
exports.merge = merge;
// Maps every value of `inputs` through `f` and merges the results into a
// single event stream. Because the mapped channel is handed to `merge`, each
// value returned by `f` is treated as a stream to forward.
const expand = (inputs, f) => {
  const mapped = map(inputs, f);
  return merge(mapped);
};
exports.expand = expand;
// Registers a "*" listener on `from` that calls `emit` with `to` prepended to
// whatever arguments the listener receives, and returns the result of `on`.
// NOTE(review): "*" is presumably the wildcard event type of "./core", which
// would make this relay every event from `from` to `to` — confirm there.
const pipe = (from, to) => {
  const relay = emit.bind(emit, to);
  return on(from, "*", relay);
};
exports.pipe = pipe;
// Shim signal APIs so other modules can be used as is.
// Delivers `message` to `input` and then records it as `input.value`.
//
// If `input` has a handler under the "@@receive" key (this function is used
// as the property key and stringifies to that name, see `toString` below),
// the handler is called with `(input, message)`; otherwise a "data" event is
// emitted on `input`. The value is stored only after the dispatch, so
// anything reading `input.value` during the dispatch still sees the previous
// value.
const receive = (input, message) => {
  const hasCustomHandler = Boolean(input[receive]);
  if (hasCustomHandler) {
    input[receive](input, message);
  } else {
    emit(input, "data", message);
  }
  // Ideally our input will extend Input and already provide a weak value
  // getter. If not, opportunistically shim the weak value getter on
  // other types passed as the input.
  const hasValueSlot = "value" in input;
  if (hasValueSlot === false) {
    Object.defineProperty(input, "value", WeakValueGetterSetter);
  }
  input.value = message;
};
receive.toString = () => "@@receive";
exports.receive = receive;
exports.send = receive;
// Signals the end of `input`. If `input` has a handler under the "@@end" key
// (this function is used as the property key and stringifies to that name),
// the handler is called with `input`; otherwise an "end" event is emitted on
// `input` with `input` as its payload.
const end = input => {
  if (!input[end]) {
    emit(input, "end", input);
    return;
  }
  input[end](input);
};
end.toString = () => "@@end";
exports.end = end;
// Signals `input` to stop. If `input` has a handler under the "@@stop" key
// (this function is used as the property key and stringifies to that name),
// the handler is called with `input`; otherwise a "stop" event is emitted on
// `input` with `input` as its payload.
const stop = input => {
  if (!input[stop]) {
    emit(input, "stop", input);
    return;
  }
  input[stop](input);
};
stop.toString = () => "@@stop";
exports.stop = stop;
// Signals `input` to start. If `input` has a handler under the "@@start" key
// (this function is used as the property key and stringifies to that name),
// the handler is called with `input`; otherwise a "start" event is emitted on
// `input` with `input` as its payload.
const start = input => {
  if (!input[start]) {
    emit(input, "start", input);
    return;
  }
  input[start](input);
};
start.toString = () => "@@start";
exports.start = start;
// Lifts `step` over the given `inputs`: returns an `Output` whose value is
// `step` applied to the latest value of every input, in the order the inputs
// were passed.
//
// Whenever one input dispatches "data", its slot in the argument list is
// replaced and the recomputed result is delivered to the output through
// `receive`. The output ends once every input has ended. The first "start" on
// the output starts all inputs and recomputes the value from their current
// `value`s.
const lift = (step, ...inputs) => {
  const output = new Output();
  // Latest known value of each input, positionally matching `inputs`.
  let latest = null;
  let remaining = inputs.length;

  // Snapshots the current `value` of every input and seeds `output.value`.
  const recompute = () => {
    latest = inputs.map(source => source.value);
    output.value = step(...latest);
  };

  inputs.forEach((source, position) => {
    on(source, "data", datum => {
      latest[position] = datum;
      receive(output, step(...latest));
    });
    on(source, "end", () => {
      remaining -= 1;
      if (remaining <= 0) {
        end(output);
      }
    });
  });

  once(output, "start", () => {
    inputs.forEach(start);
    recompute();
  });

  // Seed the value right away so it is available before the output starts.
  recompute();
  return output;
};
exports.lift = lift;
// Merges the array of `inputs` into one `Output`: every "data" value from any
// input is delivered to the output through `receive`, and the output ends
// once all inputs have ended.
//
// The output's `value` is seeded from the first input, both at creation and
// again when the output is first started (which also starts every input).
const merges = inputs => {
  let remaining = inputs.length;
  const output = new Output();
  output.value = inputs[0].value;

  const relay = datum => receive(output, datum);
  const finishOne = () => {
    remaining -= 1;
    if (remaining <= 0) {
      end(output);
    }
  };
  inputs.forEach(source => {
    on(source, "data", relay);
    on(source, "end", finishOne);
  });

  once(output, "start", () => {
    inputs.forEach(start);
    output.value = inputs[0].value;
  });
  return output;
};
exports.merges = merges;
// Folds `input` from the past: returns a channel whose value starts at
// `initial` and where every value `x` from `input` is mapped to
// `step(current, x)`, `current` being the channel's own `value` at that
// moment.
const foldp = (step, initial, input) => {
  // `folded` is only read once data flows, i.e. after it has been assigned.
  const accumulate = incoming => step(folded.value, incoming);
  const folded = map(input, accumulate);
  folded.value = initial;
  return folded;
};
exports.foldp = foldp;
// Returns `input` filtered by the predicate `p`, with the resulting channel's
// `value` initialised to `base`.
const keepIf = (p, base, input) => {
  const kept = filter(input, p);
  kept.value = base;
  return kept;
};
exports.keepIf = keepIf;
// Base type for input channels. The constructor sets no state of its own;
// the default signal handlers live on the constructor and its prototype.
function Input() {}
// Default start behavior: emit a "start" event on the input, with the input
// itself as the payload.
Input.start = input => emit(input, "start", input);
// Alias kept so existing `Input.prototype.start` lookups keep working.
Input.prototype.start = Input.start;
// Also register the handler under the "@@start" key that the `start` signal
// function looks up, matching how `Input.end` is registered under `[end]`.
// `Input.start` does exactly what `start` falls back to when no handler is
// present, so dispatch results are unchanged.
Input.prototype[start] = Input.start;
// Default end behavior: emit "end" on the input and then signal `stop` on it.
Input.end = input => {
  emit(input, "end", input);
  stop(input);
};
Input.prototype[end] = Input.end;
// The event channel system caches the last event seen as input.value.
// Unfortunately, if the last event is a DOM object this is a great way
// leak windows. Mitigate this by storing input.value using a weak
// reference. This allows the system to work for normal event processing
// while also allowing the objects to be reclaimed. It means, however,
// input.value cannot be accessed long after the event was dispatched.
// Accessor descriptor for a `value` property. Objects are stored behind a weak
// reference in `_weakValue` when one can be created; everything else (and
// objects for which creating the weak reference throws) is stored directly in
// `_simpleValue`. Exactly one of the two slots is populated after each write.
const WeakValueGetterSetter = {
  get() {
    if (this._weakValue) {
      return this._weakValue.get();
    }
    return this._simpleValue;
  },
  set(v) {
    const isObject = v && typeof v === "object";
    if (isObject) {
      try {
        // Try to set a weak reference. This can throw for some values.
        // For example, if the value is a native object that does not
        // implement nsISupportsWeakReference.
        this._weakValue = Cu.getWeakReference(v);
        this._simpleValue = undefined;
        return;
      } catch (e) {
        // Do nothing. Fall through to setting _simpleValue below.
      }
    }
    this._simpleValue = v;
    this._weakValue = undefined;
  },
};
Object.defineProperty(Input.prototype, "value", WeakValueGetterSetter);
exports.Input = Input;
// Output is the channel type created by the transformation functions in this
// module (`transform`, `merge`, `lift`, `merges`) for the new channels they
// return. Its prototype gets the same weak `value` accessor that is installed
// on `Input.prototype`.
function Output() { }
Object.defineProperty(Output.prototype, "value", WeakValueGetterSetter);
exports.Output = Output;
// Well-known string keys. Only `$outputs` leaves this module (exported as
// `outputs`); `$source` is not referenced anywhere else in the visible part
// of this file.
// NOTE(review): presumably these name properties on channel objects — confirm
// against the callers.
const $source = "@@source";
const $outputs = "@@outputs";
exports.outputs = $outputs;
// A Reactor observes an input channel and reports its initial value, every
// step from a past value to a present one, and its end, through the optional
// `onStart`, `onStep` and `onEnd` callbacks given in `options`.
//
// NOTE: Passing DOM objects through a Reactor can cause them to leak
// when they get cached in this.value. We cannot use a weak reference
// in this case because the Reactor design expects to always have both the
// past and present value. If we allow past values to be collected the
// system breaks.
function Reactor(options={}) {
  const { onStep: stepHook, onStart: startHook, onEnd: endHook } = options;
  // Only truthy callbacks override the no-op defaults on the prototype.
  if (stepHook) {
    this.onStep = stepHook;
  }
  if (startHook) {
    this.onStart = startHook;
  }
  if (endHook) {
    this.onEnd = endHook;
  }
}
// Default callbacks: do nothing.
Reactor.prototype.onStep = _ => undefined;
Reactor.prototype.onStart = _ => undefined;
Reactor.prototype.onEnd = _ => undefined;
// Records `present` as the reactor's value, then reports the step.
Reactor.prototype.onNext = function(present, past) {
  this.value = present;
  this.onStep(present, past);
};
// Subscribes to `input`, starts it, and reports its current value through
// `onStart`.
Reactor.prototype.run = function(input) {
  // `input.value` is read when the event fires. When the data is dispatched
  // through `receive`, the value is updated only after the dispatch, so at
  // this point it still holds the previous ("past") value.
  const handleData = message => this.onNext(message, input.value);
  const handleEnd = () => this.onEnd(input.value);
  on(input, "data", handleData);
  on(input, "end", handleEnd);
  start(input);
  this.value = input.value;
  this.onStart(input.value);
};
exports.Reactor = Reactor;
/**
 * Takes an object used as options, with potential keys like 'onMessage',
 * of the kind `require('sdk/event/core').setListeners` used to be called on,
 * and returns a copy without the keys that would trigger a listener to be
 * set, i.e. without every key matching `EVENT_TYPE_PATTERN`.
 *
 * A missing (falsy) `object` yields an empty object.
 *
 * @param {Object} object
 * @return {Object}
 */
function stripListeners (object) {
  const stripped = {};
  for (const key of Object.keys(object || {})) {
    if (EVENT_TYPE_PATTERN.test(key)) {
      continue;
    }
    stripped[key] = object[key];
  }
  return stripped;
}
exports.stripListeners = stripListeners;
// Returns a promise that is resolved by a one-shot listener (registered with
// `once`) for events of the given `type` on `target`. This code never rejects
// the promise itself.
const when = (target, type) => {
  const subscribe = resolve => {
    once(target, type, resolve);
  };
  return new Promise(subscribe);
};
exports.when = when;
|