summaryrefslogtreecommitdiffstats
path: root/addon-sdk/source/lib/sdk/io/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'addon-sdk/source/lib/sdk/io/stream.js')
-rw-r--r--addon-sdk/source/lib/sdk/io/stream.js440
1 files changed, 440 insertions, 0 deletions
diff --git a/addon-sdk/source/lib/sdk/io/stream.js b/addon-sdk/source/lib/sdk/io/stream.js
new file mode 100644
index 000000000..0698b8e32
--- /dev/null
+++ b/addon-sdk/source/lib/sdk/io/stream.js
@@ -0,0 +1,440 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+"use strict";
+
+module.metadata = {
+ "stability": "experimental"
+};
+
+const { CC, Cc, Ci, Cu, Cr, components } = require("chrome");
+const { EventTarget } = require("../event/target");
+const { emit } = require("../event/core");
+const { Buffer } = require("./buffer");
+const { Class } = require("../core/heritage");
+const { setTimeout } = require("../timers");
+
+
+const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1",
+ "nsIMultiplexInputStream");
+const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
+ "nsIAsyncStreamCopier", "init");
+const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
+ "nsIStringInputStream");
+const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
+ "nsIArrayBufferInputStream");
+
+const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
+ "nsIBinaryInputStream", "setInputStream");
+const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1",
+ "nsIInputStreamPump", "init");
+
+const threadManager = Cc["@mozilla.org/thread-manager;1"].
+ getService(Ci.nsIThreadManager);
+
+const eventTarget = Cc["@mozilla.org/network/stream-transport-service;1"].
+ getService(Ci.nsIEventTarget);
+
+var isFunction = value => typeof(value) === "function"
+
+function accessor() {
+ let map = new WeakMap();
+ return function(target, value) {
+ if (value)
+ map.set(target, value);
+ return map.get(target);
+ }
+}
+
+const Stream = Class({
+ extends: EventTarget,
+ initialize: function() {
+ this.readable = false;
+ this.writable = false;
+ this.encoding = null;
+ },
+ setEncoding: function setEncoding(encoding) {
+ this.encoding = String(encoding).toUpperCase();
+ },
+ pipe: function pipe(target, options) {
+ let source = this;
+ function onData(chunk) {
+ if (target.writable) {
+ if (false === target.write(chunk))
+ source.pause();
+ }
+ }
+ function onDrain() {
+ if (source.readable)
+ source.resume();
+ }
+ function onEnd() {
+ target.end();
+ }
+ function onPause() {
+ source.pause();
+ }
+ function onResume() {
+ if (source.readable)
+ source.resume();
+ }
+
+ function cleanup() {
+ source.removeListener("data", onData);
+ target.removeListener("drain", onDrain);
+ source.removeListener("end", onEnd);
+
+ target.removeListener("pause", onPause);
+ target.removeListener("resume", onResume);
+
+ source.removeListener("end", cleanup);
+ source.removeListener("close", cleanup);
+
+ target.removeListener("end", cleanup);
+ target.removeListener("close", cleanup);
+ }
+
+ if (!options || options.end !== false)
+ target.on("end", onEnd);
+
+ source.on("data", onData);
+ target.on("drain", onDrain);
+ target.on("resume", onResume);
+ target.on("pause", onPause);
+
+ source.on("end", cleanup);
+ source.on("close", cleanup);
+
+ target.on("end", cleanup);
+ target.on("close", cleanup);
+
+ emit(target, "pipe", source);
+ },
+ pause: function pause() {
+ emit(this, "pause");
+ },
+ resume: function resume() {
+ emit(this, "resume");
+ },
+ destroySoon: function destroySoon() {
+ this.destroy();
+ }
+});
+exports.Stream = Stream;
+
+
+var nsIStreamListener = accessor();
+var nsIInputStreamPump = accessor();
+var nsIAsyncInputStream = accessor();
+var nsIBinaryInputStream = accessor();
+
+const StreamListener = Class({
+ initialize: function(stream) {
+ this.stream = stream;
+ },
+
+ // Next three methods are part of `nsIStreamListener` interface and are
+ // invoked by `nsIInputStreamPump.asyncRead`.
+ onDataAvailable: function(request, context, input, offset, count) {
+ let stream = this.stream;
+ let buffer = new ArrayBuffer(count);
+ nsIBinaryInputStream(stream).readArrayBuffer(count, buffer);
+ emit(stream, "data", new Buffer(buffer));
+ },
+
+ // Next two methods implement `nsIRequestObserver` interface and are invoked
+ // by `nsIInputStreamPump.asyncRead`.
+ onStartRequest: function() {},
+ // Called to signify the end of an asynchronous request. We only care to
+ // discover errors.
+ onStopRequest: function(request, context, status) {
+ let stream = this.stream;
+ stream.readable = false;
+ if (!components.isSuccessCode(status))
+ emit(stream, "error", status);
+ else
+ emit(stream, "end");
+ }
+});
+
+
+const InputStream = Class({
+ extends: Stream,
+ readable: false,
+ paused: false,
+ initialize: function initialize(options) {
+ let { asyncInputStream } = options;
+
+ this.readable = true;
+
+ let binaryInputStream = new BinaryInputStream(asyncInputStream);
+ let inputStreamPump = new InputStreamPump(asyncInputStream,
+ -1, -1, 0, 0, false);
+ let streamListener = new StreamListener(this);
+
+ nsIAsyncInputStream(this, asyncInputStream);
+ nsIInputStreamPump(this, inputStreamPump);
+ nsIBinaryInputStream(this, binaryInputStream);
+ nsIStreamListener(this, streamListener);
+
+ this.asyncInputStream = asyncInputStream;
+ this.inputStreamPump = inputStreamPump;
+ this.binaryInputStream = binaryInputStream;
+ },
+ get status() {
+ return nsIInputStreamPump(this).status;
+ },
+ read: function() {
+ nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null);
+ },
+ pause: function pause() {
+ this.paused = true;
+ nsIInputStreamPump(this).suspend();
+ emit(this, "paused");
+ },
+ resume: function resume() {
+ this.paused = false;
+ if (nsIInputStreamPump(this).isPending()) {
+ nsIInputStreamPump(this).resume();
+ emit(this, "resume");
+ }
+ },
+ close: function close() {
+ this.readable = false;
+ nsIInputStreamPump(this).cancel(Cr.NS_OK);
+ nsIBinaryInputStream(this).close();
+ nsIAsyncInputStream(this).close();
+ },
+ destroy: function destroy() {
+ this.close();
+
+ nsIInputStreamPump(this);
+ nsIAsyncInputStream(this);
+ nsIBinaryInputStream(this);
+ nsIStreamListener(this);
+ }
+});
+exports.InputStream = InputStream;
+
+
+
+var nsIRequestObserver = accessor();
+var nsIAsyncOutputStream = accessor();
+var nsIAsyncStreamCopier = accessor();
+var nsIMultiplexInputStream = accessor();
+
+const RequestObserver = Class({
+ initialize: function(stream) {
+ this.stream = stream;
+ },
+ // Method is part of `nsIRequestObserver` interface that is
+ // invoked by `nsIAsyncStreamCopier.asyncCopy`.
+ onStartRequest: function() {},
+ // Method is part of `nsIRequestObserver` interface that is
+ // invoked by `nsIAsyncStreamCopier.asyncCopy`.
+ onStopRequest: function(request, context, status) {
+ let stream = this.stream;
+ stream.drained = true;
+
+ // Remove copied chunk.
+ let multiplexInputStream = nsIMultiplexInputStream(stream);
+ multiplexInputStream.removeStream(0);
+
+ // If there was an error report.
+ if (!components.isSuccessCode(status))
+ emit(stream, "error", status);
+
+ // If there more chunks in queue then flush them.
+ else if (multiplexInputStream.count)
+ stream.flush();
+
+ // If stream is still writable notify that queue has drained.
+ else if (stream.writable)
+ emit(stream, "drain");
+
+ // If stream is no longer writable close it.
+ else {
+ nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK);
+ nsIMultiplexInputStream(stream).close();
+ nsIAsyncOutputStream(stream).close();
+ nsIAsyncOutputStream(stream).flush();
+ }
+ }
+});
+
+const OutputStreamCallback = Class({
+ initialize: function(stream) {
+ this.stream = stream;
+ },
+ // Method is part of `nsIOutputStreamCallback` interface that
+ // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered
+ // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior,
+ // causing the `onOutputStreamReady` notification to be suppressed until
+ // the stream becomes closed.
+ onOutputStreamReady: function(nsIAsyncOutputStream) {
+ emit(this.stream, "finish");
+ }
+});
+
+const OutputStream = Class({
+ extends: Stream,
+ writable: false,
+ drained: true,
+ get bufferSize() {
+ let multiplexInputStream = nsIMultiplexInputStream(this);
+ return multiplexInputStream && multiplexInputStream.available();
+ },
+ initialize: function initialize(options) {
+ let { asyncOutputStream, output } = options;
+ this.writable = true;
+
+ // Ensure that `nsIAsyncOutputStream` was provided.
+ asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream);
+
+ // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former
+ // is used to queue written data chunks that `asyncStreamCopier` will
+ // asynchronously drain into `asyncOutputStream`.
+ let multiplexInputStream = MultiplexInputStream();
+ let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream,
+ output || asyncOutputStream,
+ eventTarget,
+ // nsIMultiplexInputStream
+ // implemnts .readSegments()
+ true,
+ // nsIOutputStream may or
+ // may not implemnet
+ // .writeSegments().
+ false,
+ // Use default buffer size.
+ null,
+ // Should not close an input.
+ false,
+ // Should not close an output.
+ false);
+
+ // Create `requestObserver` implementing `nsIRequestObserver` interface
+ // in the constructor that's gonna be reused across several flushes.
+ let requestObserver = RequestObserver(this);
+
+
+ // Create observer that implements `nsIOutputStreamCallback` and register
+ // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once
+ // `nsIAsyncOutputStream` is closed.
+ asyncOutputStream.asyncWait(OutputStreamCallback(this),
+ asyncOutputStream.WAIT_CLOSURE_ONLY,
+ 0,
+ threadManager.currentThread);
+
+ nsIRequestObserver(this, requestObserver);
+ nsIAsyncOutputStream(this, asyncOutputStream);
+ nsIMultiplexInputStream(this, multiplexInputStream);
+ nsIAsyncStreamCopier(this, asyncStreamCopier);
+
+ this.asyncOutputStream = asyncOutputStream;
+ this.multiplexInputStream = multiplexInputStream;
+ this.asyncStreamCopier = asyncStreamCopier;
+ },
+ write: function write(content, encoding, callback) {
+ if (isFunction(encoding)) {
+ callback = encoding;
+ encoding = callback;
+ }
+
+ // If stream is not writable we throw an error.
+ if (!this.writable) throw Error("stream is not writable");
+
+ let chunk = null;
+
+ // If content is not a buffer then we create one out of it.
+ if (Buffer.isBuffer(content)) {
+ chunk = new ArrayBufferInputStream();
+ chunk.setData(content.buffer, 0, content.length);
+ }
+ else {
+ chunk = new StringInputStream();
+ chunk.setData(content, content.length);
+ }
+
+ if (callback)
+ this.once("drain", callback);
+
+ // Queue up chunk to be copied to output sync.
+ nsIMultiplexInputStream(this).appendStream(chunk);
+ this.flush();
+
+ return this.drained;
+ },
+ flush: function() {
+ if (this.drained) {
+ this.drained = false;
+ nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null);
+ }
+ },
+ end: function end(content, encoding, callback) {
+ if (isFunction(content)) {
+ callback = content
+ content = callback
+ }
+ if (isFunction(encoding)) {
+ callback = encoding
+ encoding = callback
+ }
+
+ // Setting a listener to "finish" event if passed.
+ if (isFunction(callback))
+ this.once("finish", callback);
+
+
+ if (content)
+ this.write(content, encoding);
+ this.writable = false;
+
+ // Close `asyncOutputStream` only if output has drained. If it's
+ // not drained than `asyncStreamCopier` is busy writing, so let
+ // it finish. Note that since `this.writable` is false copier will
+ // close `asyncOutputStream` once output drains.
+ if (this.drained)
+ nsIAsyncOutputStream(this).close();
+ },
+ destroy: function destroy() {
+ nsIAsyncOutputStream(this).close();
+ nsIAsyncOutputStream(this);
+ nsIMultiplexInputStream(this);
+ nsIAsyncStreamCopier(this);
+ nsIRequestObserver(this);
+ }
+});
+exports.OutputStream = OutputStream;
+
+const DuplexStream = Class({
+ extends: Stream,
+ implements: [InputStream, OutputStream],
+ allowHalfOpen: true,
+ initialize: function initialize(options) {
+ options = options || {};
+ let { readable, writable, allowHalfOpen } = options;
+
+ InputStream.prototype.initialize.call(this, options);
+ OutputStream.prototype.initialize.call(this, options);
+
+ if (readable === false)
+ this.readable = false;
+
+ if (writable === false)
+ this.writable = false;
+
+ if (allowHalfOpen === false)
+ this.allowHalfOpen = false;
+
+ // If in a half open state and it's disabled enforce end.
+ this.once("end", () => {
+ if (!this.allowHalfOpen && (!this.readable || !this.writable))
+ this.end();
+ });
+ },
+ destroy: function destroy(error) {
+ InputStream.prototype.destroy.call(this);
+ OutputStream.prototype.destroy.call(this);
+ }
+});
+exports.DuplexStream = DuplexStream;