'use strict';

const SERVER_BACKLOG = -1;

const SOCKET_EVENTS = ['open', 'data', 'drain', 'error', 'close'];

/**
 * Return a new Uint8Array holding the bytes of `a` followed by the bytes of
 * `b`. Neither input is modified.
 */
function concatUint8Arrays(a, b) {
  const newArr = new Uint8Array(a.length + b.length);
  newArr.set(a, 0);
  newArr.set(b, a.length);
  return newArr;
}

/**
 * Report through `ok` whether the two Uint8Arrays have the same length and
 * the same contents. Exactly one `ok` call is made: a failure describing the
 * first difference found, or a success.
 *
 * @param {Uint8Array} a
 * @param {Uint8Array} b
 * @param {string} comparingWhat - Prefix used in the reported message.
 */
function assertUint8ArraysEqual(a, b, comparingWhat) {
  if (a.length !== b.length) {
    ok(false, comparingWhat + ' arrays do not have the same length; ' +
       a.length + ' versus ' + b.length);
    return;
  }
  for (let i = 0; i < a.length; i++) {
    if (a[i] !== b[i]) {
      // Separate the index from the first value; otherwise index 5 / value 5
      // would read as "index 55".
      ok(false, comparingWhat + ' arrays differ at index ' + i + ': ' + a[i] +
         ' versus ' + b[i]);
      return;
    }
  }
  ok(true, comparingWhat + ' arrays were equivalent.');
}

/**
 * Helper method to add event listeners to a socket and provide three
 * Promise-returning helpers (see below for docs on them). This *must* be
 * called during the turn of the event loop where TCPSocket's constructor is
 * called or the onconnect method is being invoked.
 *
 * @param {object} socket - The socket whose on* handlers are assigned.
 * @param {string} socketType - Label used in the dump() log output.
 * @returns {{waitForEvent: function, waitForDataWithAtLeastLength: function,
 *            waitForAnyDataAndClose: function}}
 */
function listenForEventsOnSocket(socket, socketType) {
  // Non-null only while a waitForDataWithAtLeastLength() wait is pending.
  let wantDataLength = null;
  // True only while a waitForAnyDataAndClose() wait is pending.
  let wantDataAndClose = false;
  // Resolver of the single outstanding wait, if any.
  let pendingResolve = null;
  // Non-data events that arrived while nobody was waiting for an event.
  const receivedEvents = [];
  // Accumulated, not-yet-consumed data (Uint8Array) or null.
  let receivedData = null;

  const handleGenericEvent = function(event) {
    dump('(' + socketType + ' event: ' + event.type + ')\n');
    // A pending data-length wait must not be satisfied by a generic event;
    // queue the event instead.
    if (pendingResolve && wantDataLength === null) {
      pendingResolve(event);
      pendingResolve = null;
    } else {
      receivedEvents.push(event);
    }
  };

  socket.onopen = handleGenericEvent;
  socket.ondrain = handleGenericEvent;
  socket.onerror = handleGenericEvent;
  socket.onclose = function(event) {
    if (!wantDataAndClose) {
      handleGenericEvent(event);
    } else if (pendingResolve) {
      dump('(' + socketType + ' event: close)\n');
      pendingResolve(receivedData);
      pendingResolve = null;
      // The buffer has been handed to the waiter; do not hand it out again.
      receivedData = null;
      wantDataAndClose = false;
    }
  };
  socket.ondata = function(event) {
    dump('(' + socketType + ' event: ' + event.type + ' length: ' +
         event.data.byteLength + ')\n');
    ok(socketCompartmentInstanceOfArrayBuffer(event.data),
       'payload is ArrayBuffer');
    const arr = new Uint8Array(event.data);
    if (receivedData === null) {
      receivedData = arr;
    } else {
      receivedData = concatUint8Arrays(receivedData, arr);
    }
    if (wantDataLength !== null && receivedData.length >= wantDataLength) {
      pendingResolve(receivedData);
      pendingResolve = null;
      receivedData = null;
      wantDataLength = null;
    }
  };

  return {
    /**
     * Return a Promise that will be resolved with the next (non-data) event
     * received by the socket. If there are queued events, the Promise will
     * be immediately resolved (but you won't see that until a future turn of
     * the event loop).
     */
    waitForEvent: function() {
      if (pendingResolve) {
        throw new Error('only one wait allowed at a time.');
      }

      if (receivedEvents.length) {
        return Promise.resolve(receivedEvents.shift());
      }

      dump('(' + socketType + ' waiting for event)\n');
      return new Promise(function(resolve) {
        pendingResolve = resolve;
      });
    },

    /**
     * Return a Promise that will be resolved with a Uint8Array of at least the
     * given length. We buffer / accumulate received data until we have enough
     * data. Data is buffered even before you call this method, so be sure to
     * explicitly wait for any and all data sent by the other side.
     */
    waitForDataWithAtLeastLength: function(length) {
      if (pendingResolve) {
        throw new Error('only one wait allowed at a time.');
      }
      if (receivedData && receivedData.length >= length) {
        const promise = Promise.resolve(receivedData);
        receivedData = null;
        return promise;
      }
      dump('(' + socketType + ' waiting for ' + length + ' bytes)\n');
      return new Promise(function(resolve) {
        pendingResolve = resolve;
        wantDataLength = length;
      });
    },

    /**
     * Return a Promise that will be resolved, once the socket's close event
     * arrives, with a Uint8Array of all data buffered up to that point
     * (including data that was buffered before this call). The array is empty
     * if no data was received.
     */
    waitForAnyDataAndClose: function() {
      if (pendingResolve) {
        throw new Error('only one wait allowed at a time.');
      }

      return new Promise(function(resolve) {
        pendingResolve = resolve;
        // We may receive no data before getting close, in which case we want
        // to return an empty array. Data that is already buffered is kept
        // rather than discarded.
        if (receivedData === null) {
          receivedData = new Uint8Array();
        }
        wantDataAndClose = true;
      });
    }
  };
}

