diff options
Diffstat (limited to 'toolkit/jetpack/sdk/io')
-rw-r--r-- | toolkit/jetpack/sdk/io/buffer.js | 351 | ||||
-rw-r--r-- | toolkit/jetpack/sdk/io/byte-streams.js | 104 | ||||
-rw-r--r-- | toolkit/jetpack/sdk/io/file.js | 196 | ||||
-rw-r--r-- | toolkit/jetpack/sdk/io/fs.js | 984 | ||||
-rw-r--r-- | toolkit/jetpack/sdk/io/stream.js | 440 | ||||
-rw-r--r-- | toolkit/jetpack/sdk/io/text-streams.js | 235 |
6 files changed, 2310 insertions, 0 deletions
diff --git a/toolkit/jetpack/sdk/io/buffer.js b/toolkit/jetpack/sdk/io/buffer.js new file mode 100644 index 000000000..5ea169402 --- /dev/null +++ b/toolkit/jetpack/sdk/io/buffer.js @@ -0,0 +1,351 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +'use strict'; + +module.metadata = { + 'stability': 'experimental' +}; + +/* + * Encodings supported by TextEncoder/Decoder: + * utf-8, utf-16le, utf-16be + * http://encoding.spec.whatwg.org/#interface-textencoder + * + * Node however supports the following encodings: + * ascii, utf-8, utf-16le, usc2, base64, hex + */ + +const { Cu } = require('chrome'); +const { isNumber } = require('sdk/lang/type'); +const { TextEncoder, TextDecoder } = Cu.import('resource://gre/modules/commonjs/toolkit/loader.js', {}); + +exports.TextEncoder = TextEncoder; +exports.TextDecoder = TextDecoder; + +/** + * Use WeakMaps to work around Bug 929146, which prevents us from adding + * getters or values to typed arrays + * https://bugzilla.mozilla.org/show_bug.cgi?id=929146 + */ +const parents = new WeakMap(); +const views = new WeakMap(); + +function Buffer(subject, encoding /*, bufferLength */) { + + // Allow invocation without `new` constructor + if (!(this instanceof Buffer)) + return new Buffer(subject, encoding, arguments[2]); + + var type = typeof(subject); + + switch (type) { + case 'number': + // Create typed array of the given size if number. + try { + let buffer = new Uint8Array(subject > 0 ? Math.floor(subject) : 0); + return buffer; + } catch (e) { + if (/size and count too large/.test(e.message) || + /invalid arguments/.test(e.message)) + throw new RangeError('Could not instantiate buffer: size of buffer may be too large'); + else + throw new Error('Could not instantiate buffer'); + } + break; + case 'string': + // If string encode it and use buffer for the returned Uint8Array + // to create a local patched version that acts like node buffer. + encoding = encoding || 'utf8'; + return new Uint8Array(new TextEncoder(encoding).encode(subject).buffer); + case 'object': + // This form of the constructor uses the form of + // new Uint8Array(buffer, offset, length); + // So we can instantiate a typed array within the constructor + // to inherit the appropriate properties, where both the + // `subject` and newly instantiated buffer share the same underlying + // data structure. + if (arguments.length === 3) + return new Uint8Array(subject, encoding, arguments[2]); + // If array or alike just make a copy with a local patched prototype. + else + return new Uint8Array(subject); + default: + throw new TypeError('must start with number, buffer, array or string'); + } +} +exports.Buffer = Buffer; + +// Tests if `value` is a Buffer. +Buffer.isBuffer = value => value instanceof Buffer + +// Returns true if the encoding is a valid encoding argument & false otherwise +Buffer.isEncoding = function (encoding) { + if (!encoding) return false; + try { + new TextDecoder(encoding); + } catch(e) { + return false; + } + return true; +} + +// Gives the actual byte length of a string. encoding defaults to 'utf8'. +// This is not the same as String.prototype.length since that returns the +// number of characters in a string. +Buffer.byteLength = (value, encoding = 'utf8') => + new TextEncoder(encoding).encode(value).byteLength + +// Direct copy of the nodejs's buffer implementation: +// https://github.com/joyent/node/blob/b255f4c10a80343f9ce1cee56d0288361429e214/lib/buffer.js#L146-L177 +Buffer.concat = function(list, length) { + if (!Array.isArray(list)) + throw new TypeError('Usage: Buffer.concat(list[, length])'); + + if (typeof length === 'undefined') { + length = 0; + for (var i = 0; i < list.length; i++) + length += list[i].length; + } else { + length = ~~length; + } + + if (length < 0) + length = 0; + + if (list.length === 0) + return new Buffer(0); + else if (list.length === 1) + return list[0]; + + if (length < 0) + throw new RangeError('length is not a positive number'); + + var buffer = new Buffer(length); + var pos = 0; + for (var i = 0; i < list.length; i++) { + var buf = list[i]; + buf.copy(buffer, pos); + pos += buf.length; + } + + return buffer; +}; + +// Node buffer is very much like Uint8Array although it has bunch of methods +// that typically can be used in combination with `DataView` while preserving +// access by index. Since in SDK each module has it's own set of bult-ins it +// ok to patch ours to make it nodejs Buffer compatible. +const Uint8ArraySet = Uint8Array.prototype.set +Buffer.prototype = Uint8Array.prototype; +Object.defineProperties(Buffer.prototype, { + parent: { + get: function() { return parents.get(this, undefined); } + }, + view: { + get: function () { + let view = views.get(this, undefined); + if (view) return view; + view = new DataView(this.buffer); + views.set(this, view); + return view; + } + }, + toString: { + value: function(encoding, start, end) { + encoding = !!encoding ? (encoding + '').toLowerCase() : 'utf8'; + start = Math.max(0, ~~start); + end = Math.min(this.length, end === void(0) ? this.length : ~~end); + return new TextDecoder(encoding).decode(this.subarray(start, end)); + } + }, + toJSON: { + value: function() { + return { type: 'Buffer', data: Array.slice(this, 0) }; + } + }, + get: { + value: function(offset) { + return this[offset]; + } + }, + set: { + value: function(offset, value) { this[offset] = value; } + }, + copy: { + value: function(target, offset, start, end) { + let length = this.length; + let targetLength = target.length; + offset = isNumber(offset) ? offset : 0; + start = isNumber(start) ? start : 0; + + if (start < 0) + throw new RangeError('sourceStart is outside of valid range'); + if (end < 0) + throw new RangeError('sourceEnd is outside of valid range'); + + // If sourceStart > sourceEnd, or targetStart > targetLength, + // zero bytes copied + if (start > end || + offset > targetLength + ) + return 0; + + // If `end` is not defined, or if it is defined + // but would overflow `target`, redefine `end` + // so we can copy as much as we can + if (end - start > targetLength - offset || + end == null) { + let remainingTarget = targetLength - offset; + let remainingSource = length - start; + if (remainingSource <= remainingTarget) + end = length; + else + end = start + remainingTarget; + } + + Uint8ArraySet.call(target, this.subarray(start, end), offset); + return end - start; + } + }, + slice: { + value: function(start, end) { + let length = this.length; + start = ~~start; + end = end != null ? end : length; + + if (start < 0) { + start += length; + if (start < 0) start = 0; + } else if (start > length) + start = length; + + if (end < 0) { + end += length; + if (end < 0) end = 0; + } else if (end > length) + end = length; + + if (end < start) + end = start; + + // This instantiation uses the new Uint8Array(buffer, offset, length) version + // of construction to share the same underling data structure + let buffer = new Buffer(this.buffer, start, end - start); + + // If buffer has a value, assign its parent value to the + // buffer it shares its underlying structure with. If a slice of + // a slice, then use the root structure + if (buffer.length > 0) + parents.set(buffer, this.parent || this); + + return buffer; + } + }, + write: { + value: function(string, offset, length, encoding = 'utf8') { + // write(string, encoding); + if (typeof(offset) === 'string' && Number.isNaN(parseInt(offset))) { + [offset, length, encoding] = [0, null, offset]; + } + // write(string, offset, encoding); + else if (typeof(length) === 'string') + [length, encoding] = [null, length]; + + if (offset < 0 || offset > this.length) + throw new RangeError('offset is outside of valid range'); + + offset = ~~offset; + + // Clamp length if it would overflow buffer, or if its + // undefined + if (length == null || length + offset > this.length) + length = this.length - offset; + + let buffer = new TextEncoder(encoding).encode(string); + let result = Math.min(buffer.length, length); + if (buffer.length !== length) + buffer = buffer.subarray(0, length); + + Uint8ArraySet.call(this, buffer, offset); + return result; + } + }, + fill: { + value: function fill(value, start, end) { + let length = this.length; + value = value || 0; + start = start || 0; + end = end || length; + + if (typeof(value) === 'string') + value = value.charCodeAt(0); + if (typeof(value) !== 'number' || isNaN(value)) + throw TypeError('value is not a number'); + if (end < start) + throw new RangeError('end < start'); + + // Fill 0 bytes; we're done + if (end === start) + return 0; + if (length == 0) + return 0; + + if (start < 0 || start >= length) + throw RangeError('start out of bounds'); + + if (end < 0 || end > length) + throw RangeError('end out of bounds'); + + let index = start; + while (index < end) this[index++] = value; + } + } +}); + +// Define nodejs Buffer's getter and setter functions that just proxy +// to internal DataView's equivalent methods. + +// TODO do we need to check architecture to see if it's default big/little endian? +[['readUInt16LE', 'getUint16', true], + ['readUInt16BE', 'getUint16', false], + ['readInt16LE', 'getInt16', true], + ['readInt16BE', 'getInt16', false], + ['readUInt32LE', 'getUint32', true], + ['readUInt32BE', 'getUint32', false], + ['readInt32LE', 'getInt32', true], + ['readInt32BE', 'getInt32', false], + ['readFloatLE', 'getFloat32', true], + ['readFloatBE', 'getFloat32', false], + ['readDoubleLE', 'getFloat64', true], + ['readDoubleBE', 'getFloat64', false], + ['readUInt8', 'getUint8'], + ['readInt8', 'getInt8']].forEach(([alias, name, littleEndian]) => { + Object.defineProperty(Buffer.prototype, alias, { + value: function(offset) { + return this.view[name](offset, littleEndian); + } + }); +}); + +[['writeUInt16LE', 'setUint16', true], + ['writeUInt16BE', 'setUint16', false], + ['writeInt16LE', 'setInt16', true], + ['writeInt16BE', 'setInt16', false], + ['writeUInt32LE', 'setUint32', true], + ['writeUInt32BE', 'setUint32', false], + ['writeInt32LE', 'setInt32', true], + ['writeInt32BE', 'setInt32', false], + ['writeFloatLE', 'setFloat32', true], + ['writeFloatBE', 'setFloat32', false], + ['writeDoubleLE', 'setFloat64', true], + ['writeDoubleBE', 'setFloat64', false], + ['writeUInt8', 'setUint8'], + ['writeInt8', 'setInt8']].forEach(([alias, name, littleEndian]) => { + Object.defineProperty(Buffer.prototype, alias, { + value: function(value, offset) { + return this.view[name](offset, value, littleEndian); + } + }); +}); diff --git a/toolkit/jetpack/sdk/io/byte-streams.js b/toolkit/jetpack/sdk/io/byte-streams.js new file mode 100644 index 000000000..6afab4369 --- /dev/null +++ b/toolkit/jetpack/sdk/io/byte-streams.js @@ -0,0 +1,104 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +module.metadata = { + "stability": "experimental" +}; + +exports.ByteReader = ByteReader; +exports.ByteWriter = ByteWriter; + +const {Cc, Ci} = require("chrome"); + +// This just controls the maximum number of bytes we read in at one time. +const BUFFER_BYTE_LEN = 0x8000; + +function ByteReader(inputStream) { + const self = this; + + let stream = Cc["@mozilla.org/binaryinputstream;1"]. + createInstance(Ci.nsIBinaryInputStream); + stream.setInputStream(inputStream); + + let manager = new StreamManager(this, stream); + + this.read = function ByteReader_read(numBytes) { + manager.ensureOpened(); + if (typeof(numBytes) !== "number") + numBytes = Infinity; + + let data = ""; + let read = 0; + try { + while (true) { + let avail = stream.available(); + let toRead = Math.min(numBytes - read, avail, BUFFER_BYTE_LEN); + if (toRead <= 0) + break; + data += stream.readBytes(toRead); + read += toRead; + } + } + catch (err) { + throw new Error("Error reading from stream: " + err); + } + + return data; + }; +} + +function ByteWriter(outputStream) { + const self = this; + + let stream = Cc["@mozilla.org/binaryoutputstream;1"]. + createInstance(Ci.nsIBinaryOutputStream); + stream.setOutputStream(outputStream); + + let manager = new StreamManager(this, stream); + + this.write = function ByteWriter_write(str) { + manager.ensureOpened(); + try { + stream.writeBytes(str, str.length); + } + catch (err) { + throw new Error("Error writing to stream: " + err); + } + }; +} + + +// This manages the lifetime of stream, a ByteReader or ByteWriter. It defines +// closed and close() on stream and registers an unload listener that closes +// rawStream if it's still opened. It also provides ensureOpened(), which +// throws an exception if the stream is closed. +function StreamManager(stream, rawStream) { + const self = this; + this.rawStream = rawStream; + this.opened = true; + + stream.__defineGetter__("closed", function stream_closed() { + return !self.opened; + }); + + stream.close = function stream_close() { + self.ensureOpened(); + self.unload(); + }; + + require("../system/unload").ensure(this); +} + +StreamManager.prototype = { + ensureOpened: function StreamManager_ensureOpened() { + if (!this.opened) + throw new Error("The stream is closed and cannot be used."); + }, + unload: function StreamManager_unload() { + this.rawStream.close(); + this.opened = false; + } +}; diff --git a/toolkit/jetpack/sdk/io/file.js b/toolkit/jetpack/sdk/io/file.js new file mode 100644 index 000000000..47467df87 --- /dev/null +++ b/toolkit/jetpack/sdk/io/file.js @@ -0,0 +1,196 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +module.metadata = { + "stability": "deprecated" +}; + +const {Cc,Ci,Cr} = require("chrome"); +const byteStreams = require("./byte-streams"); +const textStreams = require("./text-streams"); + +// Flags passed when opening a file. See nsprpub/pr/include/prio.h. +const OPEN_FLAGS = { + RDONLY: parseInt("0x01"), + WRONLY: parseInt("0x02"), + CREATE_FILE: parseInt("0x08"), + APPEND: parseInt("0x10"), + TRUNCATE: parseInt("0x20"), + EXCL: parseInt("0x80") +}; + +var dirsvc = Cc["@mozilla.org/file/directory_service;1"] + .getService(Ci.nsIProperties); + +function MozFile(path) { + var file = Cc['@mozilla.org/file/local;1'] + .createInstance(Ci.nsILocalFile); + file.initWithPath(path); + return file; +} + +function ensureReadable(file) { + if (!file.isReadable()) + throw new Error("path is not readable: " + file.path); +} + +function ensureDir(file) { + ensureExists(file); + if (!file.isDirectory()) + throw new Error("path is not a directory: " + file.path); +} + +function ensureFile(file) { + ensureExists(file); + if (!file.isFile()) + throw new Error("path is not a file: " + file.path); +} + +function ensureExists(file) { + if (!file.exists()) + throw friendlyError(Cr.NS_ERROR_FILE_NOT_FOUND, file.path); +} + +function friendlyError(errOrResult, filename) { + var isResult = typeof(errOrResult) === "number"; + var result = isResult ? errOrResult : errOrResult.result; + switch (result) { + case Cr.NS_ERROR_FILE_NOT_FOUND: + return new Error("path does not exist: " + filename); + } + return isResult ? new Error("XPCOM error code: " + errOrResult) : errOrResult; +} + +exports.exists = function exists(filename) { + return MozFile(filename).exists(); +}; + +exports.isFile = function isFile(filename) { + return MozFile(filename).isFile(); +}; + +exports.read = function read(filename, mode) { + if (typeof(mode) !== "string") + mode = ""; + + // Ensure mode is read-only. + mode = /b/.test(mode) ? "b" : ""; + + var stream = exports.open(filename, mode); + try { + var str = stream.read(); + } + finally { + stream.close(); + } + + return str; +}; + +exports.join = function join(base) { + if (arguments.length < 2) + throw new Error("need at least 2 args"); + base = MozFile(base); + for (var i = 1; i < arguments.length; i++) + base.append(arguments[i]); + return base.path; +}; + +exports.dirname = function dirname(path) { + var parent = MozFile(path).parent; + return parent ? parent.path : ""; +}; + +exports.basename = function basename(path) { + var leafName = MozFile(path).leafName; + + // On Windows, leafName when the path is a volume letter and colon ("c:") is + // the path itself. But such a path has no basename, so we want the empty + // string. + return leafName == path ? "" : leafName; +}; + +exports.list = function list(path) { + var file = MozFile(path); + ensureDir(file); + ensureReadable(file); + + var entries = file.directoryEntries; + var entryNames = []; + while(entries.hasMoreElements()) { + var entry = entries.getNext(); + entry.QueryInterface(Ci.nsIFile); + entryNames.push(entry.leafName); + } + return entryNames; +}; + +exports.open = function open(filename, mode) { + var file = MozFile(filename); + if (typeof(mode) !== "string") + mode = ""; + + // File opened for write only. + if (/w/.test(mode)) { + if (file.exists()) + ensureFile(file); + var stream = Cc['@mozilla.org/network/file-output-stream;1']. + createInstance(Ci.nsIFileOutputStream); + var openFlags = OPEN_FLAGS.WRONLY | + OPEN_FLAGS.CREATE_FILE | + OPEN_FLAGS.TRUNCATE; + var permFlags = 0o644; // u+rw go+r + try { + stream.init(file, openFlags, permFlags, 0); + } + catch (err) { + throw friendlyError(err, filename); + } + return /b/.test(mode) ? + new byteStreams.ByteWriter(stream) : + new textStreams.TextWriter(stream); + } + + // File opened for read only, the default. + ensureFile(file); + stream = Cc['@mozilla.org/network/file-input-stream;1']. + createInstance(Ci.nsIFileInputStream); + try { + stream.init(file, OPEN_FLAGS.RDONLY, 0, 0); + } + catch (err) { + throw friendlyError(err, filename); + } + return /b/.test(mode) ? + new byteStreams.ByteReader(stream) : + new textStreams.TextReader(stream); +}; + +exports.remove = function remove(path) { + var file = MozFile(path); + ensureFile(file); + file.remove(false); +}; + +exports.mkpath = function mkpath(path) { + var file = MozFile(path); + if (!file.exists()) + file.create(Ci.nsIFile.DIRECTORY_TYPE, 0o755); // u+rwx go+rx + else if (!file.isDirectory()) + throw new Error("The path already exists and is not a directory: " + path); +}; + +exports.rmdir = function rmdir(path) { + var file = MozFile(path); + ensureDir(file); + try { + file.remove(false); + } + catch (err) { + // Bug 566950 explains why we're not catching a specific exception here. + throw new Error("The directory is not empty: " + path); + } +}; diff --git a/toolkit/jetpack/sdk/io/fs.js b/toolkit/jetpack/sdk/io/fs.js new file mode 100644 index 000000000..860a884a5 --- /dev/null +++ b/toolkit/jetpack/sdk/io/fs.js @@ -0,0 +1,984 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +module.metadata = { + "stability": "experimental" +}; + +const { Cc, Ci, CC } = require("chrome"); + +const { setTimeout } = require("../timers"); +const { Stream, InputStream, OutputStream } = require("./stream"); +const { emit, on } = require("../event/core"); +const { Buffer } = require("./buffer"); +const { ns } = require("../core/namespace"); +const { Class } = require("../core/heritage"); + + +const nsILocalFile = CC("@mozilla.org/file/local;1", "nsILocalFile", + "initWithPath"); +const FileOutputStream = CC("@mozilla.org/network/file-output-stream;1", + "nsIFileOutputStream", "init"); +const FileInputStream = CC("@mozilla.org/network/file-input-stream;1", + "nsIFileInputStream", "init"); +const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", + "nsIBinaryInputStream", "setInputStream"); +const BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1", + "nsIBinaryOutputStream", "setOutputStream"); +const StreamPump = CC("@mozilla.org/network/input-stream-pump;1", + "nsIInputStreamPump", "init"); + +const { createOutputTransport, createInputTransport } = + Cc["@mozilla.org/network/stream-transport-service;1"]. + getService(Ci.nsIStreamTransportService); + +const { OPEN_UNBUFFERED } = Ci.nsITransport; + + +const { REOPEN_ON_REWIND, DEFER_OPEN } = Ci.nsIFileInputStream; +const { DIRECTORY_TYPE, NORMAL_FILE_TYPE } = Ci.nsIFile; +const { NS_SEEK_SET, NS_SEEK_CUR, NS_SEEK_END } = Ci.nsISeekableStream; + +const FILE_PERMISSION = 0o666; +const PR_UINT32_MAX = 0xfffffff; +// Values taken from: +// http://mxr.mozilla.org/mozilla-central/source/nsprpub/pr/include/prio.h#615 +const PR_RDONLY = 0x01; +const PR_WRONLY = 0x02; +const PR_RDWR = 0x04; +const PR_CREATE_FILE = 0x08; +const PR_APPEND = 0x10; +const PR_TRUNCATE = 0x20; +const PR_SYNC = 0x40; +const PR_EXCL = 0x80; + +const FLAGS = { + "r": PR_RDONLY, + "r+": PR_RDWR, + "w": PR_CREATE_FILE | PR_TRUNCATE | PR_WRONLY, + "w+": PR_CREATE_FILE | PR_TRUNCATE | PR_RDWR, + "a": PR_APPEND | PR_CREATE_FILE | PR_WRONLY, + "a+": PR_APPEND | PR_CREATE_FILE | PR_RDWR +}; + +function accessor() { + let map = new WeakMap(); + return function(fd, value) { + if (value === null) map.delete(fd); + if (value !== undefined) map.set(fd, value); + return map.get(fd); + } +} + +var nsIFile = accessor(); +var nsIFileInputStream = accessor(); +var nsIFileOutputStream = accessor(); +var nsIBinaryInputStream = accessor(); +var nsIBinaryOutputStream = accessor(); + +// Just a contstant object used to signal that all of the file +// needs to be read. +const ALL = new String("Read all of the file"); + +function isWritable(mode) { + return !!(mode & PR_WRONLY || mode & PR_RDWR); +} +function isReadable(mode) { + return !!(mode & PR_RDONLY || mode & PR_RDWR); +} + +function isString(value) { + return typeof(value) === "string"; +} +function isFunction(value) { + return typeof(value) === "function"; +} + +function toArray(enumerator) { + let value = []; + while(enumerator.hasMoreElements()) + value.push(enumerator.getNext()) + return value +} + +function getFileName(file) { + return file.QueryInterface(Ci.nsIFile).leafName; +} + + +function remove(path, recursive) { + let fd = new nsILocalFile(path) + if (fd.exists()) { + fd.remove(recursive || false); + } + else { + throw FSError("remove", "ENOENT", 34, path); + } +} + +/** + * Utility function to convert either an octal number or string + * into an octal number + * 0777 => 0o777 + * "0644" => 0o644 + */ +function Mode(mode, fallback) { + return isString(mode) ? parseInt(mode, 8) : mode || fallback; +} +function Flags(flag) { + return !isString(flag) ? flag : + FLAGS[flag] || Error("Unknown file open flag: " + flag); +} + + +function FSError(op, code, errno, path, file, line) { + let error = Error(code + ", " + op + " " + path, file, line); + error.code = code; + error.path = path; + error.errno = errno; + return error; +} + +const ReadStream = Class({ + extends: InputStream, + initialize: function initialize(path, options) { + this.position = -1; + this.length = -1; + this.flags = "r"; + this.mode = FILE_PERMISSION; + this.bufferSize = 64 * 1024; + + options = options || {}; + + if ("flags" in options && options.flags) + this.flags = options.flags; + if ("bufferSize" in options && options.bufferSize) + this.bufferSize = options.bufferSize; + if ("length" in options && options.length) + this.length = options.length; + if ("position" in options && options.position !== undefined) + this.position = options.position; + + let { flags, mode, position, length } = this; + let fd = isString(path) ? openSync(path, flags, mode) : path; + this.fd = fd; + + let input = nsIFileInputStream(fd); + // Setting a stream position, unless it"s `-1` which means current position. + if (position >= 0) + input.QueryInterface(Ci.nsISeekableStream).seek(NS_SEEK_SET, position); + // We use `nsIStreamTransportService` service to transform blocking + // file input stream into a fully asynchronous stream that can be written + // without blocking the main thread. + let transport = createInputTransport(input, position, length, false); + // Open an input stream on a transport. We don"t pass flags to guarantee + // non-blocking stream semantics. Also we use defaults for segment size & + // count. + InputStream.prototype.initialize.call(this, { + asyncInputStream: transport.openInputStream(null, 0, 0) + }); + + // Close file descriptor on end and destroy the stream. + on(this, "end", _ => { + this.destroy(); + emit(this, "close"); + }); + + this.read(); + }, + destroy: function() { + closeSync(this.fd); + InputStream.prototype.destroy.call(this); + } +}); +exports.ReadStream = ReadStream; +exports.createReadStream = function createReadStream(path, options) { + return new ReadStream(path, options); +}; + +const WriteStream = Class({ + extends: OutputStream, + initialize: function initialize(path, options) { + this.drainable = true; + this.flags = "w"; + this.position = -1; + this.mode = FILE_PERMISSION; + + options = options || {}; + + if ("flags" in options && options.flags) + this.flags = options.flags; + if ("mode" in options && options.mode) + this.mode = options.mode; + if ("position" in options && options.position !== undefined) + this.position = options.position; + + let { position, flags, mode } = this; + // If pass was passed we create a file descriptor out of it. Otherwise + // we just use given file descriptor. + let fd = isString(path) ? openSync(path, flags, mode) : path; + this.fd = fd; + + let output = nsIFileOutputStream(fd); + // Setting a stream position, unless it"s `-1` which means current position. + if (position >= 0) + output.QueryInterface(Ci.nsISeekableStream).seek(NS_SEEK_SET, position); + // We use `nsIStreamTransportService` service to transform blocking + // file output stream into a fully asynchronous stream that can be written + // without blocking the main thread. + let transport = createOutputTransport(output, position, -1, false); + // Open an output stream on a transport. We don"t pass flags to guarantee + // non-blocking stream semantics. Also we use defaults for segment size & + // count. + OutputStream.prototype.initialize.call(this, { + asyncOutputStream: transport.openOutputStream(OPEN_UNBUFFERED, 0, 0), + output: output + }); + + // For write streams "finish" basically means close. + on(this, "finish", _ => { + this.destroy(); + emit(this, "close"); + }); + }, + destroy: function() { + OutputStream.prototype.destroy.call(this); + closeSync(this.fd); + } +}); +exports.WriteStream = WriteStream; +exports.createWriteStream = function createWriteStream(path, options) { + return new WriteStream(path, options); +}; + +const Stats = Class({ + initialize: function initialize(path) { + let file = new nsILocalFile(path); + if (!file.exists()) throw FSError("stat", "ENOENT", 34, path); + nsIFile(this, file); + }, + isDirectory: function() { + return nsIFile(this).isDirectory(); + }, + isFile: function() { + return nsIFile(this).isFile(); + }, + isSymbolicLink: function() { + return nsIFile(this).isSymlink(); + }, + get mode() { + return nsIFile(this).permissions; + }, + get size() { + return nsIFile(this).fileSize; + }, + get mtime() { + return nsIFile(this).lastModifiedTime; + }, + isBlockDevice: function() { + return nsIFile(this).isSpecial(); + }, + isCharacterDevice: function() { + return nsIFile(this).isSpecial(); + }, + isFIFO: function() { + return nsIFile(this).isSpecial(); + }, + isSocket: function() { + return nsIFile(this).isSpecial(); + }, + // non standard + get exists() { + return nsIFile(this).exists(); + }, + get hidden() { + return nsIFile(this).isHidden(); + }, + get writable() { + return nsIFile(this).isWritable(); + }, + get readable() { + return nsIFile(this).isReadable(); + } +}); +exports.Stats = Stats; + +const LStats = Class({ + extends: Stats, + get size() { + return this.isSymbolicLink() ? nsIFile(this).fileSizeOfLink : + nsIFile(this).fileSize; + }, + get mtime() { + return this.isSymbolicLink() ? nsIFile(this).lastModifiedTimeOfLink : + nsIFile(this).lastModifiedTime; + }, + // non standard + get permissions() { + return this.isSymbolicLink() ? nsIFile(this).permissionsOfLink : + nsIFile(this).permissions; + } +}); + +const FStat = Class({ + extends: Stats, + initialize: function initialize(fd) { + nsIFile(this, nsIFile(fd)); + } +}); + +function noop() {} +function Async(wrapped) { + return function (path, callback) { + let args = Array.slice(arguments); + callback = args.pop(); + // If node is not given a callback argument + // it just does not calls it. + if (typeof(callback) !== "function") { + args.push(callback); + callback = noop; + } + setTimeout(function() { + try { + var result = wrapped.apply(this, args); + if (result === undefined) callback(null); + else callback(null, result); + } catch (error) { + callback(error); + } + }, 0); + } +} + + +/** + * Synchronous rename(2) + */ +function renameSync(oldPath, newPath) { + let source = new nsILocalFile(oldPath); + let target = new nsILocalFile(newPath); + if (!source.exists()) throw FSError("rename", "ENOENT", 34, oldPath); + return source.moveTo(target.parent, target.leafName); +}; +exports.renameSync = renameSync; + +/** + * Asynchronous rename(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var rename = Async(renameSync); +exports.rename = rename; + +/** + * Test whether or not the given path exists by checking with the file system. + */ +function existsSync(path) { + return new nsILocalFile(path).exists(); +} +exports.existsSync = existsSync; + +var exists = Async(existsSync); +exports.exists = exists; + +/** + * Synchronous ftruncate(2). + */ +function truncateSync(path, length) { + let fd = openSync(path, "w"); + ftruncateSync(fd, length); + closeSync(fd); +} +exports.truncateSync = truncateSync; + +/** + * Asynchronous ftruncate(2). No arguments other than a possible exception are + * given to the completion callback. + */ +function truncate(path, length, callback) { + open(path, "w", function(error, fd) { + if (error) return callback(error); + ftruncate(fd, length, function(error) { + if (error) { + closeSync(fd); + callback(error); + } + else { + close(fd, callback); + } + }); + }); +} +exports.truncate = truncate; + +function ftruncate(fd, length, callback) { + write(fd, new Buffer(length), 0, length, 0, function(error) { + callback(error); + }); +} +exports.ftruncate = ftruncate; + +function ftruncateSync(fd, length = 0) { + writeSync(fd, new Buffer(length), 0, length, 0); +} +exports.ftruncateSync = ftruncateSync; + +function chownSync(path, uid, gid) { + throw Error("Not implemented yet!!"); +} +exports.chownSync = chownSync; + +var chown = Async(chownSync); +exports.chown = chown; + +function lchownSync(path, uid, gid) { + throw Error("Not implemented yet!!"); +} +exports.lchownSync = chownSync; + +var lchown = Async(lchown); +exports.lchown = lchown; + +/** + * Synchronous chmod(2). + */ +function chmodSync (path, mode) { + let file; + try { + file = new nsILocalFile(path); + } catch(e) { + throw FSError("chmod", "ENOENT", 34, path); + } + + file.permissions = Mode(mode); +} +exports.chmodSync = chmodSync; +/** + * Asynchronous chmod(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var chmod = Async(chmodSync); +exports.chmod = chmod; + +/** + * Synchronous chmod(2). + */ +function fchmodSync(fd, mode) { + throw Error("Not implemented yet!!"); +}; +exports.fchmodSync = fchmodSync; +/** + * Asynchronous chmod(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var fchmod = Async(fchmodSync); +exports.fchmod = fchmod; + + +/** + * Synchronous stat(2). Returns an instance of `fs.Stats` + */ +function statSync(path) { + return new Stats(path); +}; +exports.statSync = statSync; + +/** + * Asynchronous stat(2). The callback gets two arguments (err, stats) where + * stats is a `fs.Stats` object. It looks like this: + */ +var stat = Async(statSync); +exports.stat = stat; + +/** + * Synchronous lstat(2). Returns an instance of `fs.Stats`. + */ +function lstatSync(path) { + return new LStats(path); +}; +exports.lstatSync = lstatSync; + +/** + * Asynchronous lstat(2). The callback gets two arguments (err, stats) where + * stats is a fs.Stats object. lstat() is identical to stat(), except that if + * path is a symbolic link, then the link itself is stat-ed, not the file that + * it refers to. + */ +var lstat = Async(lstatSync); +exports.lstat = lstat; + +/** + * Synchronous fstat(2). Returns an instance of `fs.Stats`. + */ +function fstatSync(fd) { + return new FStat(fd); +}; +exports.fstatSync = fstatSync; + +/** + * Asynchronous fstat(2). The callback gets two arguments (err, stats) where + * stats is a fs.Stats object. + */ +var fstat = Async(fstatSync); +exports.fstat = fstat; + +/** + * Synchronous link(2). + */ +function linkSync(source, target) { + throw Error("Not implemented yet!!"); +}; +exports.linkSync = linkSync; + +/** + * Asynchronous link(2). No arguments other than a possible exception are given + * to the completion callback. + */ +var link = Async(linkSync); +exports.link = link; + +/** + * Synchronous symlink(2). + */ +function symlinkSync(source, target) { + throw Error("Not implemented yet!!"); +}; +exports.symlinkSync = symlinkSync; + +/** + * Asynchronous symlink(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var symlink = Async(symlinkSync); +exports.symlink = symlink; + +/** + * Synchronous readlink(2). Returns the resolved path. + */ +function readlinkSync(path) { + return new nsILocalFile(path).target; +}; +exports.readlinkSync = readlinkSync; + +/** + * Asynchronous readlink(2). The callback gets two arguments + * `(error, resolvedPath)`. + */ +var readlink = Async(readlinkSync); +exports.readlink = readlink; + +/** + * Synchronous realpath(2). Returns the resolved path. + */ +function realpathSync(path) { + return new nsILocalFile(path).path; +}; +exports.realpathSync = realpathSync; + +/** + * Asynchronous realpath(2). The callback gets two arguments + * `(err, resolvedPath)`. + */ +var realpath = Async(realpathSync); +exports.realpath = realpath; + +/** + * Synchronous unlink(2). + */ +var unlinkSync = remove; +exports.unlinkSync = unlinkSync; + +/** + * Asynchronous unlink(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var unlink = Async(remove); +exports.unlink = unlink; + +/** + * Synchronous rmdir(2). + */ +var rmdirSync = remove; +exports.rmdirSync = rmdirSync; + +/** + * Asynchronous rmdir(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var rmdir = Async(rmdirSync); +exports.rmdir = rmdir; + +/** + * Synchronous mkdir(2). + */ +function mkdirSync(path, mode) { + try { + return nsILocalFile(path).create(DIRECTORY_TYPE, Mode(mode)); + } catch (error) { + // Adjust exception thorw to match ones thrown by node. + if (error.name === "NS_ERROR_FILE_ALREADY_EXISTS") { + let { fileName, lineNumber } = error; + error = FSError("mkdir", "EEXIST", 47, path, fileName, lineNumber); + } + throw error; + } +}; +exports.mkdirSync = mkdirSync; + +/** + * Asynchronous mkdir(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var mkdir = Async(mkdirSync); +exports.mkdir = mkdir; + +/** + * Synchronous readdir(3). Returns an array of filenames excluding `"."` and + * `".."`. + */ +function readdirSync(path) { + try { + return toArray(new nsILocalFile(path).directoryEntries).map(getFileName); + } + catch (error) { + // Adjust exception thorw to match ones thrown by node. + if (error.name === "NS_ERROR_FILE_TARGET_DOES_NOT_EXIST" || + error.name === "NS_ERROR_FILE_NOT_FOUND") + { + let { fileName, lineNumber } = error; + error = FSError("readdir", "ENOENT", 34, path, fileName, lineNumber); + } + throw error; + } +}; +exports.readdirSync = readdirSync; + +/** + * Asynchronous readdir(3). Reads the contents of a directory. The callback + * gets two arguments `(error, files)` where `files` is an array of the names + * of the files in the directory excluding `"."` and `".."`. + */ +var readdir = Async(readdirSync); +exports.readdir = readdir; + +/** + * Synchronous close(2). + */ + function closeSync(fd) { + let input = nsIFileInputStream(fd); + let output = nsIFileOutputStream(fd); + + // Closing input stream and removing reference. + if (input) input.close(); + // Closing output stream and removing reference. + if (output) output.close(); + + nsIFile(fd, null); + nsIFileInputStream(fd, null); + nsIFileOutputStream(fd, null); + nsIBinaryInputStream(fd, null); + nsIBinaryOutputStream(fd, null); +}; +exports.closeSync = closeSync; +/** + * Asynchronous close(2). No arguments other than a possible exception are + * given to the completion callback. + */ +var close = Async(closeSync); +exports.close = close; + +/** + * Synchronous open(2). + */ +function openSync(aPath, aFlag, aMode) { + let [ fd, flags, mode, file ] = + [ { path: aPath }, Flags(aFlag), Mode(aMode), nsILocalFile(aPath) ]; + + nsIFile(fd, file); + + // If trying to open file for just read that does not exists + // need to throw exception as node does. + if (!file.exists() && !isWritable(flags)) + throw FSError("open", "ENOENT", 34, aPath); + + // If we want to open file in read mode we initialize input stream. + if (isReadable(flags)) { + let input = FileInputStream(file, flags, mode, DEFER_OPEN); + nsIFileInputStream(fd, input); + } + + // If we want to open file in write mode we initialize output stream for it. + if (isWritable(flags)) { + let output = FileOutputStream(file, flags, mode, DEFER_OPEN); + nsIFileOutputStream(fd, output); + } + + return fd; +} +exports.openSync = openSync; +/** + * Asynchronous file open. See open(2). Flags can be + * `"r", "r+", "w", "w+", "a"`, or `"a+"`. mode defaults to `0666`. + * The callback gets two arguments `(error, fd). + */ +var open = Async(openSync); +exports.open = open; + +/** + * Synchronous version of buffer-based fs.write(). Returns the number of bytes + * written. + */ +function writeSync(fd, buffer, offset, length, position) { + if (length + offset > buffer.length) { + throw Error("Length is extends beyond buffer"); + } + else if (length + offset !== buffer.length) { + buffer = buffer.slice(offset, offset + length); + } + + let output = BinaryOutputStream(nsIFileOutputStream(fd)); + nsIBinaryOutputStream(fd, output); + // We write content as a byte array as this will avoid any transcoding + // if content was a buffer. + output.writeByteArray(buffer.valueOf(), buffer.length); + output.flush(); +}; +exports.writeSync = writeSync; + +/** + * Write buffer to the file specified by fd. + * + * `offset` and `length` determine the part of the buffer to be written. + * + * `position` refers to the offset from the beginning of the file where this + * data should be written. If `position` is `null`, the data will be written + * at the current position. See pwrite(2). + * + * The callback will be given three arguments `(error, written, buffer)` where + * written specifies how many bytes were written into buffer. + * + * Note that it is unsafe to use `fs.write` multiple times on the same file + * without waiting for the callback. + */ +function write(fd, buffer, offset, length, position, callback) { + if (!Buffer.isBuffer(buffer)) { + // (fd, data, position, encoding, callback) + let encoding = null; + [ position, encoding, callback ] = Array.slice(arguments, 1); + buffer = new Buffer(String(buffer), encoding); + offset = 0; + } else if (length + offset > buffer.length) { + throw Error("Length is extends beyond buffer"); + } else if (length + offset !== buffer.length) { + buffer = buffer.slice(offset, offset + length); + } + + let writeStream = new WriteStream(fd, { position: position, + length: length }); + writeStream.on("error", callback); + writeStream.write(buffer, function onEnd() { + writeStream.destroy(); + if (callback) + callback(null, buffer.length, buffer); + }); +}; +exports.write = write; + +/** + * Synchronous version of string-based fs.read. Returns the number of + * bytes read. + */ +function readSync(fd, buffer, offset, length, position) { + let input = nsIFileInputStream(fd); + // Setting a stream position, unless it"s `-1` which means current position. + if (position >= 0) + input.QueryInterface(Ci.nsISeekableStream).seek(NS_SEEK_SET, position); + // We use `nsIStreamTransportService` service to transform blocking + // file input stream into a fully asynchronous stream that can be written + // without blocking the main thread. + let binaryInputStream = BinaryInputStream(input); + let count = length === ALL ? binaryInputStream.available() : length; + if (offset === 0) binaryInputStream.readArrayBuffer(count, buffer.buffer); + else { + let chunk = new Buffer(count); + binaryInputStream.readArrayBuffer(count, chunk.buffer); + chunk.copy(buffer, offset); + } + + return buffer.slice(offset, offset + count); +}; +exports.readSync = readSync; + +/** + * Read data from the file specified by `fd`. + * + * `buffer` is the buffer that the data will be written to. + * `offset` is offset within the buffer where writing will start. + * + * `length` is an integer specifying the number of bytes to read. + * + * `position` is an integer specifying where to begin reading from in the file. + * If `position` is `null`, data will be read from the current file position. + * + * The callback is given the three arguments, `(error, bytesRead, buffer)`. + */ +function read(fd, buffer, offset, length, position, callback) { + let bytesRead = 0; + let readStream = new ReadStream(fd, { position: position, length: length }); + readStream.on("data", function onData(data) { + data.copy(buffer, offset + bytesRead); + bytesRead += data.length; + }); + readStream.on("end", function onEnd() { + callback(null, bytesRead, buffer); + readStream.destroy(); + }); +}; +exports.read = read; + +/** + * Asynchronously reads the entire contents of a file. + * The callback is passed two arguments `(error, data)`, where data is the + * contents of the file. + */ +function readFile(path, encoding, callback) { + if (isFunction(encoding)) { + callback = encoding + encoding = null + } + + let buffer = null; + try { + let readStream = new ReadStream(path); + readStream.on("data", function(data) { + if (!buffer) buffer = data; + else buffer = Buffer.concat([buffer, data], 2); + }); + readStream.on("error", function onError(error) { + callback(error); + }); + readStream.on("end", function onEnd() { + // Note: Need to destroy before invoking a callback + // so that file descriptor is released. + readStream.destroy(); + callback(null, buffer); + }); + } + catch (error) { + setTimeout(callback, 0, error); + } +}; +exports.readFile = readFile; + +/** + * Synchronous version of `fs.readFile`. Returns the contents of the path. + * If encoding is specified then this function returns a string. + * Otherwise it returns a buffer. + */ +function readFileSync(path, encoding) { + let fd = openSync(path, "r"); + let size = fstatSync(fd).size; + let buffer = new Buffer(size); + try { + readSync(fd, buffer, 0, ALL, 0); + } + finally { + closeSync(fd); + } + return buffer; +}; +exports.readFileSync = readFileSync; + +/** + * Asynchronously writes data to a file, replacing the file if it already + * exists. data can be a string or a buffer. + */ +function writeFile(path, content, encoding, callback) { + if (!isString(path)) + throw new TypeError('path must be a string'); + + try { + if (isFunction(encoding)) { + callback = encoding + encoding = null + } + if (isString(content)) + content = new Buffer(content, encoding); + + let writeStream = new WriteStream(path); + let error = null; + + writeStream.end(content, function() { + writeStream.destroy(); + callback(error); + }); + + writeStream.on("error", function onError(reason) { + error = reason; + writeStream.destroy(); + }); + } catch (error) { + callback(error); + } +}; +exports.writeFile = writeFile; + +/** + * The synchronous version of `fs.writeFile`. + */ +function writeFileSync(filename, data, encoding) { + // TODO: Implement this in bug 1148209 https://bugzilla.mozilla.org/show_bug.cgi?id=1148209 + throw Error("Not implemented"); +}; +exports.writeFileSync = writeFileSync; + + +function utimesSync(path, atime, mtime) { + throw Error("Not implemented"); +} +exports.utimesSync = utimesSync; + +var utimes = Async(utimesSync); +exports.utimes = utimes; + +function futimesSync(fd, atime, mtime, callback) { + throw Error("Not implemented"); +} +exports.futimesSync = futimesSync; + +var futimes = Async(futimesSync); +exports.futimes = futimes; + +function fsyncSync(fd, atime, mtime, callback) { + throw Error("Not implemented"); +} +exports.fsyncSync = fsyncSync; + +var fsync = Async(fsyncSync); +exports.fsync = fsync; + + +/** + * Watch for changes on filename. The callback listener will be called each + * time the file is accessed. + * + * The second argument is optional. The options if provided should be an object + * containing two members a boolean, persistent, and interval, a polling value + * in milliseconds. The default is { persistent: true, interval: 0 }. + */ +function watchFile(path, options, listener) { + throw Error("Not implemented"); +}; +exports.watchFile = watchFile; + + +function unwatchFile(path, listener) { + throw Error("Not implemented"); +} +exports.unwatchFile = unwatchFile; + +function watch(path, options, listener) { + throw Error("Not implemented"); +} +exports.watch = watch; diff --git a/toolkit/jetpack/sdk/io/stream.js b/toolkit/jetpack/sdk/io/stream.js new file mode 100644 index 000000000..0698b8e32 --- /dev/null +++ b/toolkit/jetpack/sdk/io/stream.js @@ -0,0 +1,440 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +module.metadata = { + "stability": "experimental" +}; + +const { CC, Cc, Ci, Cu, Cr, components } = require("chrome"); +const { EventTarget } = require("../event/target"); +const { emit } = require("../event/core"); +const { Buffer } = require("./buffer"); +const { Class } = require("../core/heritage"); +const { setTimeout } = require("../timers"); + + +const MultiplexInputStream = CC("@mozilla.org/io/multiplex-input-stream;1", + "nsIMultiplexInputStream"); +const AsyncStreamCopier = CC("@mozilla.org/network/async-stream-copier;1", + "nsIAsyncStreamCopier", "init"); +const StringInputStream = CC("@mozilla.org/io/string-input-stream;1", + "nsIStringInputStream"); +const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1", + "nsIArrayBufferInputStream"); + +const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", + "nsIBinaryInputStream", "setInputStream"); +const InputStreamPump = CC("@mozilla.org/network/input-stream-pump;1", + "nsIInputStreamPump", "init"); + +const threadManager = Cc["@mozilla.org/thread-manager;1"]. + getService(Ci.nsIThreadManager); + +const eventTarget = Cc["@mozilla.org/network/stream-transport-service;1"]. + getService(Ci.nsIEventTarget); + +var isFunction = value => typeof(value) === "function" + +function accessor() { + let map = new WeakMap(); + return function(target, value) { + if (value) + map.set(target, value); + return map.get(target); + } +} + +const Stream = Class({ + extends: EventTarget, + initialize: function() { + this.readable = false; + this.writable = false; + this.encoding = null; + }, + setEncoding: function setEncoding(encoding) { + this.encoding = String(encoding).toUpperCase(); + }, + pipe: function pipe(target, options) { + let source = this; + function onData(chunk) { + if (target.writable) { + if (false === target.write(chunk)) + source.pause(); + } + } + function onDrain() { + if (source.readable) + source.resume(); + } + function onEnd() { + target.end(); + } + function onPause() { + source.pause(); + } + function onResume() { + if (source.readable) + source.resume(); + } + + function cleanup() { + source.removeListener("data", onData); + target.removeListener("drain", onDrain); + source.removeListener("end", onEnd); + + target.removeListener("pause", onPause); + target.removeListener("resume", onResume); + + source.removeListener("end", cleanup); + source.removeListener("close", cleanup); + + target.removeListener("end", cleanup); + target.removeListener("close", cleanup); + } + + if (!options || options.end !== false) + target.on("end", onEnd); + + source.on("data", onData); + target.on("drain", onDrain); + target.on("resume", onResume); + target.on("pause", onPause); + + source.on("end", cleanup); + source.on("close", cleanup); + + target.on("end", cleanup); + target.on("close", cleanup); + + emit(target, "pipe", source); + }, + pause: function pause() { + emit(this, "pause"); + }, + resume: function resume() { + emit(this, "resume"); + }, + destroySoon: function destroySoon() { + this.destroy(); + } +}); +exports.Stream = Stream; + + +var nsIStreamListener = accessor(); +var nsIInputStreamPump = accessor(); +var nsIAsyncInputStream = accessor(); +var nsIBinaryInputStream = accessor(); + +const StreamListener = Class({ + initialize: function(stream) { + this.stream = stream; + }, + + // Next three methods are part of `nsIStreamListener` interface and are + // invoked by `nsIInputStreamPump.asyncRead`. + onDataAvailable: function(request, context, input, offset, count) { + let stream = this.stream; + let buffer = new ArrayBuffer(count); + nsIBinaryInputStream(stream).readArrayBuffer(count, buffer); + emit(stream, "data", new Buffer(buffer)); + }, + + // Next two methods implement `nsIRequestObserver` interface and are invoked + // by `nsIInputStreamPump.asyncRead`. + onStartRequest: function() {}, + // Called to signify the end of an asynchronous request. We only care to + // discover errors. + onStopRequest: function(request, context, status) { + let stream = this.stream; + stream.readable = false; + if (!components.isSuccessCode(status)) + emit(stream, "error", status); + else + emit(stream, "end"); + } +}); + + +const InputStream = Class({ + extends: Stream, + readable: false, + paused: false, + initialize: function initialize(options) { + let { asyncInputStream } = options; + + this.readable = true; + + let binaryInputStream = new BinaryInputStream(asyncInputStream); + let inputStreamPump = new InputStreamPump(asyncInputStream, + -1, -1, 0, 0, false); + let streamListener = new StreamListener(this); + + nsIAsyncInputStream(this, asyncInputStream); + nsIInputStreamPump(this, inputStreamPump); + nsIBinaryInputStream(this, binaryInputStream); + nsIStreamListener(this, streamListener); + + this.asyncInputStream = asyncInputStream; + this.inputStreamPump = inputStreamPump; + this.binaryInputStream = binaryInputStream; + }, + get status() { + return nsIInputStreamPump(this).status; + }, + read: function() { + nsIInputStreamPump(this).asyncRead(nsIStreamListener(this), null); + }, + pause: function pause() { + this.paused = true; + nsIInputStreamPump(this).suspend(); + emit(this, "paused"); + }, + resume: function resume() { + this.paused = false; + if (nsIInputStreamPump(this).isPending()) { + nsIInputStreamPump(this).resume(); + emit(this, "resume"); + } + }, + close: function close() { + this.readable = false; + nsIInputStreamPump(this).cancel(Cr.NS_OK); + nsIBinaryInputStream(this).close(); + nsIAsyncInputStream(this).close(); + }, + destroy: function destroy() { + this.close(); + + nsIInputStreamPump(this); + nsIAsyncInputStream(this); + nsIBinaryInputStream(this); + nsIStreamListener(this); + } +}); +exports.InputStream = InputStream; + + + +var nsIRequestObserver = accessor(); +var nsIAsyncOutputStream = accessor(); +var nsIAsyncStreamCopier = accessor(); +var nsIMultiplexInputStream = accessor(); + +const RequestObserver = Class({ + initialize: function(stream) { + this.stream = stream; + }, + // Method is part of `nsIRequestObserver` interface that is + // invoked by `nsIAsyncStreamCopier.asyncCopy`. + onStartRequest: function() {}, + // Method is part of `nsIRequestObserver` interface that is + // invoked by `nsIAsyncStreamCopier.asyncCopy`. + onStopRequest: function(request, context, status) { + let stream = this.stream; + stream.drained = true; + + // Remove copied chunk. + let multiplexInputStream = nsIMultiplexInputStream(stream); + multiplexInputStream.removeStream(0); + + // If there was an error report. + if (!components.isSuccessCode(status)) + emit(stream, "error", status); + + // If there more chunks in queue then flush them. + else if (multiplexInputStream.count) + stream.flush(); + + // If stream is still writable notify that queue has drained. + else if (stream.writable) + emit(stream, "drain"); + + // If stream is no longer writable close it. + else { + nsIAsyncStreamCopier(stream).cancel(Cr.NS_OK); + nsIMultiplexInputStream(stream).close(); + nsIAsyncOutputStream(stream).close(); + nsIAsyncOutputStream(stream).flush(); + } + } +}); + +const OutputStreamCallback = Class({ + initialize: function(stream) { + this.stream = stream; + }, + // Method is part of `nsIOutputStreamCallback` interface that + // is invoked by `nsIAsyncOutputStream.asyncWait`. It is registered + // with `WAIT_CLOSURE_ONLY` flag that overrides the default behavior, + // causing the `onOutputStreamReady` notification to be suppressed until + // the stream becomes closed. + onOutputStreamReady: function(nsIAsyncOutputStream) { + emit(this.stream, "finish"); + } +}); + +const OutputStream = Class({ + extends: Stream, + writable: false, + drained: true, + get bufferSize() { + let multiplexInputStream = nsIMultiplexInputStream(this); + return multiplexInputStream && multiplexInputStream.available(); + }, + initialize: function initialize(options) { + let { asyncOutputStream, output } = options; + this.writable = true; + + // Ensure that `nsIAsyncOutputStream` was provided. + asyncOutputStream.QueryInterface(Ci.nsIAsyncOutputStream); + + // Create a `nsIMultiplexInputStream` and `nsIAsyncStreamCopier`. Former + // is used to queue written data chunks that `asyncStreamCopier` will + // asynchronously drain into `asyncOutputStream`. + let multiplexInputStream = MultiplexInputStream(); + let asyncStreamCopier = AsyncStreamCopier(multiplexInputStream, + output || asyncOutputStream, + eventTarget, + // nsIMultiplexInputStream + // implemnts .readSegments() + true, + // nsIOutputStream may or + // may not implemnet + // .writeSegments(). + false, + // Use default buffer size. + null, + // Should not close an input. + false, + // Should not close an output. + false); + + // Create `requestObserver` implementing `nsIRequestObserver` interface + // in the constructor that's gonna be reused across several flushes. + let requestObserver = RequestObserver(this); + + + // Create observer that implements `nsIOutputStreamCallback` and register + // using `WAIT_CLOSURE_ONLY` flag. That way it will be notfied once + // `nsIAsyncOutputStream` is closed. + asyncOutputStream.asyncWait(OutputStreamCallback(this), + asyncOutputStream.WAIT_CLOSURE_ONLY, + 0, + threadManager.currentThread); + + nsIRequestObserver(this, requestObserver); + nsIAsyncOutputStream(this, asyncOutputStream); + nsIMultiplexInputStream(this, multiplexInputStream); + nsIAsyncStreamCopier(this, asyncStreamCopier); + + this.asyncOutputStream = asyncOutputStream; + this.multiplexInputStream = multiplexInputStream; + this.asyncStreamCopier = asyncStreamCopier; + }, + write: function write(content, encoding, callback) { + if (isFunction(encoding)) { + callback = encoding; + encoding = callback; + } + + // If stream is not writable we throw an error. + if (!this.writable) throw Error("stream is not writable"); + + let chunk = null; + + // If content is not a buffer then we create one out of it. + if (Buffer.isBuffer(content)) { + chunk = new ArrayBufferInputStream(); + chunk.setData(content.buffer, 0, content.length); + } + else { + chunk = new StringInputStream(); + chunk.setData(content, content.length); + } + + if (callback) + this.once("drain", callback); + + // Queue up chunk to be copied to output sync. + nsIMultiplexInputStream(this).appendStream(chunk); + this.flush(); + + return this.drained; + }, + flush: function() { + if (this.drained) { + this.drained = false; + nsIAsyncStreamCopier(this).asyncCopy(nsIRequestObserver(this), null); + } + }, + end: function end(content, encoding, callback) { + if (isFunction(content)) { + callback = content + content = callback + } + if (isFunction(encoding)) { + callback = encoding + encoding = callback + } + + // Setting a listener to "finish" event if passed. + if (isFunction(callback)) + this.once("finish", callback); + + + if (content) + this.write(content, encoding); + this.writable = false; + + // Close `asyncOutputStream` only if output has drained. If it's + // not drained than `asyncStreamCopier` is busy writing, so let + // it finish. Note that since `this.writable` is false copier will + // close `asyncOutputStream` once output drains. + if (this.drained) + nsIAsyncOutputStream(this).close(); + }, + destroy: function destroy() { + nsIAsyncOutputStream(this).close(); + nsIAsyncOutputStream(this); + nsIMultiplexInputStream(this); + nsIAsyncStreamCopier(this); + nsIRequestObserver(this); + } +}); +exports.OutputStream = OutputStream; + +const DuplexStream = Class({ + extends: Stream, + implements: [InputStream, OutputStream], + allowHalfOpen: true, + initialize: function initialize(options) { + options = options || {}; + let { readable, writable, allowHalfOpen } = options; + + InputStream.prototype.initialize.call(this, options); + OutputStream.prototype.initialize.call(this, options); + + if (readable === false) + this.readable = false; + + if (writable === false) + this.writable = false; + + if (allowHalfOpen === false) + this.allowHalfOpen = false; + + // If in a half open state and it's disabled enforce end. + this.once("end", () => { + if (!this.allowHalfOpen && (!this.readable || !this.writable)) + this.end(); + }); + }, + destroy: function destroy(error) { + InputStream.prototype.destroy.call(this); + OutputStream.prototype.destroy.call(this); + } +}); +exports.DuplexStream = DuplexStream; diff --git a/toolkit/jetpack/sdk/io/text-streams.js b/toolkit/jetpack/sdk/io/text-streams.js new file mode 100644 index 000000000..ed4ec4972 --- /dev/null +++ b/toolkit/jetpack/sdk/io/text-streams.js @@ -0,0 +1,235 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +module.metadata = { + "stability": "experimental" +}; + +const { Cc, Ci, Cu, components } = require("chrome"); +const { ensure } = require("../system/unload"); +const { NetUtil } = Cu.import("resource://gre/modules/NetUtil.jsm", {}); + +// NetUtil.asyncCopy() uses this buffer length, and since we call it, for best +// performance we use it, too. +const BUFFER_BYTE_LEN = 0x8000; +const PR_UINT32_MAX = 0xffffffff; +const DEFAULT_CHARSET = "UTF-8"; + + +/** + * An input stream that reads text from a backing stream using a given text + * encoding. + * + * @param inputStream + * The stream is backed by this nsIInputStream. It must already be + * opened. + * @param charset + * Text in inputStream is expected to be in this character encoding. If + * not given, "UTF-8" is assumed. See nsICharsetConverterManager.idl for + * documentation on how to determine other valid values for this. + */ +function TextReader(inputStream, charset) { + charset = checkCharset(charset); + + let stream = Cc["@mozilla.org/intl/converter-input-stream;1"]. + createInstance(Ci.nsIConverterInputStream); + stream.init(inputStream, charset, BUFFER_BYTE_LEN, + Ci.nsIConverterInputStream.DEFAULT_REPLACEMENT_CHARACTER); + + let manager = new StreamManager(this, stream); + + /** + * Reads a string from the stream. If the stream is closed, an exception is + * thrown. + * + * @param numChars + * The number of characters to read. If not given, the remainder of + * the stream is read. + * @return The string read. If the stream is already at EOS, returns the + * empty string. + */ + this.read = function TextReader_read(numChars) { + manager.ensureOpened(); + + let readAll = false; + if (typeof(numChars) === "number") + numChars = Math.max(numChars, 0); + else + readAll = true; + + let str = ""; + let totalRead = 0; + let chunkRead = 1; + + // Read in numChars or until EOS, whichever comes first. Note that the + // units here are characters, not bytes. + while (true) { + let chunk = {}; + let toRead = readAll ? + PR_UINT32_MAX : + Math.min(numChars - totalRead, PR_UINT32_MAX); + if (toRead <= 0 || chunkRead <= 0) + break; + + // The converter stream reads in at most BUFFER_BYTE_LEN bytes in a call + // to readString, enough to fill its byte buffer. chunkRead will be the + // number of characters encoded by the bytes in that buffer. + chunkRead = stream.readString(toRead, chunk); + str += chunk.value; + totalRead += chunkRead; + } + + return str; + }; +} +exports.TextReader = TextReader; + +/** + * A buffered output stream that writes text to a backing stream using a given + * text encoding. + * + * @param outputStream + * The stream is backed by this nsIOutputStream. It must already be + * opened. + * @param charset + * Text will be written to outputStream using this character encoding. + * If not given, "UTF-8" is assumed. See nsICharsetConverterManager.idl + * for documentation on how to determine other valid values for this. + */ +function TextWriter(outputStream, charset) { + charset = checkCharset(charset); + + let stream = outputStream; + + // Buffer outputStream if it's not already. + let ioUtils = Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil); + if (!ioUtils.outputStreamIsBuffered(outputStream)) { + stream = Cc["@mozilla.org/network/buffered-output-stream;1"]. + createInstance(Ci.nsIBufferedOutputStream); + stream.init(outputStream, BUFFER_BYTE_LEN); + } + + // I'd like to use nsIConverterOutputStream. But NetUtil.asyncCopy(), which + // we use below in writeAsync(), naturally expects its sink to be an instance + // of nsIOutputStream, which nsIConverterOutputStream's only implementation is + // not. So we use uconv and manually convert all strings before writing to + // outputStream. + let uconv = Cc["@mozilla.org/intl/scriptableunicodeconverter"]. + createInstance(Ci.nsIScriptableUnicodeConverter); + uconv.charset = charset; + + let manager = new StreamManager(this, stream); + + /** + * Flushes the backing stream's buffer. + */ + this.flush = function TextWriter_flush() { + manager.ensureOpened(); + stream.flush(); + }; + + /** + * Writes a string to the stream. If the stream is closed, an exception is + * thrown. + * + * @param str + * The string to write. + */ + this.write = function TextWriter_write(str) { + manager.ensureOpened(); + let istream = uconv.convertToInputStream(str); + let len = istream.available(); + while (len > 0) { + stream.writeFrom(istream, len); + len = istream.available(); + } + istream.close(); + }; + + /** + * Writes a string on a background thread. After the write completes, the + * backing stream's buffer is flushed, and both the stream and the backing + * stream are closed, also on the background thread. If the stream is already + * closed, an exception is thrown immediately. + * + * @param str + * The string to write. + * @param callback + * An optional function. If given, it's called as callback(error) when + * the write completes. error is an Error object or undefined if there + * was no error. Inside callback, |this| is the stream object. + */ + this.writeAsync = function TextWriter_writeAsync(str, callback) { + manager.ensureOpened(); + let istream = uconv.convertToInputStream(str); + NetUtil.asyncCopy(istream, stream, (result) => { + let err = components.isSuccessCode(result) ? undefined : + new Error("An error occured while writing to the stream: " + result); + if (err) + console.error(err); + + // asyncCopy() closes its output (and input) stream. + manager.opened = false; + + if (typeof(callback) === "function") { + try { + callback.call(this, err); + } + catch (exc) { + console.exception(exc); + } + } + }); + }; +} +exports.TextWriter = TextWriter; + +// This manages the lifetime of stream, a TextReader or TextWriter. It defines +// closed and close() on stream and registers an unload listener that closes +// rawStream if it's still opened. It also provides ensureOpened(), which +// throws an exception if the stream is closed. +function StreamManager(stream, rawStream) { + this.rawStream = rawStream; + this.opened = true; + + /** + * True iff the stream is closed. + */ + stream.__defineGetter__("closed", () => !this.opened); + + /** + * Closes both the stream and its backing stream. If the stream is already + * closed, an exception is thrown. For TextWriters, this first flushes the + * backing stream's buffer. + */ + stream.close = () => { + this.ensureOpened(); + this.unload(); + }; + + ensure(this); +} + +StreamManager.prototype = { + ensureOpened: function StreamManager_ensureOpened() { + if (!this.opened) + throw new Error("The stream is closed and cannot be used."); + }, + unload: function StreamManager_unload() { + // TextWriter.writeAsync() causes rawStream to close and therefore sets + // opened to false, so check that we're still opened. + if (this.opened) { + // Calling close() on both an nsIUnicharInputStream and + // nsIBufferedOutputStream closes their backing streams. It also forces + // nsIOutputStreams to flush first. + this.rawStream.close(); + this.opened = false; + } + } +}; + +function checkCharset(charset) { + return typeof(charset) === "string" ? charset : DEFAULT_CHARSET; +} |