diff options
Diffstat (limited to 'addon-sdk/source/lib/sdk/io/stream.js')
-rw-r--r-- | addon-sdk/source/lib/sdk/io/stream.js | 440 |
1 files changed, 440 insertions, 0 deletions
diff --git a/addon-sdk/source/lib/sdk/io/stream.js b/addon-sdk/source/lib/sdk/io/stream.js new file mode 100644 index 000000000..0698b8e32 --- /dev/null +++ b/addon-sdk/source/lib/sdk/io/stream.js @@ -0,0 +1,440 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +module.metadata = { + "stability": "experimental" +}; + +const { CC, Cc, Ci, Cu, Cr, components } = require("chrome"); +const { EventTarget } = require("../event/target"); +const { emit } = require("../event/core"); +const { Buffer } = require("./buffer"); +const { Class } = require("../core/heritage"); +const { setTimeout } = require("../timers"); + + +const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1", + "nsIMultiplexInputStream"); +const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1", + "nsIAsyncStreamCopier", "init"); +const StringInputStream = CC("@mozilla.org/io/string-input-stream;1", + "nsIStringInputStream"); +const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1", + "nsIArrayBufferInputStream"); + +const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", + "nsIBinaryInputStream", "setInputStream"); +const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1", + "nsIInputStreamPump", "init"); + +const threadManager = Cc["@mozilla.org/thread-manager;1"]. + getService(Ci.nsIThreadManager); + +const eventTarget = Cc["@mozilla.org/network/stream-transport-service;1"]. + getService(Ci.nsIEventTarget); + +var isFunction = value => typeof(value) === "function" + +function accessor() { + let map = new WeakMap(); + return function(target, value) { + if (value) + map.set(target, value); + return map.get(target); + } +} + +const Stream = Class({ + extends: EventTarget, + initialize: function() { + this.readable = false; + this.writable = false; + this.encoding = null; + }, + setEncoding: function setEncoding(encoding) { + this.encoding = String(encoding).toUpperCase(); + }, + pipe: function pipe(target, options) { + let source = this; + function onData(chunk) { + if (target.writable) { + if (false === target.write(chunk)) + source.pause(); + } + } + function onDrain() { + if (source.readable) + source.resume(); + } + function onEnd() { + target.end(); + } + function onPause() { + source.pause(); + } + function onResume() { + if (source.readable) + source.resume(); + } + + function cleanup() { + source.removeListener("data", onData); + target.removeListener("drain", onDrain); + source.removeListener("end", onEnd); + + target.removeListener("pause", onPause); + target.removeListener("resume", onResume); + + source.removeListener("end", cleanup); + source.removeListener("close", cleanup); + + target.removeListener("end", cleanup); + target.removeListener("close", cleanup); + } + + if (!options || options.end !== false) + target.on("end", onEnd); + + source.on("data", onData); + target.on("drain", onDrain); + target.on("resume", onResume); + target.on("pause", onPause); + + source.on("end", cleanup); + source.on("close", cleanup); + + target.on("end", cleanup); + target.on("close", cleanup); + + emit(target, "pipe", source); + }, + pause: function pause() { + emit(this, "pause"); + }, + resume: function resume() { + emit(this, "resume"); + }, + destroySoon: function destroySoon() { + this.destroy(); + } +}); +exports.Stream = Stream; + + +var nsIStreamListener = accessor(); +var nsIInputStreamPump = accessor(); +var nsIAsyncInputStream = accessor(); +var nsIBinaryInputStream = accessor(); + +const StreamListener = Class({ + initialize: function(stream) { + this.stream = stream; + }, + + // Next three methods are part of `nsIStreamListener` interface and are + // invoked by `nsIInputStreamPump.asyncRead`. + onDataAvailable: function(request, context, input, offset, count) { + let stream = this.stream; + let buffer = new ArrayBuffer(count); + nsIBinaryInputStream(stream).readArrayBuffer(count, buffer); + emit(stream, "data", new Buffer(buffer)); + }, + + // Next two methods implement `nsIRequestObserver` interface and are invoked + // by `nsIInputStreamPump.asyncRead`. + onStartRequest: function() {}, + // Called to signify the end of an asynchronous request. We only care to + // discover errors. + onStopRequest: function(request, context, status) { + let stream = this.stream; + stream.readable = false; + if (!components.isSuccessCode(status)) + emit(stream, "error", status); + else + emit(stream, "end"); + } +}); + + +const InputStream = Class({ + extends: Stream, + readable: false, + paused: false, + initialize: function initialize(options) { + let { asyncInputStream } = options; + + this.readable = true; + + let binaryInputStream = new BinaryInputStream(asyncInputStream); + let inputStreamPump = new InputStreamPump(asyncInputStream, + -1, -1, 0, 0, false); + let streamListener = new StreamListener(this); + + nsIAsyncInputStream(this, asyncInputStream); + nsIInputStreamPump(this, inputStreamPump); + nsIBinaryInputStream(this, binaryInputStream); + nsIStreamListener(this, streamListener); + + this.asyncInputStream = asyncInputStream; + this.inputStreamPump = inputStreamPump; + this.binaryInputStream = binaryInputStream; + }, + get status() { + return nsIInputStreamPump(this).status; + }, + read: function() { + nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null); + }, + pause: function pause() { + this.paused = true; + nsIInputStreamPump(this).suspend(); + emit(this, "paused"); + }, + resume: function resume() { + this.paused = false; + if (nsIInputStreamPump(this).isPending()) { + nsIInputStreamPump(this).resume(); + emit(this, "resume"); + } + }, + close: function close() { + this.readable = false; + nsIInputStreamPump(this).cancel(Cr.NS_OK); + nsIBinaryInputStream(this).close(); + nsIAsyncInputStream(this).close(); + }, + destroy: function destroy() { + this.close(); + + nsIInputStreamPump(this); + nsIAsyncInputStream(this); + nsIBinaryInputStream(this); + nsIStreamListener(this); + } +}); +exports.InputStream = InputStream; + + + +var nsIRequestObserver = accessor(); +var nsIAsyncOutputStream = accessor(); +var nsIAsyncStreamCopier = accessor(); +var nsIMultiplexInputStream = accessor(); + +const RequestObserver = Class({ + initialize: function(stream) { + this.stream = stream; + }, + // Method is part of `nsIRequestObserver` interface that is + // invoked by `nsIAsyncStreamCopier.asyncCopy`. + onStartRequest: function() {}, + // Method is part of `nsIRequestObserver` interface that is + // invoked by `nsIAsyncStreamCopier.asyncCopy`. + onStopRequest: function(request, context, status) { + let stream = this.stream; + stream.drained = true; + + // Remove copied chunk. + let multiplexInputStream = nsIMultiplexInputStream(stream); + multiplexInputStream.removeStream(0); + + // If there was an error report. + if (!components.isSuccessCode(status)) + emit(stream, "error", status); + + // If there more chunks in queue then flush them. + else if (multiplexInputStream.count) + stream.flush(); + + // If stream is still writable notify that queue has drained. + else if (stream.writable) + emit(stream, "drain"); + + // If stream is no longer writable close it. + else { + nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK); + nsIMultiplexInputStream(stream).close(); + nsIAsyncOutputStream(stream).close(); + nsIAsyncOutputStream(stream).flush(); + } + } +}); + +const OutputStreamCallback = Class({ + initialize: function(stream) { + this.stream = stream; + }, + // Method is part of `nsIOutputStreamCallback` interface that + // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered + // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior, + // causing the `onOutputStreamReady` notification to be suppressed until + // the stream becomes closed. + onOutputStreamReady: function(nsIAsyncOutputStream) { + emit(this.stream, "finish"); + } +}); + +const OutputStream = Class({ + extends: Stream, + writable: false, + drained: true, + get bufferSize() { + let multiplexInputStream = nsIMultiplexInputStream(this); + return multiplexInputStream && multiplexInputStream.available(); + }, + initialize: function initialize(options) { + let { asyncOutputStream, output } = options; + this.writable = true; + + // Ensure that `nsIAsyncOutputStream` was provided. + asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream); + + // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former + // is used to queue written data chunks that `asyncStreamCopier` will + // asynchronously drain into `asyncOutputStream`. + let multiplexInputStream = MultiplexInputStream(); + let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream, + output || asyncOutputStream, + eventTarget, + // nsIMultiplexInputStream + // implemnts .readSegments() + true, + // nsIOutputStream may or + // may not implemnet + // .writeSegments(). + false, + // Use default buffer size. + null, + // Should not close an input. + false, + // Should not close an output. + false); + + // Create `requestObserver` implementing `nsIRequestObserver` interface + // in the constructor that's gonna be reused across several flushes. + let requestObserver = RequestObserver(this); + + + // Create observer that implements `nsIOutputStreamCallback` and register + // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once + // `nsIAsyncOutputStream` is closed. + asyncOutputStream.asyncWait(OutputStreamCallback(this), + asyncOutputStream.WAIT_CLOSURE_ONLY, + 0, + threadManager.currentThread); + + nsIRequestObserver(this, requestObserver); + nsIAsyncOutputStream(this, asyncOutputStream); + nsIMultiplexInputStream(this, multiplexInputStream); + nsIAsyncStreamCopier(this, asyncStreamCopier); + + this.asyncOutputStream = asyncOutputStream; + this.multiplexInputStream = multiplexInputStream; + this.asyncStreamCopier = asyncStreamCopier; + }, + write: function write(content, encoding, callback) { + if (isFunction(encoding)) { + callback = encoding; + encoding = callback; + } + + // If stream is not writable we throw an error. + if (!this.writable) throw Error("stream is not writable"); + + let chunk = null; + + // If content is not a buffer then we create one out of it. + if (Buffer.isBuffer(content)) { + chunk = new ArrayBufferInputStream(); + chunk.setData(content.buffer, 0, content.length); + } + else { + chunk = new StringInputStream(); + chunk.setData(content, content.length); + } + + if (callback) + this.once("drain", callback); + + // Queue up chunk to be copied to output sync. + nsIMultiplexInputStream(this).appendStream(chunk); + this.flush(); + + return this.drained; + }, + flush: function() { + if (this.drained) { + this.drained = false; + nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null); + } + }, + end: function end(content, encoding, callback) { + if (isFunction(content)) { + callback = content + content = callback + } + if (isFunction(encoding)) { + callback = encoding + encoding = callback + } + + // Setting a listener to "finish" event if passed. + if (isFunction(callback)) + this.once("finish", callback); + + + if (content) + this.write(content, encoding); + this.writable = false; + + // Close `asyncOutputStream` only if output has drained. If it's + // not drained than `asyncStreamCopier` is busy writing, so let + // it finish. Note that since `this.writable` is false copier will + // close `asyncOutputStream` once output drains. + if (this.drained) + nsIAsyncOutputStream(this).close(); + }, + destroy: function destroy() { + nsIAsyncOutputStream(this).close(); + nsIAsyncOutputStream(this); + nsIMultiplexInputStream(this); + nsIAsyncStreamCopier(this); + nsIRequestObserver(this); + } +}); +exports.OutputStream = OutputStream; + +const DuplexStream = Class({ + extends: Stream, + implements: [InputStream, OutputStream], + allowHalfOpen: true, + initialize: function initialize(options) { + options = options || {}; + let { readable, writable, allowHalfOpen } = options; + + InputStream.prototype.initialize.call(this, options); + OutputStream.prototype.initialize.call(this, options); + + if (readable === false) + this.readable = false; + + if (writable === false) + this.writable = false; + + if (allowHalfOpen === false) + this.allowHalfOpen = false; + + // If in a half open state and it's disabled enforce end. + this.once("end", () => { + if (!this.allowHalfOpen && (!this.readable || !this.writable)) + this.end(); + }); + }, + destroy: function destroy(error) { + InputStream.prototype.destroy.call(this); + OutputStream.prototype.destroy.call(this); + } +}); +exports.DuplexStream = DuplexStream; |