/**
 * Return a promise that is resolved when the server receives a connection. The
 * promise is resolved with { socket, queue } where `queue` is the result of
 * calling listenForEventsOnSocket(socket). This must be done because we need
 * to add the event listener during the connection.
 */
function waitForConnection(listeningServer) {
  return new Promise(function(resolve) {
    // Because of the event model of sockets, we can't use the
    // listenForEventsOnSocket mechanism; we need to hook up listeners during
    // the connect event.
    listeningServer.onconnect = function(event) {
      // Clobber the listener to get upset if it receives any more connections
      // after this.
      listeningServer.onconnect = function() {
        ok(false, 'Received a connection when not expecting one.');
      };
      ok(true, 'Listening server accepted socket');
      resolve({
        socket: event.socket,
        queue: listenForEventsOnSocket(event.socket, 'server')
      });
    };
  });
}

/**
 * Create an object exposing a new Promise as `promise` together with that
 * promise's `resolve` and `reject` functions.
 */
function defer() {
  const deferred = {};
  deferred.promise = new Promise(function(resolve, reject) {
    deferred.resolve = resolve;
    deferred.reject = reject;
  });
  return deferred;
}

/**
 * Task exercising a listening server and client sockets: connect, small and
 * large sends in both directions, the various close paths, sending a
 * non-string on a 'string' socket, closeImmediately, and connecting after the
 * listening server has been closed.
 */
