diff options
Diffstat (limited to 'devtools/shared/transport')
20 files changed, 3180 insertions, 0 deletions
diff --git a/devtools/shared/transport/moz.build b/devtools/shared/transport/moz.build new file mode 100644 index 000000000..fbf0444b8 --- /dev/null +++ b/devtools/shared/transport/moz.build @@ -0,0 +1,14 @@ +# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*- +# vim: set filetype=python: +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +XPCSHELL_TESTS_MANIFESTS += ['tests/unit/xpcshell.ini'] + +DevToolsModules( + 'packets.js', + 'stream-utils.js', + 'transport.js', + 'websocket-transport.js', +) diff --git a/devtools/shared/transport/packets.js b/devtools/shared/transport/packets.js new file mode 100644 index 000000000..3ebcd8f13 --- /dev/null +++ b/devtools/shared/transport/packets.js @@ -0,0 +1,414 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +/** + * Packets contain read / write functionality for the different packet types + * supported by the debugging protocol, so that a transport can focus on + * delivery and queue management without worrying too much about the specific + * packet types. + * + * They are intended to be "one use only", so a new packet should be + * instantiated for each incoming or outgoing packet. + * + * A complete Packet type should expose at least the following: + * * read(stream, scriptableStream) + * Called when the input stream has data to read + * * write(stream) + * Called when the output stream is ready to write + * * get done() + * Returns true once the packet is done being read / written + * * destroy() + * Called to clean up at the end of use + */ + +const { Cc, Ci, Cu } = require("chrome"); +const DevToolsUtils = require("devtools/shared/DevToolsUtils"); +const { dumpn, dumpv } = DevToolsUtils; +const flags = require("devtools/shared/flags"); +const StreamUtils = require("devtools/shared/transport/stream-utils"); +const promise = require("promise"); +const defer = require("devtools/shared/defer"); + +DevToolsUtils.defineLazyGetter(this, "unicodeConverter", () => { + const unicodeConverter = Cc["@mozilla.org/intl/scriptableunicodeconverter"] + .createInstance(Ci.nsIScriptableUnicodeConverter); + unicodeConverter.charset = "UTF-8"; + return unicodeConverter; +}); + +// The transport's previous check ensured the header length did not exceed 20 +// characters. Here, we opt for the somewhat smaller, but still large limit of +// 1 TiB. +const PACKET_LENGTH_MAX = Math.pow(2, 40); + +/** + * A generic Packet processing object (extended by two subtypes below). + */ +function Packet(transport) { + this._transport = transport; + this._length = 0; +} + +/** + * Attempt to initialize a new Packet based on the incoming packet header we've + * received so far. We try each of the types in succession, trying JSON packets + * first since they are much more common. + * @param header string + * The packet header string to attempt parsing. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @return Packet + * The parsed packet of the matching type, or null if no types matched. + */ +Packet.fromHeader = function (header, transport) { + return JSONPacket.fromHeader(header, transport) || + BulkPacket.fromHeader(header, transport); +}; + +Packet.prototype = { + + get length() { + return this._length; + }, + + set length(length) { + if (length > PACKET_LENGTH_MAX) { + throw Error("Packet length " + length + " exceeds the max length of " + + PACKET_LENGTH_MAX); + } + this._length = length; + }, + + destroy: function () { + this._transport = null; + } + +}; + +exports.Packet = Packet; + +/** + * With a JSON packet (the typical packet type sent via the transport), data is + * transferred as a JSON packet serialized into a string, with the string length + * prepended to the packet, followed by a colon ([length]:[packet]). The + * contents of the JSON packet are specified in the Remote Debugging Protocol + * specification. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + */ +function JSONPacket(transport) { + Packet.call(this, transport); + this._data = ""; + this._done = false; +} + +/** + * Attempt to initialize a new JSONPacket based on the incoming packet header + * we've received so far. + * @param header string + * The packet header string to attempt parsing. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @return JSONPacket + * The parsed packet, or null if it's not a match. + */ +JSONPacket.fromHeader = function (header, transport) { + let match = this.HEADER_PATTERN.exec(header); + + if (!match) { + return null; + } + + dumpv("Header matches JSON packet"); + let packet = new JSONPacket(transport); + packet.length = +match[1]; + return packet; +}; + +JSONPacket.HEADER_PATTERN = /^(\d+):$/; + +JSONPacket.prototype = Object.create(Packet.prototype); + +Object.defineProperty(JSONPacket.prototype, "object", { + /** + * Gets the object (not the serialized string) being read or written. + */ + get: function () { return this._object; }, + + /** + * Sets the object to be sent when write() is called. + */ + set: function (object) { + this._object = object; + let data = JSON.stringify(object); + this._data = unicodeConverter.ConvertFromUnicode(data); + this.length = this._data.length; + } +}); + +JSONPacket.prototype.read = function (stream, scriptableStream) { + dumpv("Reading JSON packet"); + + // Read in more packet data. + this._readData(stream, scriptableStream); + + if (!this.done) { + // Don't have a complete packet yet. + return; + } + + let json = this._data; + try { + json = unicodeConverter.ConvertToUnicode(json); + this._object = JSON.parse(json); + } catch (e) { + let msg = "Error parsing incoming packet: " + json + " (" + e + + " - " + e.stack + ")"; + console.error(msg); + dumpn(msg); + return; + } + + this._transport._onJSONObjectReady(this._object); +}; + +JSONPacket.prototype._readData = function (stream, scriptableStream) { + if (flags.wantVerbose) { + dumpv("Reading JSON data: _l: " + this.length + " dL: " + + this._data.length + " sA: " + stream.available()); + } + let bytesToRead = Math.min(this.length - this._data.length, + stream.available()); + this._data += scriptableStream.readBytes(bytesToRead); + this._done = this._data.length === this.length; +}; + +JSONPacket.prototype.write = function (stream) { + dumpv("Writing JSON packet"); + + if (this._outgoing === undefined) { + // Format the serialized packet to a buffer + this._outgoing = this.length + ":" + this._data; + } + + let written = stream.write(this._outgoing, this._outgoing.length); + this._outgoing = this._outgoing.slice(written); + this._done = !this._outgoing.length; +}; + +Object.defineProperty(JSONPacket.prototype, "done", { + get: function () { return this._done; } +}); + +JSONPacket.prototype.toString = function () { + return JSON.stringify(this._object, null, 2); +}; + +exports.JSONPacket = JSONPacket; + +/** + * With a bulk packet, data is transferred by temporarily handing over the + * transport's input or output stream to the application layer for writing data + * directly. This can be much faster for large data sets, and avoids various + * stages of copies and data duplication inherent in the JSON packet type. The + * bulk packet looks like: + * + * bulk [actor] [type] [length]:[data] + * + * The interpretation of the data portion depends on the kind of actor and the + * packet's type. See the Remote Debugging Protocol Stream Transport spec for + * more details. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + */ +function BulkPacket(transport) { + Packet.call(this, transport); + this._done = false; + this._readyForWriting = defer(); +} + +/** + * Attempt to initialize a new BulkPacket based on the incoming packet header + * we've received so far. + * @param header string + * The packet header string to attempt parsing. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @return BulkPacket + * The parsed packet, or null if it's not a match. + */ +BulkPacket.fromHeader = function (header, transport) { + let match = this.HEADER_PATTERN.exec(header); + + if (!match) { + return null; + } + + dumpv("Header matches bulk packet"); + let packet = new BulkPacket(transport); + packet.header = { + actor: match[1], + type: match[2], + length: +match[3] + }; + return packet; +}; + +BulkPacket.HEADER_PATTERN = /^bulk ([^: ]+) ([^: ]+) (\d+):$/; + +BulkPacket.prototype = Object.create(Packet.prototype); + +BulkPacket.prototype.read = function (stream) { + dumpv("Reading bulk packet, handing off input stream"); + + // Temporarily pause monitoring of the input stream + this._transport.pauseIncoming(); + + let deferred = defer(); + + this._transport._onBulkReadReady({ + actor: this.actor, + type: this.type, + length: this.length, + copyTo: (output) => { + dumpv("CT length: " + this.length); + let copying = StreamUtils.copyStream(stream, output, this.length); + deferred.resolve(copying); + return copying; + }, + stream: stream, + done: deferred + }); + + // Await the result of reading from the stream + deferred.promise.then(() => { + dumpv("onReadDone called, ending bulk mode"); + this._done = true; + this._transport.resumeIncoming(); + }, this._transport.close); + + // Ensure this is only done once + this.read = () => { + throw new Error("Tried to read() a BulkPacket's stream multiple times."); + }; +}; + +BulkPacket.prototype.write = function (stream) { + dumpv("Writing bulk packet"); + + if (this._outgoingHeader === undefined) { + dumpv("Serializing bulk packet header"); + // Format the serialized packet header to a buffer + this._outgoingHeader = "bulk " + this.actor + " " + this.type + " " + + this.length + ":"; + } + + // Write the header, or whatever's left of it to write. + if (this._outgoingHeader.length) { + dumpv("Writing bulk packet header"); + let written = stream.write(this._outgoingHeader, + this._outgoingHeader.length); + this._outgoingHeader = this._outgoingHeader.slice(written); + return; + } + + dumpv("Handing off output stream"); + + // Temporarily pause the monitoring of the output stream + this._transport.pauseOutgoing(); + + let deferred = defer(); + + this._readyForWriting.resolve({ + copyFrom: (input) => { + dumpv("CF length: " + this.length); + let copying = StreamUtils.copyStream(input, stream, this.length); + deferred.resolve(copying); + return copying; + }, + stream: stream, + done: deferred + }); + + // Await the result of writing to the stream + deferred.promise.then(() => { + dumpv("onWriteDone called, ending bulk mode"); + this._done = true; + this._transport.resumeOutgoing(); + }, this._transport.close); + + // Ensure this is only done once + this.write = () => { + throw new Error("Tried to write() a BulkPacket's stream multiple times."); + }; +}; + +Object.defineProperty(BulkPacket.prototype, "streamReadyForWriting", { + get: function () { + return this._readyForWriting.promise; + } +}); + +Object.defineProperty(BulkPacket.prototype, "header", { + get: function () { + return { + actor: this.actor, + type: this.type, + length: this.length + }; + }, + + set: function (header) { + this.actor = header.actor; + this.type = header.type; + this.length = header.length; + }, +}); + +Object.defineProperty(BulkPacket.prototype, "done", { + get: function () { return this._done; }, +}); + + +BulkPacket.prototype.toString = function () { + return "Bulk: " + JSON.stringify(this.header, null, 2); +}; + +exports.BulkPacket = BulkPacket; + +/** + * RawPacket is used to test the transport's error handling of malformed + * packets, by writing data directly onto the stream. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @param data string + * The raw string to send out onto the stream. + */ +function RawPacket(transport, data) { + Packet.call(this, transport); + this._data = data; + this.length = data.length; + this._done = false; +} + +RawPacket.prototype = Object.create(Packet.prototype); + +RawPacket.prototype.read = function (stream) { + // This hasn't yet been needed for testing. + throw Error("Not implmented."); +}; + +RawPacket.prototype.write = function (stream) { + let written = stream.write(this._data, this._data.length); + this._data = this._data.slice(written); + this._done = !this._data.length; +}; + +Object.defineProperty(RawPacket.prototype, "done", { + get: function () { return this._done; } +}); + +exports.RawPacket = RawPacket; diff --git a/devtools/shared/transport/stream-utils.js b/devtools/shared/transport/stream-utils.js new file mode 100644 index 000000000..23f67a518 --- /dev/null +++ b/devtools/shared/transport/stream-utils.js @@ -0,0 +1,249 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const { Ci, Cc, Cu, Cr, CC } = require("chrome"); +const Services = require("Services"); +const DevToolsUtils = require("devtools/shared/DevToolsUtils"); +const { dumpv } = DevToolsUtils; +const EventEmitter = require("devtools/shared/event-emitter"); +const promise = require("promise"); +const defer = require("devtools/shared/defer"); + +DevToolsUtils.defineLazyGetter(this, "IOUtil", () => { + return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil); +}); + +DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => { + return CC("@mozilla.org/scriptableinputstream;1", + "nsIScriptableInputStream", "init"); +}); + +const BUFFER_SIZE = 0x8000; + +/** + * This helper function (and its companion object) are used by bulk senders and + * receivers to read and write data in and out of other streams. Functions that + * make use of this tool are passed to callers when it is time to read or write + * bulk data. It is highly recommended to use these copier functions instead of + * the stream directly because the copier enforces the agreed upon length. + * Since bulk mode reuses an existing stream, the sender and receiver must write + * and read exactly the agreed upon amount of data, or else the entire transport + * will be left in a invalid state. Additionally, other methods of stream + * copying (such as NetUtil.asyncCopy) close the streams involved, which would + * terminate the debugging transport, and so it is avoided here. + * + * Overall, this *works*, but clearly the optimal solution would be able to just + * use the streams directly. If it were possible to fully implement + * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to + * enforce the length and avoid closing, and consumers could use familiar stream + * utilities like NetUtil.asyncCopy. + * + * The function takes two async streams and copies a precise number of bytes + * from one to the other. Copying begins immediately, but may complete at some + * future time depending on data size. Use the returned promise to know when + * it's complete. + * + * @param input nsIAsyncInputStream + * The stream to copy from. + * @param output nsIAsyncOutputStream + * The stream to copy to. + * @param length Integer + * The amount of data that needs to be copied. + * @return Promise + * The promise is resolved when copying completes or rejected if any + * (unexpected) errors occur. + */ +function copyStream(input, output, length) { + let copier = new StreamCopier(input, output, length); + return copier.copy(); +} + +function StreamCopier(input, output, length) { + EventEmitter.decorate(this); + this._id = StreamCopier._nextId++; + this.input = input; + // Save off the base output stream, since we know it's async as we've required + this.baseAsyncOutput = output; + if (IOUtil.outputStreamIsBuffered(output)) { + this.output = output; + } else { + this.output = Cc["@mozilla.org/network/buffered-output-stream;1"]. + createInstance(Ci.nsIBufferedOutputStream); + this.output.init(output, BUFFER_SIZE); + } + this._length = length; + this._amountLeft = length; + this._deferred = defer(); + + this._copy = this._copy.bind(this); + this._flush = this._flush.bind(this); + this._destroy = this._destroy.bind(this); + + // Copy promise's then method up to this object. + // Allows the copier to offer a promise interface for the simple succeed or + // fail scenarios, but also emit events (due to the EventEmitter) for other + // states, like progress. + this.then = this._deferred.promise.then.bind(this._deferred.promise); + this.then(this._destroy, this._destroy); + + // Stream ready callback starts as |_copy|, but may switch to |_flush| at end + // if flushing would block the output stream. + this._streamReadyCallback = this._copy; +} +StreamCopier._nextId = 0; + +StreamCopier.prototype = { + + copy: function () { + // Dispatch to the next tick so that it's possible to attach a progress + // event listener, even for extremely fast copies (like when testing). + Services.tm.currentThread.dispatch(() => { + try { + this._copy(); + } catch (e) { + this._deferred.reject(e); + } + }, 0); + return this; + }, + + _copy: function () { + let bytesAvailable = this.input.available(); + let amountToCopy = Math.min(bytesAvailable, this._amountLeft); + this._debug("Trying to copy: " + amountToCopy); + + let bytesCopied; + try { + bytesCopied = this.output.writeFrom(this.input, amountToCopy); + } catch (e) { + if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this._debug("Base stream would block, will retry"); + this._debug("Waiting for output stream"); + this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); + return; + } else { + throw e; + } + } + + this._amountLeft -= bytesCopied; + this._debug("Copied: " + bytesCopied + + ", Left: " + this._amountLeft); + this._emitProgress(); + + if (this._amountLeft === 0) { + this._debug("Copy done!"); + this._flush(); + return; + } + + this._debug("Waiting for input stream"); + this.input.asyncWait(this, 0, 0, Services.tm.currentThread); + }, + + _emitProgress: function () { + this.emit("progress", { + bytesSent: this._length - this._amountLeft, + totalBytes: this._length + }); + }, + + _flush: function () { + try { + this.output.flush(); + } catch (e) { + if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK || + e.result == Cr.NS_ERROR_FAILURE) { + this._debug("Flush would block, will retry"); + this._streamReadyCallback = this._flush; + this._debug("Waiting for output stream"); + this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); + return; + } else { + throw e; + } + } + this._deferred.resolve(); + }, + + _destroy: function () { + this._destroy = null; + this._copy = null; + this._flush = null; + this.input = null; + this.output = null; + }, + + // nsIInputStreamCallback + onInputStreamReady: function () { + this._streamReadyCallback(); + }, + + // nsIOutputStreamCallback + onOutputStreamReady: function () { + this._streamReadyCallback(); + }, + + _debug: function (msg) { + // Prefix logs with the copier ID, which makes logs much easier to + // understand when several copiers are running simultaneously + dumpv("Copier: " + this._id + " " + msg); + } + +}; + +/** + * Read from a stream, one byte at a time, up to the next |delimiter| + * character, but stopping if we've read |count| without finding it. Reading + * also terminates early if there are less than |count| bytes available on the + * stream. In that case, we only read as many bytes as the stream currently has + * to offer. + * TODO: This implementation could be removed if bug 984651 is fixed, which + * provides a native version of the same idea. + * @param stream nsIInputStream + * The input stream to read from. + * @param delimiter string + * The character we're trying to find. + * @param count integer + * The max number of characters to read while searching. + * @return string + * The data collected. If the delimiter was found, this string will + * end with it. + */ +function delimitedRead(stream, delimiter, count) { + dumpv("Starting delimited read for " + delimiter + " up to " + + count + " bytes"); + + let scriptableStream; + if (stream instanceof Ci.nsIScriptableInputStream) { + scriptableStream = stream; + } else { + scriptableStream = new ScriptableInputStream(stream); + } + + let data = ""; + + // Don't exceed what's available on the stream + count = Math.min(count, stream.available()); + + if (count <= 0) { + return data; + } + + let char; + while (char !== delimiter && count > 0) { + char = scriptableStream.readBytes(1); + count--; + data += char; + } + + return data; +} + +module.exports = { + copyStream: copyStream, + delimitedRead: delimitedRead +}; diff --git a/devtools/shared/transport/tests/unit/.eslintrc.js b/devtools/shared/transport/tests/unit/.eslintrc.js new file mode 100644 index 000000000..59adf410a --- /dev/null +++ b/devtools/shared/transport/tests/unit/.eslintrc.js @@ -0,0 +1,6 @@ +"use strict"; + +module.exports = { + // Extend from the common devtools xpcshell eslintrc config. + "extends": "../../../../.eslintrc.xpcshell.js" +}; diff --git a/devtools/shared/transport/tests/unit/head_dbg.js b/devtools/shared/transport/tests/unit/head_dbg.js new file mode 100644 index 000000000..1f96ad440 --- /dev/null +++ b/devtools/shared/transport/tests/unit/head_dbg.js @@ -0,0 +1,278 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +var Cc = Components.classes; +var Ci = Components.interfaces; +var Cu = Components.utils; +var Cr = Components.results; +var CC = Components.Constructor; + +const { require } = + Cu.import("resource://devtools/shared/Loader.jsm", {}); +const { NetUtil } = require("resource://gre/modules/NetUtil.jsm"); +const promise = require("promise"); +const defer = require("devtools/shared/defer"); +const { Task } = require("devtools/shared/task"); + +const Services = require("Services"); +const DevToolsUtils = require("devtools/shared/DevToolsUtils"); + +// We do not want to log packets by default, because in some tests, +// we can be sending large amounts of data. The test harness has +// trouble dealing with logging all the data, and we end up with +// intermittent time outs (e.g. bug 775924). +// Services.prefs.setBoolPref("devtools.debugger.log", true); +// Services.prefs.setBoolPref("devtools.debugger.log.verbose", true); +// Enable remote debugging for the relevant tests. +Services.prefs.setBoolPref("devtools.debugger.remote-enabled", true); + +const { DebuggerServer } = require("devtools/server/main"); +const { DebuggerClient } = require("devtools/shared/client/main"); + +function testExceptionHook(ex) { + try { + do_report_unexpected_exception(ex); + } catch (ex) { + return {throw: ex}; + } + return undefined; +} + +// Convert an nsIScriptError 'aFlags' value into an appropriate string. +function scriptErrorFlagsToKind(aFlags) { + var kind; + if (aFlags & Ci.nsIScriptError.warningFlag) + kind = "warning"; + if (aFlags & Ci.nsIScriptError.exceptionFlag) + kind = "exception"; + else + kind = "error"; + + if (aFlags & Ci.nsIScriptError.strictFlag) + kind = "strict " + kind; + + return kind; +} + +// Register a console listener, so console messages don't just disappear +// into the ether. +var errorCount = 0; +var listener = { + observe: function (aMessage) { + errorCount++; + try { + // If we've been given an nsIScriptError, then we can print out + // something nicely formatted, for tools like Emacs to pick up. + var scriptError = aMessage.QueryInterface(Ci.nsIScriptError); + dump(aMessage.sourceName + ":" + aMessage.lineNumber + ": " + + scriptErrorFlagsToKind(aMessage.flags) + ": " + + aMessage.errorMessage + "\n"); + var string = aMessage.errorMessage; + } catch (x) { + // Be a little paranoid with message, as the whole goal here is to lose + // no information. + try { + var string = "" + aMessage.message; + } catch (x) { + var string = "<error converting error message to string>"; + } + } + + // Make sure we exit all nested event loops so that the test can finish. + while (DebuggerServer.xpcInspector.eventLoopNestLevel > 0) { + DebuggerServer.xpcInspector.exitNestedEventLoop(); + } + + // Throw in most cases, but ignore the "strict" messages + if (!(aMessage.flags & Ci.nsIScriptError.strictFlag)) { + do_throw("head_dbg.js got console message: " + string + "\n"); + } + } +}; + +var consoleService = Cc["@mozilla.org/consoleservice;1"] + .getService(Ci.nsIConsoleService); +consoleService.registerListener(listener); + +function check_except(func) { + try { + func(); + } catch (e) { + do_check_true(true); + return; + } + dump("Should have thrown an exception: " + func.toString()); + do_check_true(false); +} + +function testGlobal(aName) { + let systemPrincipal = Cc["@mozilla.org/systemprincipal;1"] + .createInstance(Ci.nsIPrincipal); + + let sandbox = Cu.Sandbox(systemPrincipal); + sandbox.__name = aName; + return sandbox; +} + +function addTestGlobal(aName) +{ + let global = testGlobal(aName); + DebuggerServer.addTestGlobal(global); + return global; +} + +// List the DebuggerClient |aClient|'s tabs, look for one whose title is +// |aTitle|, and apply |aCallback| to the packet's entry for that tab. +function getTestTab(aClient, aTitle, aCallback) { + aClient.listTabs(function (aResponse) { + for (let tab of aResponse.tabs) { + if (tab.title === aTitle) { + aCallback(tab); + return; + } + } + aCallback(null); + }); +} + +// Attach to |aClient|'s tab whose title is |aTitle|; pass |aCallback| the +// response packet and a TabClient instance referring to that tab. +function attachTestTab(aClient, aTitle, aCallback) { + getTestTab(aClient, aTitle, function (aTab) { + aClient.attachTab(aTab.actor, aCallback); + }); +} + +// Attach to |aClient|'s tab whose title is |aTitle|, and then attach to +// that tab's thread. Pass |aCallback| the thread attach response packet, a +// TabClient referring to the tab, and a ThreadClient referring to the +// thread. +function attachTestThread(aClient, aTitle, aCallback) { + attachTestTab(aClient, aTitle, function (aResponse, aTabClient) { + function onAttach(aResponse, aThreadClient) { + aCallback(aResponse, aTabClient, aThreadClient); + } + aTabClient.attachThread({ useSourceMaps: true }, onAttach); + }); +} + +// Attach to |aClient|'s tab whose title is |aTitle|, attach to the tab's +// thread, and then resume it. Pass |aCallback| the thread's response to +// the 'resume' packet, a TabClient for the tab, and a ThreadClient for the +// thread. +function attachTestTabAndResume(aClient, aTitle, aCallback) { + attachTestThread(aClient, aTitle, function (aResponse, aTabClient, aThreadClient) { + aThreadClient.resume(function (aResponse) { + aCallback(aResponse, aTabClient, aThreadClient); + }); + }); +} + +/** + * Initialize the testing debugger server. + */ +function initTestDebuggerServer() { + DebuggerServer.registerModule("devtools/server/actors/script", { + prefix: "script", + constructor: "ScriptActor", + type: { global: true, tab: true } + }); + DebuggerServer.registerModule("xpcshell-test/testactors"); + // Allow incoming connections. + DebuggerServer.init(); +} + +function finishClient(aClient) { + aClient.close().then(function () { + do_test_finished(); + }); +} + +/** + * Takes a relative file path and returns the absolute file url for it. + */ +function getFileUrl(aName, aAllowMissing = false) { + let file = do_get_file(aName, aAllowMissing); + return Services.io.newFileURI(file).spec; +} + +/** + * Returns the full path of the file with the specified name in a + * platform-independent and URL-like form. + */ +function getFilePath(aName, aAllowMissing = false) { + let file = do_get_file(aName, aAllowMissing); + let path = Services.io.newFileURI(file).spec; + let filePrePath = "file://"; + if ("nsILocalFileWin" in Ci && + file instanceof Ci.nsILocalFileWin) { + filePrePath += "/"; + } + return path.slice(filePrePath.length); +} + +/** + * Wrapper around do_get_file to prefix files with the name of current test to + * avoid collisions when running in parallel. + */ +function getTestTempFile(fileName, allowMissing) { + let thisTest = _TEST_FILE.toString().replace(/\\/g, "/"); + thisTest = thisTest.substring(thisTest.lastIndexOf("/") + 1); + thisTest = thisTest.replace(/\..*$/, ""); + return do_get_file(fileName + "-" + thisTest, allowMissing); +} + +function writeTestTempFile(aFileName, aContent) { + let file = getTestTempFile(aFileName, true); + let stream = Cc["@mozilla.org/network/file-output-stream;1"] + .createInstance(Ci.nsIFileOutputStream); + stream.init(file, -1, -1, 0); + try { + do { + let numWritten = stream.write(aContent, aContent.length); + aContent = aContent.slice(numWritten); + } while (aContent.length > 0); + } finally { + stream.close(); + } +} + +/** * Transport Factories ***/ + +var socket_transport = Task.async(function* () { + if (!DebuggerServer.listeningSockets) { + let AuthenticatorType = DebuggerServer.Authenticators.get("PROMPT"); + let authenticator = new AuthenticatorType.Server(); + authenticator.allowConnection = () => { + return DebuggerServer.AuthenticationResult.ALLOW; + }; + let listener = DebuggerServer.createListener(); + listener.portOrPath = -1; + listener.authenticator = authenticator; + yield listener.open(); + } + let port = DebuggerServer._listeners[0].port; + do_print("Debugger server port is " + port); + return DebuggerClient.socketConnect({ host: "127.0.0.1", port }); +}); + +function local_transport() { + return promise.resolve(DebuggerServer.connectPipe()); +} + +/** * Sample Data ***/ + +var gReallyLong; +function really_long() { + if (gReallyLong) { + return gReallyLong; + } + let ret = "0123456789"; + for (let i = 0; i < 18; i++) { + ret += ret; + } + gReallyLong = ret; + return ret; +} diff --git a/devtools/shared/transport/tests/unit/test_bulk_error.js b/devtools/shared/transport/tests/unit/test_bulk_error.js new file mode 100644 index 000000000..954499291 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_bulk_error.js @@ -0,0 +1,92 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +function run_test() { + initTestDebuggerServer(); + add_test_bulk_actor(); + + add_task(function* () { + yield test_string_error(socket_transport, json_reply); + yield test_string_error(local_transport, json_reply); + DebuggerServer.destroy(); + }); + + run_next_test(); +} + +/** * Sample Bulk Actor ***/ + +function TestBulkActor() {} + +TestBulkActor.prototype = { + + actorPrefix: "testBulk", + + jsonReply: function ({length, reader, reply, done}) { + do_check_eq(length, really_long().length); + + return { + allDone: true + }; + } + +}; + +TestBulkActor.prototype.requestTypes = { + "jsonReply": TestBulkActor.prototype.jsonReply +}; + +function add_test_bulk_actor() { + DebuggerServer.addGlobalActor(TestBulkActor); +} + +/** * Tests ***/ + +var test_string_error = Task.async(function* (transportFactory, onReady) { + let transport = yield transportFactory(); + + let client = new DebuggerClient(transport); + return client.connect().then(([app, traits]) => { + do_check_eq(traits.bulk, true); + return client.listTabs(); + }).then(response => { + return onReady(client, response); + }).then(() => { + client.close(); + transport.close(); + }); +}); + +/** * Reply Types ***/ + +function json_reply(client, response) { + let reallyLong = really_long(); + + let request = client.startBulkRequest({ + actor: response.testBulk, + type: "jsonReply", + length: reallyLong.length + }); + + // Send bulk data to server + let copyDeferred = defer(); + request.on("bulk-send-ready", ({writer, done}) => { + let input = Cc["@mozilla.org/io/string-input-stream;1"]. + createInstance(Ci.nsIStringInputStream); + input.setData(reallyLong, reallyLong.length); + try { + writer.copyFrom(input, () => { + input.close(); + done(); + }); + do_throw(new Error("Copying should fail, the stream is not async.")); + } catch (e) { + do_check_true(true); + copyDeferred.resolve(); + } + }); + + return copyDeferred.promise; +} diff --git a/devtools/shared/transport/tests/unit/test_client_server_bulk.js b/devtools/shared/transport/tests/unit/test_client_server_bulk.js new file mode 100644 index 000000000..e4d17d216 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_client_server_bulk.js @@ -0,0 +1,271 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +var { FileUtils } = Cu.import("resource://gre/modules/FileUtils.jsm", {}); +var Pipe = CC("@mozilla.org/pipe;1", "nsIPipe", "init"); + +function run_test() { + initTestDebuggerServer(); + add_test_bulk_actor(); + + add_task(function* () { + yield test_bulk_request_cs(socket_transport, "jsonReply", "json"); + yield test_bulk_request_cs(local_transport, "jsonReply", "json"); + yield test_bulk_request_cs(socket_transport, "bulkEcho", "bulk"); + yield test_bulk_request_cs(local_transport, "bulkEcho", "bulk"); + yield test_json_request_cs(socket_transport, "bulkReply", "bulk"); + yield test_json_request_cs(local_transport, "bulkReply", "bulk"); + DebuggerServer.destroy(); + }); + + run_next_test(); +} + +/** * Sample Bulk Actor ***/ + +function TestBulkActor(conn) { + this.conn = conn; +} + +TestBulkActor.prototype = { + + actorPrefix: "testBulk", + + bulkEcho: function ({actor, type, length, copyTo}) { + do_check_eq(length, really_long().length); + this.conn.startBulkSend({ + actor: actor, + type: type, + length: length + }).then(({copyFrom}) => { + // We'll just echo back the same thing + let pipe = new Pipe(true, true, 0, 0, null); + copyTo(pipe.outputStream).then(() => { + pipe.outputStream.close(); + }); + copyFrom(pipe.inputStream).then(() => { + pipe.inputStream.close(); + }); + }); + }, + + bulkReply: function ({to, type}) { + this.conn.startBulkSend({ + actor: to, + type: type, + length: really_long().length + }).then(({copyFrom}) => { + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true + }, input => { + copyFrom(input).then(() => { + input.close(); + }); + }); + }); + }, + + jsonReply: function ({length, copyTo}) { + do_check_eq(length, really_long().length); + + let outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + let output = FileUtils.openSafeFileOutputStream(outputFile); + + return copyTo(output).then(() => { + FileUtils.closeSafeFileOutputStream(output); + return verify_files(); + }).then(() => { + return { allDone: true }; + }, do_throw); + } + +}; + +TestBulkActor.prototype.requestTypes = { + "bulkEcho": TestBulkActor.prototype.bulkEcho, + "bulkReply": TestBulkActor.prototype.bulkReply, + "jsonReply": TestBulkActor.prototype.jsonReply +}; + +function add_test_bulk_actor() { + DebuggerServer.addGlobalActor(TestBulkActor); +} + +/** * Reply Handlers ***/ + +var replyHandlers = { + + json: function (request) { + // Receive JSON reply from server + let replyDeferred = defer(); + request.on("json-reply", (reply) => { + do_check_true(reply.allDone); + replyDeferred.resolve(); + }); + return replyDeferred.promise; + }, + + bulk: function (request) { + // Receive bulk data reply from server + let replyDeferred = defer(); + request.on("bulk-reply", ({length, copyTo}) => { + do_check_eq(length, really_long().length); + + let outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + let output = FileUtils.openSafeFileOutputStream(outputFile); + + copyTo(output).then(() => { + FileUtils.closeSafeFileOutputStream(output); + replyDeferred.resolve(verify_files()); + }); + }); + return replyDeferred.promise; + } + +}; + +/** * Tests ***/ + +var test_bulk_request_cs = Task.async(function* (transportFactory, actorType, replyType) { + // Ensure test files are not present from a failed run + cleanup_files(); + writeTestTempFile("bulk-input", really_long()); + + let clientDeferred = defer(); + let serverDeferred = defer(); + let bulkCopyDeferred = defer(); + + let transport = yield transportFactory(); + + let client = new DebuggerClient(transport); + client.connect().then(([app, traits]) => { + do_check_eq(traits.bulk, true); + client.listTabs(clientDeferred.resolve); + }); + + clientDeferred.promise.then(response => { + let request = client.startBulkRequest({ + actor: response.testBulk, + type: actorType, + length: really_long().length + }); + + // Send bulk data to server + request.on("bulk-send-ready", ({copyFrom}) => { + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true + }, input => { + copyFrom(input).then(() => { + input.close(); + bulkCopyDeferred.resolve(); + }); + }); + }); + + // Set up reply handling for this type + replyHandlers[replyType](request).then(() => { + client.close(); + transport.close(); + }); + }).then(null, do_throw); + + DebuggerServer.on("connectionchange", (event, type) => { + if (type === "closed") { + serverDeferred.resolve(); + } + }); + + return promise.all([ + clientDeferred.promise, + bulkCopyDeferred.promise, + serverDeferred.promise + ]); +}); + +var test_json_request_cs = Task.async(function* (transportFactory, actorType, replyType) { + // Ensure test files are not present from a failed run + cleanup_files(); + writeTestTempFile("bulk-input", really_long()); + + let clientDeferred = defer(); + let serverDeferred = defer(); + + let transport = yield transportFactory(); + + let client = new DebuggerClient(transport); + client.connect((app, traits) => { + do_check_eq(traits.bulk, true); + client.listTabs(clientDeferred.resolve); + }); + + clientDeferred.promise.then(response => { + let request = client.request({ + to: response.testBulk, + type: actorType + }); + + // Set up reply handling for this type + replyHandlers[replyType](request).then(() => { + client.close(); + transport.close(); + }); + }).then(null, do_throw); + + DebuggerServer.on("connectionchange", (event, type) => { + if (type === "closed") { + serverDeferred.resolve(); + } + }); + + return promise.all([ + clientDeferred.promise, + serverDeferred.promise + ]); +}); + +/** * Test Utils ***/ + +function verify_files() { + let reallyLong = really_long(); + + let inputFile = getTestTempFile("bulk-input"); + let outputFile = getTestTempFile("bulk-output"); + + do_check_eq(inputFile.fileSize, reallyLong.length); + do_check_eq(outputFile.fileSize, reallyLong.length); + + // Ensure output file contents actually match + let compareDeferred = defer(); + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-output")), + loadUsingSystemPrincipal: true + }, input => { + let outputData = NetUtil.readInputStreamToString(input, reallyLong.length); + // Avoid do_check_eq here so we don't log the contents + do_check_true(outputData === reallyLong); + input.close(); + compareDeferred.resolve(); + }); + + return compareDeferred.promise.then(cleanup_files); +} + +function cleanup_files() { + let inputFile = getTestTempFile("bulk-input", true); + if (inputFile.exists()) { + inputFile.remove(false); + } + + let outputFile = getTestTempFile("bulk-output", true); + if (outputFile.exists()) { + outputFile.remove(false); + } +} diff --git a/devtools/shared/transport/tests/unit/test_dbgsocket.js b/devtools/shared/transport/tests/unit/test_dbgsocket.js new file mode 100644 index 000000000..79111f877 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_dbgsocket.js @@ -0,0 +1,124 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +var gPort; +var gExtraListener; + +function run_test() +{ + do_print("Starting test at " + new Date().toTimeString()); + initTestDebuggerServer(); + + add_task(test_socket_conn); + add_task(test_socket_shutdown); + add_test(test_pipe_conn); + + run_next_test(); +} + +function* test_socket_conn() +{ + do_check_eq(DebuggerServer.listeningSockets, 0); + let AuthenticatorType = DebuggerServer.Authenticators.get("PROMPT"); + let authenticator = new AuthenticatorType.Server(); + authenticator.allowConnection = () => { + return DebuggerServer.AuthenticationResult.ALLOW; + }; + let listener = DebuggerServer.createListener(); + do_check_true(listener); + listener.portOrPath = -1; + listener.authenticator = authenticator; + listener.open(); + do_check_eq(DebuggerServer.listeningSockets, 1); + gPort = DebuggerServer._listeners[0].port; + do_print("Debugger server port is " + gPort); + // Open a second, separate listener + gExtraListener = DebuggerServer.createListener(); + gExtraListener.portOrPath = -1; + gExtraListener.authenticator = authenticator; + gExtraListener.open(); + do_check_eq(DebuggerServer.listeningSockets, 2); + + do_print("Starting long and unicode tests at " + new Date().toTimeString()); + let unicodeString = "(╯°□°)╯︵ ┻━┻"; + let transport = yield DebuggerClient.socketConnect({ + host: "127.0.0.1", + port: gPort + }); + + // Assert that connection settings are available on transport object + let settings = transport.connectionSettings; + do_check_eq(settings.host, "127.0.0.1"); + do_check_eq(settings.port, gPort); + + let closedDeferred = defer(); + transport.hooks = { + onPacket: function (aPacket) { + this.onPacket = function (aPacket) { + do_check_eq(aPacket.unicode, unicodeString); + transport.close(); + }; + // Verify that things work correctly when bigger than the output + // transport buffers and when transporting unicode... + transport.send({to: "root", + type: "echo", + reallylong: really_long(), + unicode: unicodeString}); + do_check_eq(aPacket.from, "root"); + }, + onClosed: function (aStatus) { + closedDeferred.resolve(); + }, + }; + transport.ready(); + return closedDeferred.promise; +} + +function* test_socket_shutdown() +{ + do_check_eq(DebuggerServer.listeningSockets, 2); + gExtraListener.close(); + do_check_eq(DebuggerServer.listeningSockets, 1); + do_check_true(DebuggerServer.closeAllListeners()); + do_check_eq(DebuggerServer.listeningSockets, 0); + // Make sure closing the listener twice does nothing. + do_check_false(DebuggerServer.closeAllListeners()); + do_check_eq(DebuggerServer.listeningSockets, 0); + + do_print("Connecting to a server socket at " + new Date().toTimeString()); + try { + let transport = yield DebuggerClient.socketConnect({ + host: "127.0.0.1", + port: gPort + }); + } catch (e) { + if (e.result == Cr.NS_ERROR_CONNECTION_REFUSED || + e.result == Cr.NS_ERROR_NET_TIMEOUT) { + // The connection should be refused here, but on slow or overloaded + // machines it may just time out. + do_check_true(true); + return; + } else { + throw e; + } + } + + // Shouldn't reach this, should never connect. + do_check_true(false); +} + +function test_pipe_conn() +{ + let transport = DebuggerServer.connectPipe(); + transport.hooks = { + onPacket: function (aPacket) { + do_check_eq(aPacket.from, "root"); + transport.close(); + }, + onClosed: function (aStatus) { + run_next_test(); + } + }; + + transport.ready(); +} diff --git a/devtools/shared/transport/tests/unit/test_dbgsocket_connection_drop.js b/devtools/shared/transport/tests/unit/test_dbgsocket_connection_drop.js new file mode 100644 index 000000000..9221939b1 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_dbgsocket_connection_drop.js @@ -0,0 +1,81 @@ +/** + * Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +/** + * Bug 755412 - checks if the server drops the connection on an improperly + * framed packet, i.e. when the length header is invalid. + */ + +const { RawPacket } = require("devtools/shared/transport/packets"); + +function run_test() { + do_print("Starting test at " + new Date().toTimeString()); + initTestDebuggerServer(); + + add_task(test_socket_conn_drops_after_invalid_header); + add_task(test_socket_conn_drops_after_invalid_header_2); + add_task(test_socket_conn_drops_after_too_large_length); + add_task(test_socket_conn_drops_after_too_long_header); + run_next_test(); +} + +function test_socket_conn_drops_after_invalid_header() { + return test_helper('fluff30:27:{"to":"root","type":"echo"}'); +} + +function test_socket_conn_drops_after_invalid_header_2() { + return test_helper('27asd:{"to":"root","type":"echo"}'); +} + +function test_socket_conn_drops_after_too_large_length() { + // Packet length is limited (semi-arbitrarily) to 1 TiB (2^40) + return test_helper("4305724038957487634549823475894325:"); +} + +function test_socket_conn_drops_after_too_long_header() { + // The packet header is currently limited to no more than 200 bytes + let rawPacket = "4305724038957487634549823475894325"; + for (let i = 0; i < 8; i++) { + rawPacket += rawPacket; + } + return test_helper(rawPacket + ":"); +} + +var test_helper = Task.async(function* (payload) { + let AuthenticatorType = DebuggerServer.Authenticators.get("PROMPT"); + let authenticator = new AuthenticatorType.Server(); + authenticator.allowConnection = () => { + return DebuggerServer.AuthenticationResult.ALLOW; + }; + + let listener = DebuggerServer.createListener(); + listener.portOrPath = -1; + listener.authenticator = authenticator; + listener.open(); + + let transport = yield DebuggerClient.socketConnect({ + host: "127.0.0.1", + port: listener.port + }); + let closedDeferred = defer(); + transport.hooks = { + onPacket: function (aPacket) { + this.onPacket = function (aPacket) { + do_throw(new Error("This connection should be dropped.")); + transport.close(); + }; + + // Inject the payload directly into the stream. + transport._outgoing.push(new RawPacket(transport, payload)); + transport._flushOutgoing(); + }, + onClosed: function (aStatus) { + do_check_true(true); + closedDeferred.resolve(); + }, + }; + transport.ready(); + return closedDeferred.promise; +}); diff --git a/devtools/shared/transport/tests/unit/test_delimited_read.js b/devtools/shared/transport/tests/unit/test_delimited_read.js new file mode 100644 index 000000000..a5a7baf7b --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_delimited_read.js @@ -0,0 +1,26 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +const StreamUtils = require("devtools/shared/transport/stream-utils"); + +const StringInputStream = CC("@mozilla.org/io/string-input-stream;1", + "nsIStringInputStream", "setData"); + +function run_test() { + add_task(function* () { + yield test_delimited_read("0123:", "0123:"); + yield test_delimited_read("0123:4567:", "0123:"); + yield test_delimited_read("012345678901:", "0123456789"); + yield test_delimited_read("0123/0123", "0123/0123"); + }); + + run_next_test(); +} + +/** * Tests ***/ + +function test_delimited_read(input, expected) { + input = new StringInputStream(input, input.length); + let result = StreamUtils.delimitedRead(input, ":", 10); + do_check_eq(result, expected); +} diff --git a/devtools/shared/transport/tests/unit/test_no_bulk.js b/devtools/shared/transport/tests/unit/test_no_bulk.js new file mode 100644 index 000000000..f621a2a52 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_no_bulk.js @@ -0,0 +1,38 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +function run_test() { + DebuggerServer.registerModule("xpcshell-test/testactors-no-bulk"); + // Allow incoming connections. + DebuggerServer.init(); + + add_task(function* () { + yield test_bulk_send_error(socket_transport); + yield test_bulk_send_error(local_transport); + DebuggerServer.destroy(); + }); + + run_next_test(); +} + +/** * Tests ***/ + +var test_bulk_send_error = Task.async(function* (transportFactory) { + let deferred = defer(); + let transport = yield transportFactory(); + + let client = new DebuggerClient(transport); + return client.connect().then(([app, traits]) => { + do_check_false(traits.bulk); + + try { + client.startBulkRequest(); + do_throw(new Error("Can't use bulk since server doesn't support it")); + } catch (e) { + do_check_true(true); + } + + }); +}); diff --git a/devtools/shared/transport/tests/unit/test_packet.js b/devtools/shared/transport/tests/unit/test_packet.js new file mode 100644 index 000000000..7e1896555 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_packet.js @@ -0,0 +1,21 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +const { JSONPacket, BulkPacket } = + require("devtools/shared/transport/packets"); + +function run_test() { + add_test(test_packet_done); + run_next_test(); +} + +// Ensure done can be checked without getting an error +function test_packet_done() { + let json = new JSONPacket(); + do_check_false(!!json.done); + + let bulk = new BulkPacket(); + do_check_false(!!bulk.done); + + run_next_test(); +} diff --git a/devtools/shared/transport/tests/unit/test_queue.js b/devtools/shared/transport/tests/unit/test_queue.js new file mode 100644 index 000000000..5de14baee --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_queue.js @@ -0,0 +1,177 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +/** + * This test verifies that the transport's queue operates correctly when various + * packets are scheduled simultaneously. + */ + +var { FileUtils } = Cu.import("resource://gre/modules/FileUtils.jsm", {}); + +function run_test() { + initTestDebuggerServer(); + + add_task(function* () { + yield test_transport(socket_transport); + yield test_transport(local_transport); + DebuggerServer.destroy(); + }); + + run_next_test(); +} + +/** * Tests ***/ + +var test_transport = Task.async(function* (transportFactory) { + let clientDeferred = defer(); + let serverDeferred = defer(); + + // Ensure test files are not present from a failed run + cleanup_files(); + let reallyLong = really_long(); + writeTestTempFile("bulk-input", reallyLong); + + do_check_eq(Object.keys(DebuggerServer._connections).length, 0); + + let transport = yield transportFactory(); + + // Sending from client to server + function write_data({copyFrom}) { + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true + }, function (input, status) { + copyFrom(input).then(() => { + input.close(); + }); + }); + } + + // Receiving on server from client + function on_bulk_packet({actor, type, length, copyTo}) { + do_check_eq(actor, "root"); + do_check_eq(type, "file-stream"); + do_check_eq(length, reallyLong.length); + + let outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + let output = FileUtils.openSafeFileOutputStream(outputFile); + + copyTo(output).then(() => { + FileUtils.closeSafeFileOutputStream(output); + return verify(); + }).then(() => { + // It's now safe to close + transport.hooks.onClosed = () => { + clientDeferred.resolve(); + }; + transport.close(); + }); + } + + // Client + + function send_packets() { + // Specifically, we want to ensure that multiple send()s proceed without + // causing the transport to die. + transport.send({ + actor: "root", + type: "explode" + }); + + transport.startBulkSend({ + actor: "root", + type: "file-stream", + length: reallyLong.length + }).then(write_data); + } + + transport.hooks = { + onPacket: function (packet) { + if (packet.error) { + transport.hooks.onError(packet); + } else if (packet.applicationType) { + transport.hooks.onServerHello(packet); + } else { + do_throw("Unexpected server reply"); + } + }, + + onServerHello: function (packet) { + // We've received the initial start up packet + do_check_eq(packet.from, "root"); + do_check_eq(packet.applicationType, "xpcshell-tests"); + + // Server + do_check_eq(Object.keys(DebuggerServer._connections).length, 1); + do_print(Object.keys(DebuggerServer._connections)); + for (let connId in DebuggerServer._connections) { + DebuggerServer._connections[connId].onBulkPacket = on_bulk_packet; + } + + DebuggerServer.on("connectionchange", (event, type) => { + if (type === "closed") { + serverDeferred.resolve(); + } + }); + + send_packets(); + }, + + onError: function (packet) { + // The explode actor doesn't exist + do_check_eq(packet.from, "root"); + do_check_eq(packet.error, "noSuchActor"); + }, + + onClosed: function () { + do_throw("Transport closed before we expected"); + } + }; + + transport.ready(); + + return promise.all([clientDeferred.promise, serverDeferred.promise]); +}); + +/** * Test Utils ***/ + +function verify() { + let reallyLong = really_long(); + + let inputFile = getTestTempFile("bulk-input"); + let outputFile = getTestTempFile("bulk-output"); + + do_check_eq(inputFile.fileSize, reallyLong.length); + do_check_eq(outputFile.fileSize, reallyLong.length); + + // Ensure output file contents actually match + let compareDeferred = defer(); + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-output")), + loadUsingSystemPrincipal: true + }, input => { + let outputData = NetUtil.readInputStreamToString(input, reallyLong.length); + // Avoid do_check_eq here so we don't log the contents + do_check_true(outputData === reallyLong); + input.close(); + compareDeferred.resolve(); + }); + + return compareDeferred.promise.then(cleanup_files); +} + +function cleanup_files() { + let inputFile = getTestTempFile("bulk-input", true); + if (inputFile.exists()) { + inputFile.remove(false); + } + + let outputFile = getTestTempFile("bulk-output", true); + if (outputFile.exists()) { + outputFile.remove(false); + } +} diff --git a/devtools/shared/transport/tests/unit/test_transport_bulk.js b/devtools/shared/transport/tests/unit/test_transport_bulk.js new file mode 100644 index 000000000..a21216acf --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_transport_bulk.js @@ -0,0 +1,148 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +var { FileUtils } = Cu.import("resource://gre/modules/FileUtils.jsm", {}); + +function run_test() { + initTestDebuggerServer(); + + add_task(function* () { + yield test_bulk_transfer_transport(socket_transport); + yield test_bulk_transfer_transport(local_transport); + DebuggerServer.destroy(); + }); + + run_next_test(); +} + +/** * Tests ***/ + +/** + * This tests a one-way bulk transfer at the transport layer. + */ +var test_bulk_transfer_transport = Task.async(function* (transportFactory) { + do_print("Starting bulk transfer test at " + new Date().toTimeString()); + + let clientDeferred = defer(); + let serverDeferred = defer(); + + // Ensure test files are not present from a failed run + cleanup_files(); + let reallyLong = really_long(); + writeTestTempFile("bulk-input", reallyLong); + + do_check_eq(Object.keys(DebuggerServer._connections).length, 0); + + let transport = yield transportFactory(); + + // Sending from client to server + function write_data({copyFrom}) { + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true + }, function (input, status) { + copyFrom(input).then(() => { + input.close(); + }); + }); + } + + // Receiving on server from client + function on_bulk_packet({actor, type, length, copyTo}) { + do_check_eq(actor, "root"); + do_check_eq(type, "file-stream"); + do_check_eq(length, reallyLong.length); + + let outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + let output = FileUtils.openSafeFileOutputStream(outputFile); + + copyTo(output).then(() => { + FileUtils.closeSafeFileOutputStream(output); + return verify(); + }).then(() => { + // It's now safe to close + transport.hooks.onClosed = () => { + clientDeferred.resolve(); + }; + transport.close(); + }); + } + + // Client + transport.hooks = { + onPacket: function (aPacket) { + // We've received the initial start up packet + do_check_eq(aPacket.from, "root"); + + // Server + do_check_eq(Object.keys(DebuggerServer._connections).length, 1); + do_print(Object.keys(DebuggerServer._connections)); + for (let connId in DebuggerServer._connections) { + DebuggerServer._connections[connId].onBulkPacket = on_bulk_packet; + } + + DebuggerServer.on("connectionchange", (event, type) => { + if (type === "closed") { + serverDeferred.resolve(); + } + }); + + transport.startBulkSend({ + actor: "root", + type: "file-stream", + length: reallyLong.length + }).then(write_data); + }, + + onClosed: function () { + do_throw("Transport closed before we expected"); + } + }; + + transport.ready(); + + return promise.all([clientDeferred.promise, serverDeferred.promise]); +}); + +/** * Test Utils ***/ + +function verify() { + let reallyLong = really_long(); + + let inputFile = getTestTempFile("bulk-input"); + let outputFile = getTestTempFile("bulk-output"); + + do_check_eq(inputFile.fileSize, reallyLong.length); + do_check_eq(outputFile.fileSize, reallyLong.length); + + // Ensure output file contents actually match + let compareDeferred = defer(); + NetUtil.asyncFetch({ + uri: NetUtil.newURI(getTestTempFile("bulk-output")), + loadUsingSystemPrincipal: true + }, input => { + let outputData = NetUtil.readInputStreamToString(input, reallyLong.length); + // Avoid do_check_eq here so we don't log the contents + do_check_true(outputData === reallyLong); + input.close(); + compareDeferred.resolve(); + }); + + return compareDeferred.promise.then(cleanup_files); +} + +function cleanup_files() { + let inputFile = getTestTempFile("bulk-input", true); + if (inputFile.exists()) { + inputFile.remove(false); + } + + let outputFile = getTestTempFile("bulk-output", true); + if (outputFile.exists()) { + outputFile.remove(false); + } +} diff --git a/devtools/shared/transport/tests/unit/test_transport_events.js b/devtools/shared/transport/tests/unit/test_transport_events.js new file mode 100644 index 000000000..ae20f6cf8 --- /dev/null +++ b/devtools/shared/transport/tests/unit/test_transport_events.js @@ -0,0 +1,75 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +function run_test() { + initTestDebuggerServer(); + + add_task(function* () { + yield test_transport_events("socket", socket_transport); + yield test_transport_events("local", local_transport); + DebuggerServer.destroy(); + }); + + run_next_test(); +} + +function* test_transport_events(name, transportFactory) { + do_print(`Started testing of transport: ${name}`); + + do_check_eq(Object.keys(DebuggerServer._connections).length, 0); + + let transport = yield transportFactory(); + + // Transport expects the hooks to be not null + transport.hooks = { + onPacket: () => {}, + onClosed: () => {}, + }; + + let rootReceived = transport.once("packet", (event, packet) => { + do_print(`Packet event: ${event} ${JSON.stringify(packet)}`); + do_check_eq(event, "packet"); + do_check_eq(packet.from, "root"); + }); + + transport.ready(); + yield rootReceived; + + let echoSent = transport.once("send", (event, packet) => { + do_print(`Send event: ${event} ${JSON.stringify(packet)}`); + do_check_eq(event, "send"); + do_check_eq(packet.to, "root"); + do_check_eq(packet.type, "echo"); + }); + + let echoReceived = transport.once("packet", (event, packet) => { + do_print(`Packet event: ${event} ${JSON.stringify(packet)}`); + do_check_eq(event, "packet"); + do_check_eq(packet.from, "root"); + do_check_eq(packet.type, "echo"); + }); + + transport.send({ to: "root", type: "echo" }); + yield echoSent; + yield echoReceived; + + let clientClosed = transport.once("close", (event) => { + do_print(`Close event: ${event}`); + do_check_eq(event, "close"); + }); + + let serverClosed = DebuggerServer.once("connectionchange", (event, type) => { + do_print(`Server closed`); + do_check_eq(event, "connectionchange"); + do_check_eq(type, "closed"); + }); + + transport.close(); + + yield clientClosed; + yield serverClosed; + + do_print(`Finished testing of transport: ${name}`); +} diff --git a/devtools/shared/transport/tests/unit/testactors-no-bulk.js b/devtools/shared/transport/tests/unit/testactors-no-bulk.js new file mode 100644 index 000000000..d2c8193fe --- /dev/null +++ b/devtools/shared/transport/tests/unit/testactors-no-bulk.js @@ -0,0 +1,27 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +const { RootActor } = require("devtools/server/actors/root"); +const { DebuggerServer } = require("devtools/server/main"); + +/** + * Root actor that doesn't have the bulk trait. + */ +function createRootActor(aConnection) { + let root = new RootActor(aConnection, { + globalActorFactories: DebuggerServer.globalActorFactories + }); + root.applicationType = "xpcshell-tests"; + root.traits = { + bulk: false + }; + return root; +} + +exports.register = function (handle) { + handle.setRootActor(createRootActor); +}; + +exports.unregister = function (handle) { + handle.setRootActor(null); +}; diff --git a/devtools/shared/transport/tests/unit/testactors.js b/devtools/shared/transport/tests/unit/testactors.js new file mode 100644 index 000000000..80d5d4e18 --- /dev/null +++ b/devtools/shared/transport/tests/unit/testactors.js @@ -0,0 +1,131 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +const { ActorPool, appendExtraActors, createExtraActors } = + require("devtools/server/actors/common"); +const { RootActor } = require("devtools/server/actors/root"); +const { ThreadActor } = require("devtools/server/actors/script"); +const { DebuggerServer } = require("devtools/server/main"); +const promise = require("promise"); + +var gTestGlobals = []; +DebuggerServer.addTestGlobal = function (aGlobal) { + gTestGlobals.push(aGlobal); +}; + +// A mock tab list, for use by tests. This simply presents each global in +// gTestGlobals as a tab, and the list is fixed: it never calls its +// onListChanged handler. +// +// As implemented now, we consult gTestGlobals when we're constructed, not +// when we're iterated over, so tests have to add their globals before the +// root actor is created. +function TestTabList(aConnection) { + this.conn = aConnection; + + // An array of actors for each global added with + // DebuggerServer.addTestGlobal. + this._tabActors = []; + + // A pool mapping those actors' names to the actors. + this._tabActorPool = new ActorPool(aConnection); + + for (let global of gTestGlobals) { + let actor = new TestTabActor(aConnection, global); + actor.selected = false; + this._tabActors.push(actor); + this._tabActorPool.addActor(actor); + } + if (this._tabActors.length > 0) { + this._tabActors[0].selected = true; + } + + aConnection.addActorPool(this._tabActorPool); +} + +TestTabList.prototype = { + constructor: TestTabList, + getList: function () { + return promise.resolve([...this._tabActors]); + } +}; + +function createRootActor(aConnection) { + let root = new RootActor(aConnection, { + tabList: new TestTabList(aConnection), + globalActorFactories: DebuggerServer.globalActorFactories + }); + root.applicationType = "xpcshell-tests"; + return root; +} + +function TestTabActor(aConnection, aGlobal) { + this.conn = aConnection; + this._global = aGlobal; + this._threadActor = new ThreadActor(this, this._global); + this.conn.addActor(this._threadActor); + this._attached = false; + this._extraActors = {}; +} + +TestTabActor.prototype = { + constructor: TestTabActor, + actorPrefix: "TestTabActor", + + get window() { + return { wrappedJSObject: this._global }; + }, + + get url() { + return this._global.__name; + }, + + form: function () { + let response = { actor: this.actorID, title: this._global.__name }; + + // Walk over tab actors added by extensions and add them to a new ActorPool. + let actorPool = new ActorPool(this.conn); + this._createExtraActors(DebuggerServer.tabActorFactories, actorPool); + if (!actorPool.isEmpty()) { + this._tabActorPool = actorPool; + this.conn.addActorPool(this._tabActorPool); + } + + this._appendExtraActors(response); + + return response; + }, + + onAttach: function (aRequest) { + this._attached = true; + + let response = { type: "tabAttached", threadActor: this._threadActor.actorID }; + this._appendExtraActors(response); + + return response; + }, + + onDetach: function (aRequest) { + if (!this._attached) { + return { "error":"wrongState" }; + } + return { type: "detached" }; + }, + + /* Support for DebuggerServer.addTabActor. */ + _createExtraActors: createExtraActors, + _appendExtraActors: appendExtraActors +}; + +TestTabActor.prototype.requestTypes = { + "attach": TestTabActor.prototype.onAttach, + "detach": TestTabActor.prototype.onDetach +}; + +exports.register = function (handle) { + handle.setRootActor(createRootActor); +}; + +exports.unregister = function (handle) { + handle.setRootActor(null); +}; diff --git a/devtools/shared/transport/tests/unit/xpcshell.ini b/devtools/shared/transport/tests/unit/xpcshell.ini new file mode 100644 index 000000000..b79906e82 --- /dev/null +++ b/devtools/shared/transport/tests/unit/xpcshell.ini @@ -0,0 +1,21 @@ +[DEFAULT] +tags = devtools +head = head_dbg.js +tail = +firefox-appdir = browser +skip-if = toolkit == 'android' + +support-files = + testactors.js + testactors-no-bulk.js + +[test_bulk_error.js] +[test_client_server_bulk.js] +[test_dbgsocket.js] +[test_dbgsocket_connection_drop.js] +[test_delimited_read.js] +[test_no_bulk.js] +[test_packet.js] +[test_queue.js] +[test_transport_bulk.js] +[test_transport_events.js] diff --git a/devtools/shared/transport/transport.js b/devtools/shared/transport/transport.js new file mode 100644 index 000000000..e83bf8de6 --- /dev/null +++ b/devtools/shared/transport/transport.js @@ -0,0 +1,908 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +/* global Pipe, ScriptableInputStream, uneval */ + +// TODO: Get rid of this code once the marionette server loads transport.js as +// an SDK module (see bug 1000814) +(function (factory) { + if (this.module && module.id.indexOf("transport") >= 0) { + // require + factory.call(this, require, exports); + } else if (this.require) { + // loadSubScript + factory.call(this, require, this); + } else { + // Cu.import + const Cu = Components.utils; + const { require } = Cu.import("resource://devtools/shared/Loader.jsm", {}); + factory.call(this, require, this); + } +}).call(this, function (require, exports) { + const { Cc, Cr, CC } = require("chrome"); + const DevToolsUtils = require("devtools/shared/DevToolsUtils"); + const { dumpn, dumpv } = DevToolsUtils; + const flags = require("devtools/shared/flags"); + const StreamUtils = require("devtools/shared/transport/stream-utils"); + const { Packet, JSONPacket, BulkPacket } = + require("devtools/shared/transport/packets"); + const promise = require("promise"); + const defer = require("devtools/shared/defer"); + const EventEmitter = require("devtools/shared/event-emitter"); + + DevToolsUtils.defineLazyGetter(this, "Pipe", () => { + return CC("@mozilla.org/pipe;1", "nsIPipe", "init"); + }); + + DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => { + return CC("@mozilla.org/scriptableinputstream;1", + "nsIScriptableInputStream", "init"); + }); + + const PACKET_HEADER_MAX = 200; + + /** + * An adapter that handles data transfers between the debugger client and + * server. It can work with both nsIPipe and nsIServerSocket transports so + * long as the properly created input and output streams are specified. + * (However, for intra-process connections, LocalDebuggerTransport, below, + * is more efficient than using an nsIPipe pair with DebuggerTransport.) + * + * @param input nsIAsyncInputStream + * The input stream. + * @param output nsIAsyncOutputStream + * The output stream. + * + * Given a DebuggerTransport instance dt: + * 1) Set dt.hooks to a packet handler object (described below). + * 2) Call dt.ready() to begin watching for input packets. + * 3) Call dt.send() / dt.startBulkSend() to send packets. + * 4) Call dt.close() to close the connection, and disengage from the event + * loop. + * + * A packet handler is an object with the following methods: + * + * - onPacket(packet) - called when we have received a complete packet. + * |packet| is the parsed form of the packet --- a JavaScript value, not + * a JSON-syntax string. + * + * - onBulkPacket(packet) - called when we have switched to bulk packet + * receiving mode. |packet| is an object containing: + * * actor: Name of actor that will receive the packet + * * type: Name of actor's method that should be called on receipt + * * length: Size of the data to be read + * * stream: This input stream should only be used directly if you can ensure + * that you will read exactly |length| bytes and will not close the + * stream when reading is complete + * * done: If you use the stream directly (instead of |copyTo| below), you + * must signal completion by resolving / rejecting this deferred. + * If it's rejected, the transport will be closed. If an Error is + * supplied as a rejection value, it will be logged via |dumpn|. + * If you do use |copyTo|, resolving is taken care of for you when + * copying completes. + * * copyTo: A helper function for getting your data out of the stream that + * meets the stream handling requirements above, and has the + * following signature: + * @param output nsIAsyncOutputStream + * The stream to copy to. + * @return Promise + * The promise is resolved when copying completes or rejected if any + * (unexpected) errors occur. + * This object also emits "progress" events for each chunk that is + * copied. See stream-utils.js. + * + * - onClosed(reason) - called when the connection is closed. |reason| is + * an optional nsresult or object, typically passed when the transport is + * closed due to some error in a underlying stream. + * + * See ./packets.js and the Remote Debugging Protocol specification for more + * details on the format of these packets. + */ + function DebuggerTransport(input, output) { + EventEmitter.decorate(this); + + this._input = input; + this._scriptableInput = new ScriptableInputStream(input); + this._output = output; + + // The current incoming (possibly partial) header, which will determine which + // type of Packet |_incoming| below will become. + this._incomingHeader = ""; + // The current incoming Packet object + this._incoming = null; + // A queue of outgoing Packet objects + this._outgoing = []; + + this.hooks = null; + this.active = false; + + this._incomingEnabled = true; + this._outgoingEnabled = true; + + this.close = this.close.bind(this); + } + + DebuggerTransport.prototype = { + /** + * Transmit an object as a JSON packet. + * + * This method returns immediately, without waiting for the entire + * packet to be transmitted, registering event handlers as needed to + * transmit the entire packet. Packets are transmitted in the order + * they are passed to this method. + */ + send: function (object) { + this.emit("send", object); + + let packet = new JSONPacket(this); + packet.object = object; + this._outgoing.push(packet); + this._flushOutgoing(); + }, + + /** + * Transmit streaming data via a bulk packet. + * + * This method initiates the bulk send process by queuing up the header data. + * The caller receives eventual access to a stream for writing. + * + * N.B.: Do *not* attempt to close the stream handed to you, as it will + * continue to be used by this transport afterwards. Most users should + * instead use the provided |copyFrom| function instead. + * + * @param header Object + * This is modeled after the format of JSON packets above, but does not + * actually contain the data, but is instead just a routing header: + * * actor: Name of actor that will receive the packet + * * type: Name of actor's method that should be called on receipt + * * length: Size of the data to be sent + * @return Promise + * The promise will be resolved when you are allowed to write to the + * stream with an object containing: + * * stream: This output stream should only be used directly if + * you can ensure that you will write exactly |length| + * bytes and will not close the stream when writing is + * complete + * * done: If you use the stream directly (instead of |copyFrom| + * below), you must signal completion by resolving / + * rejecting this deferred. If it's rejected, the + * transport will be closed. If an Error is supplied as + * a rejection value, it will be logged via |dumpn|. If + * you do use |copyFrom|, resolving is taken care of for + * you when copying completes. + * * copyFrom: A helper function for getting your data onto the + * stream that meets the stream handling requirements + * above, and has the following signature: + * @param input nsIAsyncInputStream + * The stream to copy from. + * @return Promise + * The promise is resolved when copying completes or + * rejected if any (unexpected) errors occur. + * This object also emits "progress" events for each chunk + * that is copied. See stream-utils.js. + */ + startBulkSend: function (header) { + this.emit("startbulksend", header); + + let packet = new BulkPacket(this); + packet.header = header; + this._outgoing.push(packet); + this._flushOutgoing(); + return packet.streamReadyForWriting; + }, + + /** + * Close the transport. + * @param reason nsresult / object (optional) + * The status code or error message that corresponds to the reason for + * closing the transport (likely because a stream closed or failed). + */ + close: function (reason) { + this.emit("close", reason); + + this.active = false; + this._input.close(); + this._scriptableInput.close(); + this._output.close(); + this._destroyIncoming(); + this._destroyAllOutgoing(); + if (this.hooks) { + this.hooks.onClosed(reason); + this.hooks = null; + } + if (reason) { + dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason)); + } else { + dumpn("Transport closed."); + } + }, + + /** + * The currently outgoing packet (at the top of the queue). + */ + get _currentOutgoing() { + return this._outgoing[0]; + }, + + /** + * Flush data to the outgoing stream. Waits until the output stream notifies + * us that it is ready to be written to (via onOutputStreamReady). + */ + _flushOutgoing: function () { + if (!this._outgoingEnabled || this._outgoing.length === 0) { + return; + } + + // If the top of the packet queue has nothing more to send, remove it. + if (this._currentOutgoing.done) { + this._finishCurrentOutgoing(); + } + + if (this._outgoing.length > 0) { + let threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); + this._output.asyncWait(this, 0, 0, threadManager.currentThread); + } + }, + + /** + * Pause this transport's attempts to write to the output stream. This is + * used when we've temporarily handed off our output stream for writing bulk + * data. + */ + pauseOutgoing: function () { + this._outgoingEnabled = false; + }, + + /** + * Resume this transport's attempts to write to the output stream. + */ + resumeOutgoing: function () { + this._outgoingEnabled = true; + this._flushOutgoing(); + }, + + // nsIOutputStreamCallback + /** + * This is called when the output stream is ready for more data to be written. + * The current outgoing packet will attempt to write some amount of data, but + * may not complete. + */ + onOutputStreamReady: DevToolsUtils.makeInfallible(function (stream) { + if (!this._outgoingEnabled || this._outgoing.length === 0) { + return; + } + + try { + this._currentOutgoing.write(stream); + } catch (e) { + if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this.close(e.result); + return; + } + throw e; + } + + this._flushOutgoing(); + }, "DebuggerTransport.prototype.onOutputStreamReady"), + + /** + * Remove the current outgoing packet from the queue upon completion. + */ + _finishCurrentOutgoing: function () { + if (this._currentOutgoing) { + this._currentOutgoing.destroy(); + this._outgoing.shift(); + } + }, + + /** + * Clear the entire outgoing queue. + */ + _destroyAllOutgoing: function () { + for (let packet of this._outgoing) { + packet.destroy(); + } + this._outgoing = []; + }, + + /** + * Initialize the input stream for reading. Once this method has been called, + * we watch for packets on the input stream, and pass them to the appropriate + * handlers via this.hooks. + */ + ready: function () { + this.active = true; + this._waitForIncoming(); + }, + + /** + * Asks the input stream to notify us (via onInputStreamReady) when it is + * ready for reading. + */ + _waitForIncoming: function () { + if (this._incomingEnabled) { + let threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); + this._input.asyncWait(this, 0, 0, threadManager.currentThread); + } + }, + + /** + * Pause this transport's attempts to read from the input stream. This is + * used when we've temporarily handed off our input stream for reading bulk + * data. + */ + pauseIncoming: function () { + this._incomingEnabled = false; + }, + + /** + * Resume this transport's attempts to read from the input stream. + */ + resumeIncoming: function () { + this._incomingEnabled = true; + this._flushIncoming(); + this._waitForIncoming(); + }, + + // nsIInputStreamCallback + /** + * Called when the stream is either readable or closed. + */ + onInputStreamReady: DevToolsUtils.makeInfallible(function (stream) { + try { + while (stream.available() && this._incomingEnabled && + this._processIncoming(stream, stream.available())) { + // Loop until there is nothing more to process + } + this._waitForIncoming(); + } catch (e) { + if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this.close(e.result); + } else { + throw e; + } + } + }, "DebuggerTransport.prototype.onInputStreamReady"), + + /** + * Process the incoming data. Will create a new currently incoming Packet if + * needed. Tells the incoming Packet to read as much data as it can, but + * reading may not complete. The Packet signals that its data is ready for + * delivery by calling one of this transport's _on*Ready methods (see + * ./packets.js and the _on*Ready methods below). + * @return boolean + * Whether incoming stream processing should continue for any + * remaining data. + */ + _processIncoming: function (stream, count) { + dumpv("Data available: " + count); + + if (!count) { + dumpv("Nothing to read, skipping"); + return false; + } + + try { + if (!this._incoming) { + dumpv("Creating a new packet from incoming"); + + if (!this._readHeader(stream)) { + // Not enough data to read packet type + return false; + } + + // Attempt to create a new Packet by trying to parse each possible + // header pattern. + this._incoming = Packet.fromHeader(this._incomingHeader, this); + if (!this._incoming) { + throw new Error("No packet types for header: " + + this._incomingHeader); + } + } + + if (!this._incoming.done) { + // We have an incomplete packet, keep reading it. + dumpv("Existing packet incomplete, keep reading"); + this._incoming.read(stream, this._scriptableInput); + } + } catch (e) { + let msg = "Error reading incoming packet: (" + e + " - " + e.stack + ")"; + dumpn(msg); + + // Now in an invalid state, shut down the transport. + this.close(); + return false; + } + + if (!this._incoming.done) { + // Still not complete, we'll wait for more data. + dumpv("Packet not done, wait for more"); + return true; + } + + // Ready for next packet + this._flushIncoming(); + return true; + }, + + /** + * Read as far as we can into the incoming data, attempting to build up a + * complete packet header (which terminates with ":"). We'll only read up to + * PACKET_HEADER_MAX characters. + * @return boolean + * True if we now have a complete header. + */ + _readHeader: function () { + let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length; + this._incomingHeader += + StreamUtils.delimitedRead(this._scriptableInput, ":", amountToRead); + if (flags.wantVerbose) { + dumpv("Header read: " + this._incomingHeader); + } + + if (this._incomingHeader.endsWith(":")) { + if (flags.wantVerbose) { + dumpv("Found packet header successfully: " + this._incomingHeader); + } + return true; + } + + if (this._incomingHeader.length >= PACKET_HEADER_MAX) { + throw new Error("Failed to parse packet header!"); + } + + // Not enough data yet. + return false; + }, + + /** + * If the incoming packet is done, log it as needed and clear the buffer. + */ + _flushIncoming: function () { + if (!this._incoming.done) { + return; + } + if (flags.wantLogging) { + dumpn("Got: " + this._incoming); + } + this._destroyIncoming(); + }, + + /** + * Handler triggered by an incoming JSONPacket completing it's |read| method. + * Delivers the packet to this.hooks.onPacket. + */ + _onJSONObjectReady: function (object) { + DevToolsUtils.executeSoon(DevToolsUtils.makeInfallible(() => { + // Ensure the transport is still alive by the time this runs. + if (this.active) { + this.emit("packet", object); + this.hooks.onPacket(object); + } + }, "DebuggerTransport instance's this.hooks.onPacket")); + }, + + /** + * Handler triggered by an incoming BulkPacket entering the |read| phase for + * the stream portion of the packet. Delivers info about the incoming + * streaming data to this.hooks.onBulkPacket. See the main comment on the + * transport at the top of this file for more details. + */ + _onBulkReadReady: function (...args) { + DevToolsUtils.executeSoon(DevToolsUtils.makeInfallible(() => { + // Ensure the transport is still alive by the time this runs. + if (this.active) { + this.emit("bulkpacket", ...args); + this.hooks.onBulkPacket(...args); + } + }, "DebuggerTransport instance's this.hooks.onBulkPacket")); + }, + + /** + * Remove all handlers and references related to the current incoming packet, + * either because it is now complete or because the transport is closing. + */ + _destroyIncoming: function () { + if (this._incoming) { + this._incoming.destroy(); + } + this._incomingHeader = ""; + this._incoming = null; + } + + }; + + exports.DebuggerTransport = DebuggerTransport; + + /** + * An adapter that handles data transfers between the debugger client and + * server when they both run in the same process. It presents the same API as + * DebuggerTransport, but instead of transmitting serialized messages across a + * connection it merely calls the packet dispatcher of the other side. + * + * @param other LocalDebuggerTransport + * The other endpoint for this debugger connection. + * + * @see DebuggerTransport + */ + function LocalDebuggerTransport(other) { + EventEmitter.decorate(this); + + this.other = other; + this.hooks = null; + + // A packet number, shared between this and this.other. This isn't used by the + // protocol at all, but it makes the packet traces a lot easier to follow. + this._serial = this.other ? this.other._serial : { count: 0 }; + this.close = this.close.bind(this); + } + + LocalDebuggerTransport.prototype = { + /** + * Transmit a message by directly calling the onPacket handler of the other + * endpoint. + */ + send: function (packet) { + this.emit("send", packet); + + let serial = this._serial.count++; + if (flags.wantLogging) { + // Check 'from' first, as 'echo' packets have both. + if (packet.from) { + dumpn("Packet " + serial + " sent from " + uneval(packet.from)); + } else if (packet.to) { + dumpn("Packet " + serial + " sent to " + uneval(packet.to)); + } + } + this._deepFreeze(packet); + let other = this.other; + if (other) { + DevToolsUtils.executeSoon(DevToolsUtils.makeInfallible(() => { + // Avoid the cost of JSON.stringify() when logging is disabled. + if (flags.wantLogging) { + dumpn("Received packet " + serial + ": " + JSON.stringify(packet, null, 2)); + } + if (other.hooks) { + other.emit("packet", packet); + other.hooks.onPacket(packet); + } + }, "LocalDebuggerTransport instance's this.other.hooks.onPacket")); + } + }, + + /** + * Send a streaming bulk packet directly to the onBulkPacket handler of the + * other endpoint. + * + * This case is much simpler than the full DebuggerTransport, since there is + * no primary stream we have to worry about managing while we hand it off to + * others temporarily. Instead, we can just make a single use pipe and be + * done with it. + */ + startBulkSend: function ({actor, type, length}) { + this.emit("startbulksend", {actor, type, length}); + + let serial = this._serial.count++; + + dumpn("Sent bulk packet " + serial + " for actor " + actor); + if (!this.other) { + let error = new Error("startBulkSend: other side of transport missing"); + return promise.reject(error); + } + + let pipe = new Pipe(true, true, 0, 0, null); + + DevToolsUtils.executeSoon(DevToolsUtils.makeInfallible(() => { + dumpn("Received bulk packet " + serial); + if (!this.other.hooks) { + return; + } + + // Receiver + let deferred = defer(); + let packet = { + actor: actor, + type: type, + length: length, + copyTo: (output) => { + let copying = + StreamUtils.copyStream(pipe.inputStream, output, length); + deferred.resolve(copying); + return copying; + }, + stream: pipe.inputStream, + done: deferred + }; + + this.other.emit("bulkpacket", packet); + this.other.hooks.onBulkPacket(packet); + + // Await the result of reading from the stream + deferred.promise.then(() => pipe.inputStream.close(), this.close); + }, "LocalDebuggerTransport instance's this.other.hooks.onBulkPacket")); + + // Sender + let sendDeferred = defer(); + + // The remote transport is not capable of resolving immediately here, so we + // shouldn't be able to either. + DevToolsUtils.executeSoon(() => { + let copyDeferred = defer(); + + sendDeferred.resolve({ + copyFrom: (input) => { + let copying = + StreamUtils.copyStream(input, pipe.outputStream, length); + copyDeferred.resolve(copying); + return copying; + }, + stream: pipe.outputStream, + done: copyDeferred + }); + + // Await the result of writing to the stream + copyDeferred.promise.then(() => pipe.outputStream.close(), this.close); + }); + + return sendDeferred.promise; + }, + + /** + * Close the transport. + */ + close: function () { + this.emit("close"); + + if (this.other) { + // Remove the reference to the other endpoint before calling close(), to + // avoid infinite recursion. + let other = this.other; + this.other = null; + other.close(); + } + if (this.hooks) { + try { + this.hooks.onClosed(); + } catch (ex) { + console.error(ex); + } + this.hooks = null; + } + }, + + /** + * An empty method for emulating the DebuggerTransport API. + */ + ready: function () {}, + + /** + * Helper function that makes an object fully immutable. + */ + _deepFreeze: function (object) { + Object.freeze(object); + for (let prop in object) { + // Freeze the properties that are objects, not on the prototype, and not + // already frozen. Note that this might leave an unfrozen reference + // somewhere in the object if there is an already frozen object containing + // an unfrozen object. + if (object.hasOwnProperty(prop) && typeof object === "object" && + !Object.isFrozen(object)) { + this._deepFreeze(object[prop]); + } + } + } + }; + + exports.LocalDebuggerTransport = LocalDebuggerTransport; + + /** + * A transport for the debugging protocol that uses nsIMessageManagers to + * exchange packets with servers running in child processes. + * + * In the parent process, |mm| should be the nsIMessageSender for the + * child process. In a child process, |mm| should be the child process + * message manager, which sends packets to the parent. + * + * |prefix| is a string included in the message names, to distinguish + * multiple servers running in the same child process. + * + * This transport exchanges messages named 'debug:<prefix>:packet', where + * <prefix> is |prefix|, whose data is the protocol packet. + */ + function ChildDebuggerTransport(mm, prefix) { + EventEmitter.decorate(this); + + this._mm = mm; + this._messageName = "debug:" + prefix + ":packet"; + } + + /* + * To avoid confusion, we use 'message' to mean something that + * nsIMessageSender conveys, and 'packet' to mean a remote debugging + * protocol packet. + */ + ChildDebuggerTransport.prototype = { + constructor: ChildDebuggerTransport, + + hooks: null, + + _addListener() { + this._mm.addMessageListener(this._messageName, this); + }, + + _removeListener() { + try { + this._mm.removeMessageListener(this._messageName, this); + } catch (e) { + if (e.result != Cr.NS_ERROR_NULL_POINTER) { + throw e; + } + // In some cases, especially when using messageManagers in non-e10s mode, we reach + // this point with a dead messageManager which only throws errors but does not + // seem to indicate in any other way that it is dead. + } + }, + + ready: function () { + this._addListener(); + }, + + close: function () { + this._removeListener(); + this.emit("close"); + this.hooks.onClosed(); + }, + + receiveMessage: function ({data}) { + this.emit("packet", data); + this.hooks.onPacket(data); + }, + + send: function (packet) { + this.emit("send", packet); + try { + this._mm.sendAsyncMessage(this._messageName, packet); + } catch (e) { + if (e.result != Cr.NS_ERROR_NULL_POINTER) { + throw e; + } + // In some cases, especially when using messageManagers in non-e10s mode, we reach + // this point with a dead messageManager which only throws errors but does not + // seem to indicate in any other way that it is dead. + } + }, + + startBulkSend: function () { + throw new Error("Can't send bulk data to child processes."); + }, + + swapBrowser(mm) { + this._removeListener(); + this._mm = mm; + this._addListener(); + }, + }; + + exports.ChildDebuggerTransport = ChildDebuggerTransport; + + // WorkerDebuggerTransport is defined differently depending on whether we are + // on the main thread or a worker thread. In the former case, we are required + // by the devtools loader, and isWorker will be false. Otherwise, we are + // required by the worker loader, and isWorker will be true. + // + // Each worker debugger supports only a single connection to the main thread. + // However, its theoretically possible for multiple servers to connect to the + // same worker. Consequently, each transport has a connection id, to allow + // messages from multiple connections to be multiplexed on a single channel. + + if (!this.isWorker) { + // Main thread + (function () { + /** + * A transport that uses a WorkerDebugger to send packets from the main + * thread to a worker thread. + */ + function WorkerDebuggerTransport(dbg, id) { + this._dbg = dbg; + this._id = id; + this.onMessage = this._onMessage.bind(this); + } + + WorkerDebuggerTransport.prototype = { + constructor: WorkerDebuggerTransport, + + ready: function () { + this._dbg.addListener(this); + }, + + close: function () { + this._dbg.removeListener(this); + if (this.hooks) { + this.hooks.onClosed(); + } + }, + + send: function (packet) { + this._dbg.postMessage(JSON.stringify({ + type: "message", + id: this._id, + message: packet + })); + }, + + startBulkSend: function () { + throw new Error("Can't send bulk data from worker threads!"); + }, + + _onMessage: function (message) { + let packet = JSON.parse(message); + if (packet.type !== "message" || packet.id !== this._id) { + return; + } + + if (this.hooks) { + this.hooks.onPacket(packet.message); + } + } + }; + + exports.WorkerDebuggerTransport = WorkerDebuggerTransport; + }).call(this); + } else { + // Worker thread + (function () { + /** + * A transport that uses a WorkerDebuggerGlobalScope to send packets from a + * worker thread to the main thread. + */ + function WorkerDebuggerTransport(scope, id) { + this._scope = scope; + this._id = id; + this._onMessage = this._onMessage.bind(this); + } + + WorkerDebuggerTransport.prototype = { + constructor: WorkerDebuggerTransport, + + ready: function () { + this._scope.addEventListener("message", this._onMessage); + }, + + close: function () { + this._scope.removeEventListener("message", this._onMessage); + if (this.hooks) { + this.hooks.onClosed(); + } + }, + + send: function (packet) { + this._scope.postMessage(JSON.stringify({ + type: "message", + id: this._id, + message: packet + })); + }, + + startBulkSend: function () { + throw new Error("Can't send bulk data from worker threads!"); + }, + + _onMessage: function (event) { + let packet = JSON.parse(event.data); + if (packet.type !== "message" || packet.id !== this._id) { + return; + } + + if (this.hooks) { + this.hooks.onPacket(packet.message); + } + } + }; + + exports.WorkerDebuggerTransport = WorkerDebuggerTransport; + }).call(this); + } +}); diff --git a/devtools/shared/transport/websocket-transport.js b/devtools/shared/transport/websocket-transport.js new file mode 100644 index 000000000..6ba474106 --- /dev/null +++ b/devtools/shared/transport/websocket-transport.js @@ -0,0 +1,79 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const EventEmitter = require("devtools/shared/event-emitter"); + +function WebSocketDebuggerTransport(socket) { + EventEmitter.decorate(this); + + this.active = false; + this.hooks = null; + this.socket = socket; +} + +WebSocketDebuggerTransport.prototype = { + ready() { + if (this.active) { + return; + } + + this.socket.addEventListener("message", this); + this.socket.addEventListener("close", this); + + this.active = true; + }, + + send(object) { + this.emit("send", object); + if (this.socket) { + this.socket.send(JSON.stringify(object)); + } + }, + + startBulkSend() { + throw new Error("Bulk send is not supported by WebSocket transport"); + }, + + close() { + this.emit("close"); + this.active = false; + + this.socket.removeEventListener("message", this); + this.socket.removeEventListener("close", this); + this.socket.close(); + this.socket = null; + + if (this.hooks) { + this.hooks.onClosed(); + this.hooks = null; + } + }, + + handleEvent(event) { + switch (event.type) { + case "message": + this.onMessage(event); + break; + case "close": + this.close(); + break; + } + }, + + onMessage({ data }) { + if (typeof data !== "string") { + throw new Error("Binary messages are not supported by WebSocket transport"); + } + + let object = JSON.parse(data); + this.emit("packet", object); + if (this.hooks) { + this.hooks.onPacket(object); + } + }, +}; + +module.exports = WebSocketDebuggerTransport; |