function* test_basics() {
  // See bug 903830; in e10s mode we never get to find out the localPort if we
  // let it pick a free port by choosing 0.  This is the same port the xpcshell
  // test was using.
  let serverPort = 8085;

  // - Start up a listening socket.
  let listeningServer = createServer(serverPort,
                                     { binaryType: 'arraybuffer' },
                                     SERVER_BACKLOG);

  let connectedPromise = waitForConnection(listeningServer);

  // -- Open a connection to the server
  let clientSocket = createSocket('127.0.0.1', serverPort,
                                  { binaryType: 'arraybuffer' });
  let clientQueue = listenForEventsOnSocket(clientSocket, 'client');

  // (the client connects)
  is((yield clientQueue.waitForEvent()).type, 'open', 'got open event');
  is(clientSocket.readyState, 'open', 'client readyState is open');

  // (the server connected)
  let { socket: serverSocket, queue: serverQueue } = yield connectedPromise;
  is(serverSocket.readyState, 'open', 'server readyState is open');

  // -- Simple send / receive
  // - Send data from client to server
  // (But not so much we cross the drain threshold.)
  let smallUint8Array = new Uint8Array(256);
  for (let i = 0; i < smallUint8Array.length; i++) {
    smallUint8Array[i] = i;
  }
  is(clientSocket.send(smallUint8Array.buffer, 0, smallUint8Array.length), true,
     'Client sending less than 64k, buffer should not be full.');

  let serverReceived = yield serverQueue.waitForDataWithAtLeastLength(256);
  assertUint8ArraysEqual(serverReceived, smallUint8Array,
                         'Server received/client sent');

  // - Send data from server to client
  // (But not so much we cross the drain threshold.)
  is(serverSocket.send(smallUint8Array.buffer, 0, smallUint8Array.length), true,
     'Server sending less than 64k, buffer should not be full.');

  let clientReceived = yield clientQueue.waitForDataWithAtLeastLength(256);
  assertUint8ArraysEqual(clientReceived, smallUint8Array,
                         'Client received/server sent');

  // -- Perform sending multiple times with different buffer slices
  // - Send data from client to server
  // (But not so much we cross the drain threshold.)
  is(clientSocket.send(smallUint8Array.buffer, 0, 7),
     true, 'Client sending less than 64k, buffer should not be full.');
  is(clientSocket.send(smallUint8Array.buffer, 7, smallUint8Array.length - 7),
     true, 'Client sending less than 64k, buffer should not be full.');

  serverReceived = yield serverQueue.waitForDataWithAtLeastLength(256);
  assertUint8ArraysEqual(serverReceived, smallUint8Array,
                         'Server received/client sent');

  // - Send data from server to client
  // (But not so much we cross the drain threshold.)
  is(serverSocket.send(smallUint8Array.buffer, 0, 7),
     true, 'Server sending less than 64k, buffer should not be full.');
  is(serverSocket.send(smallUint8Array.buffer, 7, smallUint8Array.length - 7),
     true, 'Server sending less than 64k, buffer should not be full.');

  clientReceived = yield clientQueue.waitForDataWithAtLeastLength(256);
  assertUint8ArraysEqual(clientReceived, smallUint8Array,
                         'Client received/server sent');

  // -- Send "big" data in both directions
  // (Enough to cross the buffering/drain threshold; 64KiB)
  let bigUint8Array = new Uint8Array(65536 + 3);
  for (let i = 0; i < bigUint8Array.length; i++) {
    bigUint8Array[i] = i % 256;
  }
  // Do this twice so we have confidence that the 'drain' event machinery
  // doesn't break after the first use.
  for (let iSend = 0; iSend < 2; iSend++) {
    // - Send "big" data from the client to the server
    is(clientSocket.send(bigUint8Array.buffer, 0, bigUint8Array.length), false,
       'Client sending more than 64k should result in the buffer being full.');
    is((yield clientQueue.waitForEvent()).type, 'drain',
       'The drain event should fire after a large send that indicated full.');

    serverReceived = yield serverQueue.waitForDataWithAtLeastLength(
      bigUint8Array.length);
    assertUint8ArraysEqual(serverReceived, bigUint8Array,
                           'server received/client sent');

    // - Send "big" data from the server to the client
    is(serverSocket.send(bigUint8Array.buffer, 0, bigUint8Array.length), false,
       'Server sending more than 64k should result in the buffer being full.');
    is((yield serverQueue.waitForEvent()).type, 'drain',
       'The drain event should fire after a large send that indicated full.');

    clientReceived = yield clientQueue.waitForDataWithAtLeastLength(
      bigUint8Array.length);
    assertUint8ArraysEqual(clientReceived, bigUint8Array,
                           'client received/server sent');
  }

  // -- Server closes the connection
  serverSocket.close();
  is(serverSocket.readyState, 'closing',
     'readyState should be closing immediately after calling close');

  is((yield clientQueue.waitForEvent()).type, 'close',
     'The client should get a close event when the server closes.');
  is(clientSocket.readyState, 'closed',
     'client readyState should be closed after close event');
  is((yield serverQueue.waitForEvent()).type, 'close',
     'The server should get a close event when it closes itself.');
  is(serverSocket.readyState, 'closed',
     'server readyState should be closed after close event');

  // -- Re-establish connection
  connectedPromise = waitForConnection(listeningServer);
  clientSocket = createSocket('127.0.0.1', serverPort,
                              { binaryType: 'arraybuffer' });
  clientQueue = listenForEventsOnSocket(clientSocket, 'client');
  is((yield clientQueue.waitForEvent()).type, 'open', 'got open event');

  let connectedResult = yield connectedPromise;
  // destructuring assignment is not yet ES6 compliant, must manually unpack
  serverSocket = connectedResult.socket;
  serverQueue = connectedResult.queue;

  // -- Client closes the connection
  clientSocket.close();
  is(clientSocket.readyState, 'closing',
     'client readyState should be closing immediately after calling close');

  is((yield clientQueue.waitForEvent()).type, 'close',
     'The client should get a close event when it closes itself.');
  is(clientSocket.readyState, 'closed',
     'client readyState should be closed after the close event is received');
  is((yield serverQueue.waitForEvent()).type, 'close',
     'The server should get a close event when the client closes.');
  is(serverSocket.readyState, 'closed',
     'server readyState should be closed after the close event is received');

  // -- Re-establish connection
  connectedPromise = waitForConnection(listeningServer);
  clientSocket = createSocket('127.0.0.1', serverPort,
                              { binaryType: 'arraybuffer' });
  clientQueue = listenForEventsOnSocket(clientSocket, 'client');
  is((yield clientQueue.waitForEvent()).type, 'open', 'got open event');

  connectedResult = yield connectedPromise;
  // destructuring assignment is not yet ES6 compliant, must manually unpack
  serverSocket = connectedResult.socket;
  serverQueue = connectedResult.queue;

  // -- Call close after enqueueing a lot of data, make sure it goes through.
  // We'll have the client send and close.
  is(clientSocket.send(bigUint8Array.buffer, 0, bigUint8Array.length), false,
     'Client sending more than 64k should result in the buffer being full.');
  clientSocket.close();
  // The drain will still fire
  is((yield clientQueue.waitForEvent()).type, 'drain',
     'The drain event should fire after a large send that returned false.');
  // Then we'll get a close
  is((yield clientQueue.waitForEvent()).type, 'close',
     'The close event should fire after the drain event.');

  // The server will get its data
  serverReceived = yield serverQueue.waitForDataWithAtLeastLength(
    bigUint8Array.length);
  assertUint8ArraysEqual(serverReceived, bigUint8Array,
                         'server received/client sent');
  // And a close.
  is((yield serverQueue.waitForEvent()).type, 'close',
     'The server should get a close event when the client closes.');

  // -- Re-establish connection
  connectedPromise = waitForConnection(listeningServer);
  clientSocket = createSocket('127.0.0.1', serverPort,
                              { binaryType: 'string' });
  clientQueue = listenForEventsOnSocket(clientSocket, 'client');
  is((yield clientQueue.waitForEvent()).type, 'open', 'got open event');

  connectedResult = yield connectedPromise;
  // destructuring assignment is not yet ES6 compliant, must manually unpack
  serverSocket = connectedResult.socket;
  serverQueue = connectedResult.queue;

  // -- Attempt to send non-string data.
  // Restore the original behavior by replacing toString with
  // Object.prototype.toString. (bug 1121938)
  bigUint8Array.toString = Object.prototype.toString;
  is(clientSocket.send(bigUint8Array), true,
     'Client sending a large non-string should only send a small string.');
  clientSocket.close();
  // The server will get its data
  serverReceived = yield serverQueue.waitForDataWithAtLeastLength(
    bigUint8Array.toString().length);
  // Then we'll get a close
  is((yield clientQueue.waitForEvent()).type, 'close',
     'The close event should fire after the drain event.');

  // -- Re-establish connection (Test for Close Immediately)
  connectedPromise = waitForConnection(listeningServer);
  clientSocket = createSocket('127.0.0.1', serverPort,
                              { binaryType: 'arraybuffer' });
  clientQueue = listenForEventsOnSocket(clientSocket, 'client');
  is((yield clientQueue.waitForEvent()).type, 'open', 'got open event');

  connectedResult = yield connectedPromise;
  // destructuring assignment is not yet ES6 compliant, must manually unpack
  serverSocket = connectedResult.socket;
  serverQueue = connectedResult.queue;

  // -- Send two big buffers from the client, then close immediately.
  is(clientSocket.send(bigUint8Array.buffer, 0, bigUint8Array.length), false,
     'Client sending more than 64k should result in the buffer being full.');
  is(clientSocket.send(bigUint8Array.buffer, 0, bigUint8Array.length), false,
     'Client sending more than 64k should result in the buffer being full.');
  clientSocket.closeImmediately();

  serverReceived = yield serverQueue.waitForAnyDataAndClose();

  is(serverReceived.length < (2 * bigUint8Array.length), true,
     'Received array length less than sent array length');

  // -- Close the listening server (and try to connect)
  // We want to verify that the server actually closes / stops listening when
  // we tell it to.
  listeningServer.close();

  // - try and connect, get an error
  clientSocket = createSocket('127.0.0.1', serverPort,
                              { binaryType: 'arraybuffer' });
  clientQueue = listenForEventsOnSocket(clientSocket, 'client');
  is((yield clientQueue.waitForEvent()).type, 'error', 'fail to connect');
  is(clientSocket.readyState, 'closed',
     'client readyState should be closed after the failure to connect');
}

add_task(test_basics);