diff options
Diffstat (limited to 'testing/xpcshell/node-http2/lib')
-rw-r--r-- | testing/xpcshell/node-http2/lib/http.js | 1262 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/index.js | 52 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/compressor.js | 1366 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/connection.js | 619 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/endpoint.js | 262 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/flow.js | 353 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/framer.js | 1165 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/index.js | 91 | ||||
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/stream.js | 659 |
9 files changed, 5829 insertions, 0 deletions
diff --git a/testing/xpcshell/node-http2/lib/http.js b/testing/xpcshell/node-http2/lib/http.js new file mode 100644 index 000000000..4c4234c5c --- /dev/null +++ b/testing/xpcshell/node-http2/lib/http.js @@ -0,0 +1,1262 @@ +// Public API +// ========== + +// The main governing power behind the http2 API design is that it should look very similar to the +// existing node.js [HTTPS API][1] (which is, in turn, almost identical to the [HTTP API][2]). The +// additional features of HTTP/2 are exposed as extensions to this API. Furthermore, node-http2 +// should fall back to using HTTP/1.1 if needed. Compatibility with undocumented or deprecated +// elements of the node.js HTTP/HTTPS API is a non-goal. +// +// Additional and modified API elements +// ------------------------------------ +// +// - **Class: http2.Endpoint**: an API for using the raw HTTP/2 framing layer. For documentation +// see [protocol/endpoint.js](protocol/endpoint.html). +// +// - **Class: http2.Server** +// - **Event: 'connection' (socket, [endpoint])**: there's a second argument if the negotiation of +// HTTP/2 was successful: the reference to the [Endpoint](protocol/endpoint.html) object tied to the +// socket. +// +// - **http2.createServer(options, [requestListener])**: additional option: +// - **log**: an optional [bunyan](https://github.com/trentm/node-bunyan) logger object +// +// - **Class: http2.ServerResponse** +// - **response.push(options)**: initiates a server push. `options` describes the 'imaginary' +// request to which the push stream is a response; the possible options are identical to the +// ones accepted by `http2.request`. Returns a ServerResponse object that can be used to send +// the response headers and content. +// +// - **Class: http2.Agent** +// - **new Agent(options)**: additional option: +// - **log**: an optional [bunyan](https://github.com/trentm/node-bunyan) logger object +// - **agent.sockets**: only contains TCP sockets that corresponds to HTTP/1 requests. +// - **agent.endpoints**: contains [Endpoint](protocol/endpoint.html) objects for HTTP/2 connections. +// +// - **http2.request(options, [callback])**: +// - similar to http.request +// +// - **http2.get(options, [callback])**: +// - similar to http.get +// +// - **Class: http2.ClientRequest** +// - **Event: 'socket' (socket)**: in case of an HTTP/2 incoming message, `socket` is a reference +// to the associated [HTTP/2 Stream](protocol/stream.html) object (and not to the TCP socket). +// - **Event: 'push' (promise)**: signals the intention of a server push associated to this +// request. `promise` is an IncomingPromise. If there's no listener for this event, the server +// push is cancelled. +// - **request.setPriority(priority)**: assign a priority to this request. `priority` is a number +// between 0 (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. +// +// - **Class: http2.IncomingMessage** +// - has two subclasses for easier interface description: **IncomingRequest** and +// **IncomingResponse** +// - **message.socket**: in case of an HTTP/2 incoming message, it's a reference to the associated +// [HTTP/2 Stream](protocol/stream.html) object (and not to the TCP socket). +// +// - **Class: http2.IncomingRequest (IncomingMessage)** +// - **message.url**: in case of an HTTP/2 incoming request, the `url` field always contains the +// path, and never a full url (it contains the path in most cases in the HTTPS api as well). +// - **message.scheme**: additional field. Mandatory HTTP/2 request metadata. +// - **message.host**: additional field. Mandatory HTTP/2 request metadata. Note that this +// replaces the old Host header field, but node-http2 will add Host to the `message.headers` for +// backwards compatibility. +// +// - **Class: http2.IncomingPromise (IncomingRequest)** +// - contains the metadata of the 'imaginary' request to which the server push is an answer. +// - **Event: 'response' (response)**: signals the arrival of the actual push stream. `response` +// is an IncomingResponse. +// - **Event: 'push' (promise)**: signals the intention of a server push associated to this +// request. `promise` is an IncomingPromise. If there's no listener for this event, the server +// push is cancelled. +// - **promise.cancel()**: cancels the promised server push. +// - **promise.setPriority(priority)**: assign a priority to this push stream. `priority` is a +// number between 0 (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. +// +// API elements not yet implemented +// -------------------------------- +// +// - **Class: http2.Server** +// - **server.maxHeadersCount** +// +// API elements that are not applicable to HTTP/2 +// ---------------------------------------------- +// +// The reason may be deprecation of certain HTTP/1.1 features, or that some API elements simply +// don't make sense when using HTTP/2. These will not be present when a request is done with HTTP/2, +// but will function normally when falling back to using HTTP/1.1. +// +// - **Class: http2.Server** +// - **Event: 'checkContinue'**: not in the spec +// - **Event: 'upgrade'**: upgrade is deprecated in HTTP/2 +// - **Event: 'timeout'**: HTTP/2 sockets won't timeout because of application level keepalive +// (PING frames) +// - **Event: 'connect'**: not yet supported +// - **server.setTimeout(msecs, [callback])** +// - **server.timeout** +// +// - **Class: http2.ServerResponse** +// - **Event: 'close'** +// - **Event: 'timeout'** +// - **response.writeContinue()** +// - **response.writeHead(statusCode, [reasonPhrase], [headers])**: reasonPhrase will always be +// ignored since [it's not supported in HTTP/2][3] +// - **response.setTimeout(timeout, [callback])** +// +// - **Class: http2.Agent** +// - **agent.maxSockets**: only affects HTTP/1 connection pool. When using HTTP/2, there's always +// one connection per host. +// +// - **Class: http2.ClientRequest** +// - **Event: 'upgrade'** +// - **Event: 'connect'** +// - **Event: 'continue'** +// - **request.setTimeout(timeout, [callback])** +// - **request.setNoDelay([noDelay])** +// - **request.setSocketKeepAlive([enable], [initialDelay])** +// +// - **Class: http2.IncomingMessage** +// - **Event: 'close'** +// - **message.setTimeout(timeout, [callback])** +// +// [1]: https://nodejs.org/api/https.html +// [2]: https://nodejs.org/api/http.html +// [3]: https://tools.ietf.org/html/rfc7540#section-8.1.2.4 + +// Common server and client side code +// ================================== + +var net = require('net'); +var url = require('url'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var PassThrough = require('stream').PassThrough; +var Readable = require('stream').Readable; +var Writable = require('stream').Writable; +var protocol = require('./protocol'); +var Endpoint = protocol.Endpoint; +var http = require('http'); +var https = require('https'); + +exports.STATUS_CODES = http.STATUS_CODES; +exports.IncomingMessage = IncomingMessage; +exports.OutgoingMessage = OutgoingMessage; +exports.protocol = protocol; + +var deprecatedHeaders = [ + 'connection', + 'host', + 'keep-alive', + 'proxy-connection', + 'transfer-encoding', + 'upgrade' +]; + +// When doing NPN/ALPN negotiation, HTTP/1.1 is used as fallback +var supportedProtocols = [protocol.VERSION, 'http/1.1', 'http/1.0']; + +// Ciphersuite list based on the recommendations of https://wiki.mozilla.org/Security/Server_Side_TLS +// The only modification is that kEDH+AESGCM were placed after DHE and ECDHE suites +var cipherSuites = [ + 'ECDHE-RSA-AES128-GCM-SHA256', + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'ECDHE-RSA-AES256-GCM-SHA384', + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'DHE-RSA-AES128-GCM-SHA256', + 'DHE-DSS-AES128-GCM-SHA256', + 'ECDHE-RSA-AES128-SHA256', + 'ECDHE-ECDSA-AES128-SHA256', + 'ECDHE-RSA-AES128-SHA', + 'ECDHE-ECDSA-AES128-SHA', + 'ECDHE-RSA-AES256-SHA384', + 'ECDHE-ECDSA-AES256-SHA384', + 'ECDHE-RSA-AES256-SHA', + 'ECDHE-ECDSA-AES256-SHA', + 'DHE-RSA-AES128-SHA256', + 'DHE-RSA-AES128-SHA', + 'DHE-DSS-AES128-SHA256', + 'DHE-RSA-AES256-SHA256', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA', + 'kEDH+AESGCM', + 'AES128-GCM-SHA256', + 'AES256-GCM-SHA384', + 'ECDHE-RSA-RC4-SHA', + 'ECDHE-ECDSA-RC4-SHA', + 'AES128', + 'AES256', + 'RC4-SHA', + 'HIGH', + '!aNULL', + '!eNULL', + '!EXPORT', + '!DES', + '!3DES', + '!MD5', + '!PSK' +].join(':'); + +// Logging +// ------- + +// Logger shim, used when no logger is provided by the user. +function noop() {} +var defaultLogger = { + fatal: noop, + error: noop, + warn : noop, + info : noop, + debug: noop, + trace: noop, + + child: function() { return this; } +}; + +// Bunyan serializers exported by submodules that are worth adding when creating a logger. +exports.serializers = protocol.serializers; + +// IncomingMessage class +// --------------------- + +function IncomingMessage(stream) { + // * This is basically a read-only wrapper for the [Stream](protocol/stream.html) class. + PassThrough.call(this); + stream.pipe(this); + this.socket = this.stream = stream; + + this._log = stream._log.child({ component: 'http' }); + + // * HTTP/2.0 does not define a way to carry the version identifier that is included in the + // HTTP/1.1 request/status line. Version is always 2.0. + this.httpVersion = '2.0'; + this.httpVersionMajor = 2; + this.httpVersionMinor = 0; + + // * `this.headers` will store the regular headers (and none of the special colon headers) + this.headers = {}; + this.trailers = undefined; + this._lastHeadersSeen = undefined; + + // * Other metadata is filled in when the headers arrive. + stream.once('headers', this._onHeaders.bind(this)); + stream.once('end', this._onEnd.bind(this)); +} +IncomingMessage.prototype = Object.create(PassThrough.prototype, { constructor: { value: IncomingMessage } }); + +// [Request Header Fields](https://tools.ietf.org/html/rfc7540#section-8.1.2.3) +// * `headers` argument: HTTP/2.0 request and response header fields carry information as a series +// of key-value pairs. This includes the target URI for the request, the status code for the +// response, as well as HTTP header fields. +IncomingMessage.prototype._onHeaders = function _onHeaders(headers) { + // * Detects malformed headers + this._validateHeaders(headers); + + // * Store the _regular_ headers in `this.headers` + for (var name in headers) { + if (name[0] !== ':') { + if (name === 'set-cookie' && !Array.isArray(headers[name])) { + this.headers[name] = [headers[name]]; + } else { + this.headers[name] = headers[name]; + } + } + } + + // * The last header block, if it's not the first, will represent the trailers + var self = this; + this.stream.on('headers', function(headers) { + self._lastHeadersSeen = headers; + }); +}; + +IncomingMessage.prototype._onEnd = function _onEnd() { + this.trailers = this._lastHeadersSeen; +}; + +IncomingMessage.prototype.setTimeout = noop; + +IncomingMessage.prototype._checkSpecialHeader = function _checkSpecialHeader(key, value) { + if ((typeof value !== 'string') || (value.length === 0)) { + this._log.error({ key: key, value: value }, 'Invalid or missing special header field'); + this.stream.reset('PROTOCOL_ERROR'); + } + + return value; +}; + +IncomingMessage.prototype._validateHeaders = function _validateHeaders(headers) { + // * An HTTP/2.0 request or response MUST NOT include any of the following header fields: + // Connection, Host, Keep-Alive, Proxy-Connection, Transfer-Encoding, and Upgrade. A server + // MUST treat the presence of any of these header fields as a stream error of type + // PROTOCOL_ERROR. + // If the TE header is present, it's only valid value is 'trailers' + for (var i = 0; i < deprecatedHeaders.length; i++) { + var key = deprecatedHeaders[i]; + if (key in headers || (key === 'te' && headers[key] !== 'trailers')) { + this._log.error({ key: key, value: headers[key] }, 'Deprecated header found'); + this.stream.reset('PROTOCOL_ERROR'); + return; + } + } + + for (var headerName in headers) { + // * Empty header name field is malformed + if (headerName.length <= 1) { + this.stream.reset('PROTOCOL_ERROR'); + return; + } + // * A request or response containing uppercase header name field names MUST be + // treated as malformed (Section 8.1.3.5). Implementations that detect malformed + // requests or responses need to ensure that the stream ends. + if(/[A-Z]/.test(headerName)) { + this.stream.reset('PROTOCOL_ERROR'); + return; + } + } +}; + +// OutgoingMessage class +// --------------------- + +function OutgoingMessage() { + // * This is basically a read-only wrapper for the [Stream](protocol/stream.html) class. + Writable.call(this); + + this._headers = {}; + this._trailers = undefined; + this.headersSent = false; + this.finished = false; + + this.on('finish', this._finish); +} +OutgoingMessage.prototype = Object.create(Writable.prototype, { constructor: { value: OutgoingMessage } }); + +OutgoingMessage.prototype._write = function _write(chunk, encoding, callback) { + if (this.stream) { + this.stream.write(chunk, encoding, callback); + } else { + this.once('socket', this._write.bind(this, chunk, encoding, callback)); + } +}; + +OutgoingMessage.prototype._finish = function _finish() { + if (this.stream) { + if (this._trailers) { + if (this.request) { + this.request.addTrailers(this._trailers); + } else { + this.stream.headers(this._trailers); + } + } + this.finished = true; + this.stream.end(); + } else { + this.once('socket', this._finish.bind(this)); + } +}; + +OutgoingMessage.prototype.setHeader = function setHeader(name, value) { + if (this.headersSent) { + return this.emit('error', new Error('Can\'t set headers after they are sent.')); + } else { + name = name.toLowerCase(); + if (deprecatedHeaders.indexOf(name) !== -1) { + return this.emit('error', new Error('Cannot set deprecated header: ' + name)); + } + this._headers[name] = value; + } +}; + +OutgoingMessage.prototype.removeHeader = function removeHeader(name) { + if (this.headersSent) { + return this.emit('error', new Error('Can\'t remove headers after they are sent.')); + } else { + delete this._headers[name.toLowerCase()]; + } +}; + +OutgoingMessage.prototype.getHeader = function getHeader(name) { + return this._headers[name.toLowerCase()]; +}; + +OutgoingMessage.prototype.addTrailers = function addTrailers(trailers) { + this._trailers = trailers; +}; + +OutgoingMessage.prototype.setTimeout = noop; + +OutgoingMessage.prototype._checkSpecialHeader = IncomingMessage.prototype._checkSpecialHeader; + +// Server side +// =========== + +exports.Server = Server; +exports.IncomingRequest = IncomingRequest; +exports.OutgoingResponse = OutgoingResponse; +exports.ServerResponse = OutgoingResponse; // for API compatibility + +// Forward events `event` on `source` to all listeners on `target`. +// +// Note: The calling context is `source`. +function forwardEvent(event, source, target) { + function forward() { + var listeners = target.listeners(event); + + var n = listeners.length; + + // Special case for `error` event with no listeners. + if (n === 0 && event === 'error') { + var args = [event]; + args.push.apply(args, arguments); + + target.emit.apply(target, args); + return; + } + + for (var i = 0; i < n; ++i) { + listeners[i].apply(source, arguments); + } + } + + source.on(event, forward); + + // A reference to the function is necessary to be able to stop + // forwarding. + return forward; +} + +// Server class +// ------------ + +function Server(options) { + options = util._extend({}, options); + + this._log = (options.log || defaultLogger).child({ component: 'http' }); + this._settings = options.settings; + + var start = this._start.bind(this); + var fallback = this._fallback.bind(this); + + // HTTP2 over TLS (using NPN or ALPN) + if ((options.key && options.cert) || options.pfx) { + this._log.info('Creating HTTP/2 server over TLS'); + this._mode = 'tls'; + options.ALPNProtocols = supportedProtocols; + options.NPNProtocols = supportedProtocols; + options.ciphers = options.ciphers || cipherSuites; + options.honorCipherOrder = (options.honorCipherOrder != false); + this._server = https.createServer(options); + this._originalSocketListeners = this._server.listeners('secureConnection'); + this._server.removeAllListeners('secureConnection'); + this._server.on('secureConnection', function(socket) { + var negotiatedProtocol = socket.alpnProtocol || socket.npnProtocol; + // It's true that the client MUST use SNI, but if it doesn't, we don't care, don't fall back to HTTP/1, + // since if the ALPN negotiation is otherwise successful, the client thinks we speak HTTP/2 but we don't. + if (negotiatedProtocol === protocol.VERSION) { + start(socket); + } else { + fallback(socket); + } + }); + this._server.on('request', this.emit.bind(this, 'request')); + + forwardEvent('error', this._server, this); + forwardEvent('listening', this._server, this); + } + + // HTTP2 over plain TCP + else if (options.plain) { + this._log.info('Creating HTTP/2 server over plain TCP'); + this._mode = 'plain'; + this._server = net.createServer(start); + } + + // HTTP/2 with HTTP/1.1 upgrade + else { + this._log.error('Trying to create HTTP/2 server with Upgrade from HTTP/1.1'); + throw new Error('HTTP1.1 -> HTTP2 upgrade is not yet supported. Please provide TLS keys.'); + } + + this._server.on('close', this.emit.bind(this, 'close')); +} +Server.prototype = Object.create(EventEmitter.prototype, { constructor: { value: Server } }); + +// Starting HTTP/2 +Server.prototype._start = function _start(socket) { + var endpoint = new Endpoint(this._log, 'SERVER', this._settings); + + this._log.info({ e: endpoint, + client: socket.remoteAddress + ':' + socket.remotePort, + SNI: socket.servername + }, 'New incoming HTTP/2 connection'); + + endpoint.pipe(socket).pipe(endpoint); + + var self = this; + endpoint.on('stream', function _onStream(stream) { + var response = new OutgoingResponse(stream); + var request = new IncomingRequest(stream); + + // Some conformance to Node.js Https specs allows to distinguish clients: + request.remoteAddress = socket.remoteAddress; + request.remotePort = socket.remotePort; + request.connection = request.socket = response.socket = socket; + + request.once('ready', self.emit.bind(self, 'request', request, response)); + }); + + endpoint.on('error', this.emit.bind(this, 'clientError')); + socket.on('error', this.emit.bind(this, 'clientError')); + + this.emit('connection', socket, endpoint); +}; + +Server.prototype._fallback = function _fallback(socket) { + var negotiatedProtocol = socket.alpnProtocol || socket.npnProtocol; + + this._log.info({ client: socket.remoteAddress + ':' + socket.remotePort, + protocol: negotiatedProtocol, + SNI: socket.servername + }, 'Falling back to simple HTTPS'); + + for (var i = 0; i < this._originalSocketListeners.length; i++) { + this._originalSocketListeners[i].call(this._server, socket); + } + + this.emit('connection', socket); +}; + +// There are [3 possible signatures][1] of the `listen` function. Every arguments is forwarded to +// the backing TCP or HTTPS server. +// [1]: https://nodejs.org/api/http.html#http_server_listen_port_hostname_backlog_callback +Server.prototype.listen = function listen(port, hostname) { + this._log.info({ on: ((typeof hostname === 'string') ? (hostname + ':' + port) : port) }, + 'Listening for incoming connections'); + this._server.listen.apply(this._server, arguments); + + return this._server; +}; + +Server.prototype.close = function close(callback) { + this._log.info('Closing server'); + this._server.close(callback); +}; + +Server.prototype.setTimeout = function setTimeout(timeout, callback) { + if (this._mode === 'tls') { + this._server.setTimeout(timeout, callback); + } +}; + +Object.defineProperty(Server.prototype, 'timeout', { + get: function getTimeout() { + if (this._mode === 'tls') { + return this._server.timeout; + } else { + return undefined; + } + }, + set: function setTimeout(timeout) { + if (this._mode === 'tls') { + this._server.timeout = timeout; + } + } +}); + +// Overriding `EventEmitter`'s `on(event, listener)` method to forward certain subscriptions to +// `server`.There are events on the `http.Server` class where it makes difference whether someone is +// listening on the event or not. In these cases, we can not simply forward the events from the +// `server` to `this` since that means a listener. Instead, we forward the subscriptions. +Server.prototype.on = function on(event, listener) { + if ((event === 'upgrade') || (event === 'timeout')) { + return this._server.on(event, listener && listener.bind(this)); + } else { + return EventEmitter.prototype.on.call(this, event, listener); + } +}; + +// `addContext` is used to add Server Name Indication contexts +Server.prototype.addContext = function addContext(hostname, credentials) { + if (this._mode === 'tls') { + this._server.addContext(hostname, credentials); + } +}; + +Server.prototype.address = function address() { + return this._server.address() +}; + +function createServerRaw(options, requestListener) { + if (typeof options === 'function') { + requestListener = options; + options = {}; + } + + if (options.pfx || (options.key && options.cert)) { + throw new Error('options.pfx, options.key, and options.cert are nonsensical!'); + } + + options.plain = true; + var server = new Server(options); + + if (requestListener) { + server.on('request', requestListener); + } + + return server; +} + +function createServerTLS(options, requestListener) { + if (typeof options === 'function') { + throw new Error('options are required!'); + } + if (!options.pfx && !(options.key && options.cert)) { + throw new Error('options.pfx or options.key and options.cert are required!'); + } + options.plain = false; + + var server = new Server(options); + + if (requestListener) { + server.on('request', requestListener); + } + + return server; +} + +// Exposed main interfaces for HTTPS connections (the default) +exports.https = {}; +exports.createServer = exports.https.createServer = createServerTLS; +exports.request = exports.https.request = requestTLS; +exports.get = exports.https.get = getTLS; + +// Exposed main interfaces for raw TCP connections (not recommended) +exports.raw = {}; +exports.raw.createServer = createServerRaw; +exports.raw.request = requestRaw; +exports.raw.get = getRaw; + +// Exposed main interfaces for HTTP plaintext upgrade connections (not implemented) +function notImplemented() { + throw new Error('HTTP UPGRADE is not implemented!'); +} + +exports.http = {}; +exports.http.createServer = exports.http.request = exports.http.get = notImplemented; + +// IncomingRequest class +// --------------------- + +function IncomingRequest(stream) { + IncomingMessage.call(this, stream); +} +IncomingRequest.prototype = Object.create(IncomingMessage.prototype, { constructor: { value: IncomingRequest } }); + +// [Request Header Fields](https://tools.ietf.org/html/rfc7540#section-8.1.2.3) +// * `headers` argument: HTTP/2.0 request and response header fields carry information as a series +// of key-value pairs. This includes the target URI for the request, the status code for the +// response, as well as HTTP header fields. +IncomingRequest.prototype._onHeaders = function _onHeaders(headers) { + // * The ":method" header field includes the HTTP method + // * The ":scheme" header field includes the scheme portion of the target URI + // * The ":authority" header field includes the authority portion of the target URI + // * The ":path" header field includes the path and query parts of the target URI. + // This field MUST NOT be empty; URIs that do not contain a path component MUST include a value + // of '/', unless the request is an OPTIONS request for '*', in which case the ":path" header + // field MUST include '*'. + // * All HTTP/2.0 requests MUST include exactly one valid value for all of these header fields. A + // server MUST treat the absence of any of these header fields, presence of multiple values, or + // an invalid value as a stream error of type PROTOCOL_ERROR. + this.method = this._checkSpecialHeader(':method' , headers[':method']); + this.scheme = this._checkSpecialHeader(':scheme' , headers[':scheme']); + this.host = this._checkSpecialHeader(':authority', headers[':authority'] ); + this.url = this._checkSpecialHeader(':path' , headers[':path'] ); + if (!this.method || !this.scheme || !this.host || !this.url) { + // This is invalid, and we've sent a RST_STREAM, so don't continue processing + return; + } + + // * Host header is included in the headers object for backwards compatibility. + this.headers.host = this.host; + + // * Handling regular headers. + IncomingMessage.prototype._onHeaders.call(this, headers); + + // * Signaling that the headers arrived. + this._log.info({ method: this.method, scheme: this.scheme, host: this.host, + path: this.url, headers: this.headers }, 'Incoming request'); + this.emit('ready'); +}; + +// OutgoingResponse class +// ---------------------- + +function OutgoingResponse(stream) { + OutgoingMessage.call(this); + + this._log = stream._log.child({ component: 'http' }); + + this.stream = stream; + this.statusCode = 200; + this.sendDate = true; + + this.stream.once('headers', this._onRequestHeaders.bind(this)); +} +OutgoingResponse.prototype = Object.create(OutgoingMessage.prototype, { constructor: { value: OutgoingResponse } }); + +OutgoingResponse.prototype.writeHead = function writeHead(statusCode, reasonPhrase, headers) { + if (this.headersSent) { + return; + } + + if (typeof reasonPhrase === 'string') { + this._log.warn('Reason phrase argument was present but ignored by the writeHead method'); + } else { + headers = reasonPhrase; + } + + for (var name in headers) { + this.setHeader(name, headers[name]); + } + headers = this._headers; + + if (this.sendDate && !('date' in this._headers)) { + headers.date = (new Date()).toUTCString(); + } + + this._log.info({ status: statusCode, headers: this._headers }, 'Sending server response'); + + headers[':status'] = this.statusCode = statusCode; + + this.stream.headers(headers); + this.headersSent = true; +}; + +OutgoingResponse.prototype._implicitHeaders = function _implicitHeaders() { + if (!this.headersSent) { + this.writeHead(this.statusCode); + } +}; + +OutgoingResponse.prototype._implicitHeader = function() { + this._implicitHeaders(); +}; + +OutgoingResponse.prototype.write = function write() { + this._implicitHeaders(); + return OutgoingMessage.prototype.write.apply(this, arguments); +}; + +OutgoingResponse.prototype.end = function end() { + this.finshed = true; + this._implicitHeaders(); + return OutgoingMessage.prototype.end.apply(this, arguments); +}; + +OutgoingResponse.prototype._onRequestHeaders = function _onRequestHeaders(headers) { + this._requestHeaders = headers; +}; + +OutgoingResponse.prototype.push = function push(options) { + if (typeof options === 'string') { + options = url.parse(options); + } + + if (!options.path) { + throw new Error('`path` option is mandatory.'); + } + + var promise = util._extend({ + ':method': (options.method || 'GET').toUpperCase(), + ':scheme': (options.protocol && options.protocol.slice(0, -1)) || this._requestHeaders[':scheme'], + ':authority': options.hostname || options.host || this._requestHeaders[':authority'], + ':path': options.path + }, options.headers); + + this._log.info({ method: promise[':method'], scheme: promise[':scheme'], + authority: promise[':authority'], path: promise[':path'], + headers: options.headers }, 'Promising push stream'); + + var pushStream = this.stream.promise(promise); + + return new OutgoingResponse(pushStream); +}; + +OutgoingResponse.prototype.altsvc = function altsvc(host, port, protocolID, maxAge, origin) { + if (origin === undefined) { + origin = ""; + } + this.stream.altsvc(host, port, protocolID, maxAge, origin); +}; + +// Overriding `EventEmitter`'s `on(event, listener)` method to forward certain subscriptions to +// `request`. See `Server.prototype.on` for explanation. +OutgoingResponse.prototype.on = function on(event, listener) { + if (this.request && (event === 'timeout')) { + this.request.on(event, listener && listener.bind(this)); + } else { + OutgoingMessage.prototype.on.call(this, event, listener); + } +}; + +// Client side +// =========== + +exports.ClientRequest = OutgoingRequest; // for API compatibility +exports.OutgoingRequest = OutgoingRequest; +exports.IncomingResponse = IncomingResponse; +exports.Agent = Agent; +exports.globalAgent = undefined; + +function requestRaw(options, callback) { + if (typeof options === "string") { + options = url.parse(options); + } + options.plain = true; + if (options.protocol && options.protocol !== "http:") { + throw new Error('This interface only supports http-schemed URLs'); + } + if (options.agent && typeof(options.agent.request) === 'function') { + var agentOptions = util._extend({}, options); + delete agentOptions.agent; + return options.agent.request(agentOptions, callback); + } + return exports.globalAgent.request(options, callback); +} + +function requestTLS(options, callback) { + if (typeof options === "string") { + options = url.parse(options); + } + options.plain = false; + if (options.protocol && options.protocol !== "https:") { + throw new Error('This interface only supports https-schemed URLs'); + } + if (options.agent && typeof(options.agent.request) === 'function') { + var agentOptions = util._extend({}, options); + delete agentOptions.agent; + return options.agent.request(agentOptions, callback); + } + return exports.globalAgent.request(options, callback); +} + +function getRaw(options, callback) { + if (typeof options === "string") { + options = url.parse(options); + } + options.plain = true; + if (options.protocol && options.protocol !== "http:") { + throw new Error('This interface only supports http-schemed URLs'); + } + if (options.agent && typeof(options.agent.get) === 'function') { + var agentOptions = util._extend({}, options); + delete agentOptions.agent; + return options.agent.get(agentOptions, callback); + } + return exports.globalAgent.get(options, callback); +} + +function getTLS(options, callback) { + if (typeof options === "string") { + options = url.parse(options); + } + options.plain = false; + if (options.protocol && options.protocol !== "https:") { + throw new Error('This interface only supports https-schemed URLs'); + } + if (options.agent && typeof(options.agent.get) === 'function') { + var agentOptions = util._extend({}, options); + delete agentOptions.agent; + return options.agent.get(agentOptions, callback); + } + return exports.globalAgent.get(options, callback); +} + +// Agent class +// ----------- + +function Agent(options) { + EventEmitter.call(this); + this.setMaxListeners(0); + + options = util._extend({}, options); + + this._settings = options.settings; + this._log = (options.log || defaultLogger).child({ component: 'http' }); + this.endpoints = {}; + + // * Using an own HTTPS agent, because the global agent does not look at `NPN/ALPNProtocols` when + // generating the key identifying the connection, so we may get useless non-negotiated TLS + // channels even if we ask for a negotiated one. This agent will contain only negotiated + // channels. + options.ALPNProtocols = supportedProtocols; + options.NPNProtocols = supportedProtocols; + this._httpsAgent = new https.Agent(options); + + this.sockets = this._httpsAgent.sockets; + this.requests = this._httpsAgent.requests; +} +Agent.prototype = Object.create(EventEmitter.prototype, { constructor: { value: Agent } }); + +Agent.prototype.request = function request(options, callback) { + if (typeof options === 'string') { + options = url.parse(options); + } else { + options = util._extend({}, options); + } + + options.method = (options.method || 'GET').toUpperCase(); + options.protocol = options.protocol || 'https:'; + options.host = options.hostname || options.host || 'localhost'; + options.port = options.port || 443; + options.path = options.path || '/'; + + if (!options.plain && options.protocol === 'http:') { + this._log.error('Trying to negotiate client request with Upgrade from HTTP/1.1'); + this.emit('error', new Error('HTTP1.1 -> HTTP2 upgrade is not yet supported.')); + } + + var request = new OutgoingRequest(this._log); + + if (callback) { + request.on('response', callback); + } + + var key = [ + !!options.plain, + options.host, + options.port + ].join(':'); + var self = this; + + // * There's an existing HTTP/2 connection to this host + if (key in this.endpoints) { + var endpoint = this.endpoints[key]; + request._start(endpoint.createStream(), options); + } + + // * HTTP/2 over plain TCP + else if (options.plain) { + endpoint = new Endpoint(this._log, 'CLIENT', this._settings); + endpoint.socket = net.connect({ + host: options.host, + port: options.port, + localAddress: options.localAddress + }); + + endpoint.socket.on('error', function (error) { + self._log.error('Socket error: ' + error.toString()); + request.emit('error', error); + }); + + endpoint.on('error', function(error){ + self._log.error('Connection error: ' + error.toString()); + request.emit('error', error); + }); + + this.endpoints[key] = endpoint; + endpoint.pipe(endpoint.socket).pipe(endpoint); + request._start(endpoint.createStream(), options); + } + + // * HTTP/2 over TLS negotiated using NPN or ALPN, or fallback to HTTPS1 + else { + var started = false; + var createAgent = hasAgentOptions(options); + options.ALPNProtocols = supportedProtocols; + options.NPNProtocols = supportedProtocols; + options.servername = options.host; // Server Name Indication + options.ciphers = options.ciphers || cipherSuites; + if (createAgent) { + options.agent = new https.Agent(options); + } else if (options.agent == null) { + options.agent = this._httpsAgent; + } + var httpsRequest = https.request(options); + + httpsRequest.on('error', function (error) { + self._log.error('Socket error: ' + error.toString()); + self.removeAllListeners(key); + request.emit('error', error); + }); + + httpsRequest.on('socket', function(socket) { + var negotiatedProtocol = socket.alpnProtocol || socket.npnProtocol; + if (negotiatedProtocol != null) { // null in >=0.11.0, undefined in <0.11.0 + negotiated(); + } else { + socket.on('secureConnect', negotiated); + } + }); + + function negotiated() { + var endpoint; + var negotiatedProtocol = httpsRequest.socket.alpnProtocol || httpsRequest.socket.npnProtocol; + if (negotiatedProtocol === protocol.VERSION) { + httpsRequest.socket.emit('agentRemove'); + unbundleSocket(httpsRequest.socket); + endpoint = new Endpoint(self._log, 'CLIENT', self._settings); + endpoint.socket = httpsRequest.socket; + endpoint.pipe(endpoint.socket).pipe(endpoint); + } + if (started) { + // ** In the meantime, an other connection was made to the same host... + if (endpoint) { + // *** and it turned out to be HTTP2 and the request was multiplexed on that one, so we should close this one + endpoint.close(); + } + // *** otherwise, the fallback to HTTPS1 is already done. + } else { + if (endpoint) { + self._log.info({ e: endpoint, server: options.host + ':' + options.port }, + 'New outgoing HTTP/2 connection'); + self.endpoints[key] = endpoint; + self.emit(key, endpoint); + } else { + self.emit(key, undefined); + } + } + } + + this.once(key, function(endpoint) { + started = true; + if (endpoint) { + request._start(endpoint.createStream(), options); + } else { + request._fallback(httpsRequest); + } + }); + } + + return request; +}; + +Agent.prototype.get = function get(options, callback) { + var request = this.request(options, callback); + request.end(); + return request; +}; + +Agent.prototype.destroy = function(error) { + if (this._httpsAgent) { + this._httpsAgent.destroy(); + } + for (var key in this.endpoints) { + this.endpoints[key].close(error); + } +}; + +function unbundleSocket(socket) { + socket.removeAllListeners('data'); + socket.removeAllListeners('end'); + socket.removeAllListeners('readable'); + socket.removeAllListeners('close'); + socket.removeAllListeners('error'); + socket.unpipe(); + delete socket.ondata; + delete socket.onend; +} + +function hasAgentOptions(options) { + return options.pfx != null || + options.key != null || + options.passphrase != null || + options.cert != null || + options.ca != null || + options.ciphers != null || + options.rejectUnauthorized != null || + options.secureProtocol != null; +} + +Object.defineProperty(Agent.prototype, 'maxSockets', { + get: function getMaxSockets() { + return this._httpsAgent.maxSockets; + }, + set: function setMaxSockets(value) { + this._httpsAgent.maxSockets = value; + } +}); + +exports.globalAgent = new Agent(); + +// OutgoingRequest class +// --------------------- + +function OutgoingRequest() { + OutgoingMessage.call(this); + + this._log = undefined; + + this.stream = undefined; +} +OutgoingRequest.prototype = Object.create(OutgoingMessage.prototype, { constructor: { value: OutgoingRequest } }); + +OutgoingRequest.prototype._start = function _start(stream, options) { + this.stream = stream; + this.options = options; + + this._log = stream._log.child({ component: 'http' }); + + for (var key in options.headers) { + this.setHeader(key, options.headers[key]); + } + var headers = this._headers; + delete headers.host; + + if (options.auth) { + headers.authorization = 'Basic ' + new Buffer(options.auth).toString('base64'); + } + + headers[':scheme'] = options.protocol.slice(0, -1); + headers[':method'] = options.method; + headers[':authority'] = options.host; + headers[':path'] = options.path; + + this._log.info({ scheme: headers[':scheme'], method: headers[':method'], + authority: headers[':authority'], path: headers[':path'], + headers: (options.headers || {}) }, 'Sending request'); + this.stream.headers(headers); + this.headersSent = true; + + this.emit('socket', this.stream); + var response = new IncomingResponse(this.stream); + response.req = this; + response.once('ready', this.emit.bind(this, 'response', response)); + + this.stream.on('promise', this._onPromise.bind(this)); +}; + +OutgoingRequest.prototype._fallback = function _fallback(request) { + request.on('response', this.emit.bind(this, 'response')); + this.stream = this.request = request; + this.emit('socket', this.socket); +}; + +OutgoingRequest.prototype.setPriority = function setPriority(priority) { + if (this.stream) { + this.stream.priority(priority); + } else { + this.once('socket', this.setPriority.bind(this, priority)); + } +}; + +// Overriding `EventEmitter`'s `on(event, listener)` method to forward certain subscriptions to +// `request`. See `Server.prototype.on` for explanation. +OutgoingRequest.prototype.on = function on(event, listener) { + if (this.request && (event === 'upgrade')) { + this.request.on(event, listener && listener.bind(this)); + } else { + OutgoingMessage.prototype.on.call(this, event, listener); + } +}; + +// Methods only in fallback mode +OutgoingRequest.prototype.setNoDelay = function setNoDelay(noDelay) { + if (this.request) { + this.request.setNoDelay(noDelay); + } else if (!this.stream) { + this.on('socket', this.setNoDelay.bind(this, noDelay)); + } +}; + +OutgoingRequest.prototype.setSocketKeepAlive = function setSocketKeepAlive(enable, initialDelay) { + if (this.request) { + this.request.setSocketKeepAlive(enable, initialDelay); + } else if (!this.stream) { + this.on('socket', this.setSocketKeepAlive.bind(this, enable, initialDelay)); + } +}; + +OutgoingRequest.prototype.setTimeout = function setTimeout(timeout, callback) { + if (this.request) { + this.request.setTimeout(timeout, callback); + } else if (!this.stream) { + this.on('socket', this.setTimeout.bind(this, timeout, callback)); + } +}; + +// Aborting the request +OutgoingRequest.prototype.abort = function abort() { + if (this.request) { + this.request.abort(); + } else if (this.stream) { + this.stream.reset('CANCEL'); + } else { + this.on('socket', this.abort.bind(this)); + } +}; + +// Receiving push promises +OutgoingRequest.prototype._onPromise = function _onPromise(stream, headers) { + this._log.info({ push_stream: stream.id }, 'Receiving push promise'); + + var promise = new IncomingPromise(stream, headers); + + if (this.listeners('push').length > 0) { + this.emit('push', promise); + } else { + promise.cancel(); + } +}; + +// IncomingResponse class +// ---------------------- + +function IncomingResponse(stream) { + IncomingMessage.call(this, stream); +} +IncomingResponse.prototype = Object.create(IncomingMessage.prototype, { constructor: { value: IncomingResponse } }); + +// [Response Header Fields](https://tools.ietf.org/html/rfc7540#section-8.1.2.4) +// * `headers` argument: HTTP/2.0 request and response header fields carry information as a series +// of key-value pairs. This includes the target URI for the request, the status code for the +// response, as well as HTTP header fields. +IncomingResponse.prototype._onHeaders = function _onHeaders(headers) { + // * A single ":status" header field is defined that carries the HTTP status code field. This + // header field MUST be included in all responses. + // * A client MUST treat the absence of the ":status" header field, the presence of multiple + // values, or an invalid value as a stream error of type PROTOCOL_ERROR. + // Note: currently, we do not enforce it strictly: we accept any format, and parse it as int + // * HTTP/2.0 does not define a way to carry the reason phrase that is included in an HTTP/1.1 + // status line. + this.statusCode = parseInt(this._checkSpecialHeader(':status', headers[':status'])); + + // * Handling regular headers. + IncomingMessage.prototype._onHeaders.call(this, headers); + + // * Signaling that the headers arrived. + this._log.info({ status: this.statusCode, headers: this.headers}, 'Incoming response'); + this.emit('ready'); +}; + +// IncomingPromise class +// ------------------------- + +function IncomingPromise(responseStream, promiseHeaders) { + var stream = new Readable(); + stream._read = noop; + stream.push(null); + stream._log = responseStream._log; + + IncomingRequest.call(this, stream); + + this._onHeaders(promiseHeaders); + + this._responseStream = responseStream; + + var response = new IncomingResponse(this._responseStream); + response.once('ready', this.emit.bind(this, 'response', response)); + + this.stream.on('promise', this._onPromise.bind(this)); +} +IncomingPromise.prototype = Object.create(IncomingRequest.prototype, { constructor: { value: IncomingPromise } }); + +IncomingPromise.prototype.cancel = function cancel() { + this._responseStream.reset('CANCEL'); +}; + +IncomingPromise.prototype.setPriority = function setPriority(priority) { + this._responseStream.priority(priority); +}; + +IncomingPromise.prototype._onPromise = OutgoingRequest.prototype._onPromise; diff --git a/testing/xpcshell/node-http2/lib/index.js b/testing/xpcshell/node-http2/lib/index.js new file mode 100644 index 000000000..c67883def --- /dev/null +++ b/testing/xpcshell/node-http2/lib/index.js @@ -0,0 +1,52 @@ +// [node-http2][homepage] is an [HTTP/2][http2] implementation for [node.js][node]. +// +// The core of the protocol is implemented in the protocol sub-directory. This directory provides +// two important features on top of the protocol: +// +// * Implementation of different negotiation schemes that can be used to start a HTTP2 connection. +// These include TLS ALPN, Upgrade and Plain TCP. +// +// * Providing an API very similar to the standard node.js [HTTPS module API][node-https] +// (which is in turn very similar to the [HTTP module API][node-http]). +// +// [homepage]: https://github.com/molnarg/node-http2 +// [http2]: https://tools.ietf.org/html/rfc7540 +// [node]: https://nodejs.org/ +// [node-https]: https://nodejs.org/api/https.html +// [node-http]: https://nodejs.org/api/http.html + +module.exports = require('./http'); + +/* + HTTP API + + | ^ + | | + +-------------|------------|------------------------------------------------------+ + | | | Server/Agent | + | v | | + | +----------+ +----------+ | + | | Outgoing | | Incoming | | + | | req/res. | | req/res. | | + | +----------+ +----------+ | + | | ^ | + | | | | + | +---------|------------|-------------------------------------+ +----- | + | | | | Endpoint | | | + | | | | | | | + | | v | | | | + | | +-----------------------+ +-------------------- | | | + | | | Stream | | Stream ... | | | + | | +-----------------------+ +-------------------- | | | + | | | | | + | +------------------------------------------------------------+ +----- | + | | | | + | | | | + | v | | + | +------------------------------------------------------------+ +----- | + | | TCP stream | | ... | + | +------------------------------------------------------------+ +----- | + | | + +---------------------------------------------------------------------------------+ + +*/ diff --git a/testing/xpcshell/node-http2/lib/protocol/compressor.js b/testing/xpcshell/node-http2/lib/protocol/compressor.js new file mode 100644 index 000000000..3923a9107 --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/compressor.js @@ -0,0 +1,1366 @@ +// The implementation of the [HTTP/2 Header Compression][http2-compression] spec is separated from +// the 'integration' part which handles HEADERS and PUSH_PROMISE frames. The compression itself is +// implemented in the first part of the file, and consists of three classes: `HeaderTable`, +// `HeaderSetDecompressor` and `HeaderSetCompressor`. The two latter classes are +// [Transform Stream][node-transform] subclasses that operate in [object mode][node-objectmode]. +// These transform chunks of binary data into `[name, value]` pairs and vice versa, and store their +// state in `HeaderTable` instances. +// +// The 'integration' part is also implemented by two [Transform Stream][node-transform] subclasses +// that operate in [object mode][node-objectmode]: the `Compressor` and the `Decompressor`. These +// provide a layer between the [framer](framer.html) and the +// [connection handling component](connection.html). +// +// [node-transform]: https://nodejs.org/api/stream.html#stream_class_stream_transform +// [node-objectmode]: https://nodejs.org/api/stream.html#stream_new_stream_readable_options +// [http2-compression]: https://tools.ietf.org/html/rfc7541 + +exports.HeaderTable = HeaderTable; +exports.HuffmanTable = HuffmanTable; +exports.HeaderSetCompressor = HeaderSetCompressor; +exports.HeaderSetDecompressor = HeaderSetDecompressor; +exports.Compressor = Compressor; +exports.Decompressor = Decompressor; + +var TransformStream = require('stream').Transform; +var assert = require('assert'); +var util = require('util'); + +// Header compression +// ================== + +// The HeaderTable class +// --------------------- + +// The [Header Table] is a component used to associate headers to index values. It is basically an +// ordered list of `[name, value]` pairs, so it's implemented as a subclass of `Array`. +// In this implementation, the Header Table and the [Static Table] are handled as a single table. +// [Header Table]: https://tools.ietf.org/html/rfc7541#section-2.3.2 +// [Static Table]: https://tools.ietf.org/html/rfc7541#section-2.3.1 +function HeaderTable(log, limit) { + var self = HeaderTable.staticTable.map(entryFromPair); + self._log = log; + self._limit = limit || DEFAULT_HEADER_TABLE_LIMIT; + self._staticLength = self.length; + self._size = 0; + self._enforceLimit = HeaderTable.prototype._enforceLimit; + self.add = HeaderTable.prototype.add; + self.setSizeLimit = HeaderTable.prototype.setSizeLimit; + return self; +} + +function entryFromPair(pair) { + var entry = pair.slice(); + entry._size = size(entry); + return entry; +} + +// The encoder decides how to update the header table and as such can control how much memory is +// used by the header table. To limit the memory requirements on the decoder side, the header table +// size is bounded. +// +// * The default header table size limit is 4096 bytes. +// * The size of an entry is defined as follows: the size of an entry is the sum of its name's +// length in bytes, of its value's length in bytes and of 32 bytes. +// * The size of a header table is the sum of the size of its entries. +var DEFAULT_HEADER_TABLE_LIMIT = 4096; + +function size(entry) { + return (new Buffer(entry[0] + entry[1], 'utf8')).length + 32; +} + +// The `add(index, entry)` can be used to [manage the header table][tablemgmt]: +// [tablemgmt]: https://tools.ietf.org/html/rfc7541#section-4 +// +// * it pushes the new `entry` at the beggining of the table +// * before doing such a modification, it has to be ensured that the header table size will stay +// lower than or equal to the header table size limit. To achieve this, entries are evicted from +// the end of the header table until the size of the header table is less than or equal to +// `(this._limit - entry.size)`, or until the table is empty. +// +// <---------- Index Address Space ----------> +// <-- Static Table --> <-- Header Table --> +// +---+-----------+---+ +---+-----------+---+ +// | 0 | ... | k | |k+1| ... | n | +// +---+-----------+---+ +---+-----------+---+ +// ^ | +// | V +// Insertion Point Drop Point + +HeaderTable.prototype._enforceLimit = function _enforceLimit(limit) { + var droppedEntries = []; + while ((this._size > 0) && (this._size > limit)) { + var dropped = this.pop(); + this._size -= dropped._size; + droppedEntries.unshift(dropped); + } + return droppedEntries; +}; + +HeaderTable.prototype.add = function(entry) { + var limit = this._limit - entry._size; + var droppedEntries = this._enforceLimit(limit); + + if (this._size <= limit) { + this.splice(this._staticLength, 0, entry); + this._size += entry._size; + } + + return droppedEntries; +}; + +// The table size limit can be changed externally. In this case, the same eviction algorithm is used +HeaderTable.prototype.setSizeLimit = function setSizeLimit(limit) { + this._limit = limit; + this._enforceLimit(this._limit); +}; + +// [The Static Table](https://tools.ietf.org/html/rfc7541#section-2.3.1) +// ------------------ + +// The table is generated with feeding the table from the spec to the following sed command: +// +// sed -re "s/\s*\| [0-9]+\s*\| ([^ ]*)/ [ '\1'/g" -e "s/\|\s([^ ]*)/, '\1'/g" -e 's/ \|/],/g' + +HeaderTable.staticTable = [ + [ ':authority' , '' ], + [ ':method' , 'GET' ], + [ ':method' , 'POST' ], + [ ':path' , '/' ], + [ ':path' , '/index.html' ], + [ ':scheme' , 'http' ], + [ ':scheme' , 'https' ], + [ ':status' , '200' ], + [ ':status' , '204' ], + [ ':status' , '206' ], + [ ':status' , '304' ], + [ ':status' , '400' ], + [ ':status' , '404' ], + [ ':status' , '500' ], + [ 'accept-charset' , '' ], + [ 'accept-encoding' , 'gzip, deflate'], + [ 'accept-language' , '' ], + [ 'accept-ranges' , '' ], + [ 'accept' , '' ], + [ 'access-control-allow-origin' , '' ], + [ 'age' , '' ], + [ 'allow' , '' ], + [ 'authorization' , '' ], + [ 'cache-control' , '' ], + [ 'content-disposition' , '' ], + [ 'content-encoding' , '' ], + [ 'content-language' , '' ], + [ 'content-length' , '' ], + [ 'content-location' , '' ], + [ 'content-range' , '' ], + [ 'content-type' , '' ], + [ 'cookie' , '' ], + [ 'date' , '' ], + [ 'etag' , '' ], + [ 'expect' , '' ], + [ 'expires' , '' ], + [ 'from' , '' ], + [ 'host' , '' ], + [ 'if-match' , '' ], + [ 'if-modified-since' , '' ], + [ 'if-none-match' , '' ], + [ 'if-range' , '' ], + [ 'if-unmodified-since' , '' ], + [ 'last-modified' , '' ], + [ 'link' , '' ], + [ 'location' , '' ], + [ 'max-forwards' , '' ], + [ 'proxy-authenticate' , '' ], + [ 'proxy-authorization' , '' ], + [ 'range' , '' ], + [ 'referer' , '' ], + [ 'refresh' , '' ], + [ 'retry-after' , '' ], + [ 'server' , '' ], + [ 'set-cookie' , '' ], + [ 'strict-transport-security' , '' ], + [ 'transfer-encoding' , '' ], + [ 'user-agent' , '' ], + [ 'vary' , '' ], + [ 'via' , '' ], + [ 'www-authenticate' , '' ] +]; + +// The HeaderSetDecompressor class +// ------------------------------- + +// A `HeaderSetDecompressor` instance is a transform stream that can be used to *decompress a +// single header set*. Its input is a stream of binary data chunks and its output is a stream of +// `[name, value]` pairs. +// +// Currently, it is not a proper streaming decompressor implementation, since it buffer its input +// until the end os the stream, and then processes the whole header block at once. + +util.inherits(HeaderSetDecompressor, TransformStream); +function HeaderSetDecompressor(log, table) { + TransformStream.call(this, { objectMode: true }); + + this._log = log.child({ component: 'compressor' }); + this._table = table; + this._chunks = []; +} + +// `_transform` is the implementation of the [corresponding virtual function][_transform] of the +// TransformStream class. It collects the data chunks for later processing. +// [_transform]: https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback +HeaderSetDecompressor.prototype._transform = function _transform(chunk, encoding, callback) { + this._chunks.push(chunk); + callback(); +}; + +// `execute(rep)` executes the given [header representation][representation]. +// [representation]: https://tools.ietf.org/html/rfc7541#section-6 + +// The *JavaScript object representation* of a header representation: +// +// { +// name: String || Integer, // string literal or index +// value: String || Integer, // string literal or index +// index: Boolean // with or without indexing +// } +// +// *Important:* to ease the indexing of the header table, indexes start at 0 instead of 1. +// +// Examples: +// +// Indexed: +// { name: 2 , value: 2 , index: false } +// Literal: +// { name: 2 , value: 'X', index: false } // without indexing +// { name: 2 , value: 'Y', index: true } // with indexing +// { name: 'A', value: 'Z', index: true } // with indexing, literal name +HeaderSetDecompressor.prototype._execute = function _execute(rep) { + this._log.trace({ key: rep.name, value: rep.value, index: rep.index }, + 'Executing header representation'); + + var entry, pair; + + if (rep.contextUpdate) { + this._table.setSizeLimit(rep.newMaxSize); + } + + // * An _indexed representation_ entails the following actions: + // * The header field corresponding to the referenced entry is emitted + else if (typeof rep.value === 'number') { + var index = rep.value; + entry = this._table[index]; + + pair = entry.slice(); + this.push(pair); + } + + // * A _literal representation_ that is _not added_ to the header table entails the following + // action: + // * The header is emitted. + // * A _literal representation_ that is _added_ to the header table entails the following further + // actions: + // * The header is added to the header table. + // * The header is emitted. + else { + if (typeof rep.name === 'number') { + pair = [this._table[rep.name][0], rep.value]; + } else { + pair = [rep.name, rep.value]; + } + + if (rep.index) { + entry = entryFromPair(pair); + this._table.add(entry); + } + + this.push(pair); + } +}; + +// `_flush` is the implementation of the [corresponding virtual function][_flush] of the +// TransformStream class. The whole decompressing process is done in `_flush`. It gets called when +// the input stream is over. +// [_flush]: https://nodejs.org/api/stream.html#stream_transform_flush_callback +HeaderSetDecompressor.prototype._flush = function _flush(callback) { + var buffer = concat(this._chunks); + + // * processes the header representations + buffer.cursor = 0; + while (buffer.cursor < buffer.length) { + this._execute(HeaderSetDecompressor.header(buffer)); + } + + callback(); +}; + +// The HeaderSetCompressor class +// ----------------------------- + +// A `HeaderSetCompressor` instance is a transform stream that can be used to *compress a single +// header set*. Its input is a stream of `[name, value]` pairs and its output is a stream of +// binary data chunks. +// +// It is a real streaming compressor, since it does not wait until the header set is complete. +// +// The compression algorithm is (intentionally) not specified by the spec. Therefore, the current +// compression algorithm can probably be improved in the future. + +util.inherits(HeaderSetCompressor, TransformStream); +function HeaderSetCompressor(log, table) { + TransformStream.call(this, { objectMode: true }); + + this._log = log.child({ component: 'compressor' }); + this._table = table; + this.push = TransformStream.prototype.push.bind(this); +} + +HeaderSetCompressor.prototype.send = function send(rep) { + this._log.trace({ key: rep.name, value: rep.value, index: rep.index }, + 'Emitting header representation'); + + if (!rep.chunks) { + rep.chunks = HeaderSetCompressor.header(rep); + } + rep.chunks.forEach(this.push); +}; + +// `_transform` is the implementation of the [corresponding virtual function][_transform] of the +// TransformStream class. It processes the input headers one by one: +// [_transform]: https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback +HeaderSetCompressor.prototype._transform = function _transform(pair, encoding, callback) { + var name = pair[0].toLowerCase(); + var value = pair[1]; + var entry, rep; + + // * tries to find full (name, value) or name match in the header table + var nameMatch = -1, fullMatch = -1; + for (var droppedIndex = 0; droppedIndex < this._table.length; droppedIndex++) { + entry = this._table[droppedIndex]; + if (entry[0] === name) { + if (entry[1] === value) { + fullMatch = droppedIndex; + break; + } else if (nameMatch === -1) { + nameMatch = droppedIndex; + } + } + } + + var mustNeverIndex = ((name === 'cookie' && value.length < 20) || + (name === 'set-cookie' && value.length < 20) || + name === 'authorization'); + + if (fullMatch !== -1 && !mustNeverIndex) { + this.send({ name: fullMatch, value: fullMatch, index: false }); + } + + // * otherwise, it will be a literal representation (with a name index if there's a name match) + else { + entry = entryFromPair(pair); + + var indexing = (entry._size < this._table._limit / 2) && !mustNeverIndex; + + if (indexing) { + this._table.add(entry); + } + + this.send({ name: (nameMatch !== -1) ? nameMatch : name, value: value, index: indexing, mustNeverIndex: mustNeverIndex, contextUpdate: false }); + } + + callback(); +}; + +// `_flush` is the implementation of the [corresponding virtual function][_flush] of the +// TransformStream class. It gets called when there's no more header to compress. The final step: +// [_flush]: https://nodejs.org/api/stream.html#stream_transform_flush_callback +HeaderSetCompressor.prototype._flush = function _flush(callback) { + callback(); +}; + +// [Detailed Format](https://tools.ietf.org/html/rfc7541#section-5) +// ----------------- + +// ### Integer representation ### +// +// The algorithm to represent an integer I is as follows: +// +// 1. If I < 2^N - 1, encode I on N bits +// 2. Else, encode 2^N - 1 on N bits and do the following steps: +// 1. Set I to (I - (2^N - 1)) and Q to 1 +// 2. While Q > 0 +// 1. Compute Q and R, quotient and remainder of I divided by 2^7 +// 2. If Q is strictly greater than 0, write one 1 bit; otherwise, write one 0 bit +// 3. Encode R on the next 7 bits +// 4. I = Q + +HeaderSetCompressor.integer = function writeInteger(I, N) { + var limit = Math.pow(2,N) - 1; + if (I < limit) { + return [new Buffer([I])]; + } + + var bytes = []; + if (N !== 0) { + bytes.push(limit); + } + I -= limit; + + var Q = 1, R; + while (Q > 0) { + Q = Math.floor(I / 128); + R = I % 128; + + if (Q > 0) { + R += 128; + } + bytes.push(R); + + I = Q; + } + + return [new Buffer(bytes)]; +}; + +// The inverse algorithm: +// +// 1. Set I to the number coded on the lower N bits of the first byte +// 2. If I is smaller than 2^N - 1 then return I +// 2. Else the number is encoded on more than one byte, so do the following steps: +// 1. Set M to 0 +// 2. While returning with I +// 1. Let B be the next byte (the first byte if N is 0) +// 2. Read out the lower 7 bits of B and multiply it with 2^M +// 3. Increase I with this number +// 4. Increase M by 7 +// 5. Return I if the most significant bit of B is 0 + +HeaderSetDecompressor.integer = function readInteger(buffer, N) { + var limit = Math.pow(2,N) - 1; + + var I = buffer[buffer.cursor] & limit; + if (N !== 0) { + buffer.cursor += 1; + } + + if (I === limit) { + var M = 0; + do { + I += (buffer[buffer.cursor] & 127) << M; + M += 7; + buffer.cursor += 1; + } while (buffer[buffer.cursor - 1] & 128); + } + + return I; +}; + +// ### Huffman Encoding ### + +function HuffmanTable(table) { + function createTree(codes, position) { + if (codes.length === 1) { + return [table.indexOf(codes[0])]; + } + + else { + position = position || 0; + var zero = []; + var one = []; + for (var i = 0; i < codes.length; i++) { + var string = codes[i]; + if (string[position] === '0') { + zero.push(string); + } else { + one.push(string); + } + } + return [createTree(zero, position + 1), createTree(one, position + 1)]; + } + } + + this.tree = createTree(table); + + this.codes = table.map(function(bits) { + return parseInt(bits, 2); + }); + this.lengths = table.map(function(bits) { + return bits.length; + }); +} + +HuffmanTable.prototype.encode = function encode(buffer) { + var result = []; + var space = 8; + + function add(data) { + if (space === 8) { + result.push(data); + } else { + result[result.length - 1] |= data; + } + } + + for (var i = 0; i < buffer.length; i++) { + var byte = buffer[i]; + var code = this.codes[byte]; + var length = this.lengths[byte]; + + while (length !== 0) { + if (space >= length) { + add(code << (space - length)); + code = 0; + space -= length; + length = 0; + } else { + var shift = length - space; + var msb = code >> shift; + add(msb); + code -= msb << shift; + length -= space; + space = 0; + } + + if (space === 0) { + space = 8; + } + } + } + + if (space !== 8) { + add(this.codes[256] >> (this.lengths[256] - space)); + } + + return new Buffer(result); +}; + +HuffmanTable.prototype.decode = function decode(buffer) { + var result = []; + var subtree = this.tree; + + for (var i = 0; i < buffer.length; i++) { + var byte = buffer[i]; + + for (var j = 0; j < 8; j++) { + var bit = (byte & 128) ? 1 : 0; + byte = byte << 1; + + subtree = subtree[bit]; + if (subtree.length === 1) { + result.push(subtree[0]); + subtree = this.tree; + } + } + } + + return new Buffer(result); +}; + +// The initializer arrays for the Huffman tables are generated with feeding the tables from the +// spec to this sed command: +// +// sed -e "s/^.* [|]//g" -e "s/|//g" -e "s/ .*//g" -e "s/^/ '/g" -e "s/$/',/g" + +HuffmanTable.huffmanTable = new HuffmanTable([ + '1111111111000', + '11111111111111111011000', + '1111111111111111111111100010', + '1111111111111111111111100011', + '1111111111111111111111100100', + '1111111111111111111111100101', + '1111111111111111111111100110', + '1111111111111111111111100111', + '1111111111111111111111101000', + '111111111111111111101010', + '111111111111111111111111111100', + '1111111111111111111111101001', + '1111111111111111111111101010', + '111111111111111111111111111101', + '1111111111111111111111101011', + '1111111111111111111111101100', + '1111111111111111111111101101', + '1111111111111111111111101110', + '1111111111111111111111101111', + '1111111111111111111111110000', + '1111111111111111111111110001', + '1111111111111111111111110010', + '111111111111111111111111111110', + '1111111111111111111111110011', + '1111111111111111111111110100', + '1111111111111111111111110101', + '1111111111111111111111110110', + '1111111111111111111111110111', + '1111111111111111111111111000', + '1111111111111111111111111001', + '1111111111111111111111111010', + '1111111111111111111111111011', + '010100', + '1111111000', + '1111111001', + '111111111010', + '1111111111001', + '010101', + '11111000', + '11111111010', + '1111111010', + '1111111011', + '11111001', + '11111111011', + '11111010', + '010110', + '010111', + '011000', + '00000', + '00001', + '00010', + '011001', + '011010', + '011011', + '011100', + '011101', + '011110', + '011111', + '1011100', + '11111011', + '111111111111100', + '100000', + '111111111011', + '1111111100', + '1111111111010', + '100001', + '1011101', + '1011110', + '1011111', + '1100000', + '1100001', + '1100010', + '1100011', + '1100100', + '1100101', + '1100110', + '1100111', + '1101000', + '1101001', + '1101010', + '1101011', + '1101100', + '1101101', + '1101110', + '1101111', + '1110000', + '1110001', + '1110010', + '11111100', + '1110011', + '11111101', + '1111111111011', + '1111111111111110000', + '1111111111100', + '11111111111100', + '100010', + '111111111111101', + '00011', + '100011', + '00100', + '100100', + '00101', + '100101', + '100110', + '100111', + '00110', + '1110100', + '1110101', + '101000', + '101001', + '101010', + '00111', + '101011', + '1110110', + '101100', + '01000', + '01001', + '101101', + '1110111', + '1111000', + '1111001', + '1111010', + '1111011', + '111111111111110', + '11111111100', + '11111111111101', + '1111111111101', + '1111111111111111111111111100', + '11111111111111100110', + '1111111111111111010010', + '11111111111111100111', + '11111111111111101000', + '1111111111111111010011', + '1111111111111111010100', + '1111111111111111010101', + '11111111111111111011001', + '1111111111111111010110', + '11111111111111111011010', + '11111111111111111011011', + '11111111111111111011100', + '11111111111111111011101', + '11111111111111111011110', + '111111111111111111101011', + '11111111111111111011111', + '111111111111111111101100', + '111111111111111111101101', + '1111111111111111010111', + '11111111111111111100000', + '111111111111111111101110', + '11111111111111111100001', + '11111111111111111100010', + '11111111111111111100011', + '11111111111111111100100', + '111111111111111011100', + '1111111111111111011000', + '11111111111111111100101', + '1111111111111111011001', + '11111111111111111100110', + '11111111111111111100111', + '111111111111111111101111', + '1111111111111111011010', + '111111111111111011101', + '11111111111111101001', + '1111111111111111011011', + '1111111111111111011100', + '11111111111111111101000', + '11111111111111111101001', + '111111111111111011110', + '11111111111111111101010', + '1111111111111111011101', + '1111111111111111011110', + '111111111111111111110000', + '111111111111111011111', + '1111111111111111011111', + '11111111111111111101011', + '11111111111111111101100', + '111111111111111100000', + '111111111111111100001', + '1111111111111111100000', + '111111111111111100010', + '11111111111111111101101', + '1111111111111111100001', + '11111111111111111101110', + '11111111111111111101111', + '11111111111111101010', + '1111111111111111100010', + '1111111111111111100011', + '1111111111111111100100', + '11111111111111111110000', + '1111111111111111100101', + '1111111111111111100110', + '11111111111111111110001', + '11111111111111111111100000', + '11111111111111111111100001', + '11111111111111101011', + '1111111111111110001', + '1111111111111111100111', + '11111111111111111110010', + '1111111111111111101000', + '1111111111111111111101100', + '11111111111111111111100010', + '11111111111111111111100011', + '11111111111111111111100100', + '111111111111111111111011110', + '111111111111111111111011111', + '11111111111111111111100101', + '111111111111111111110001', + '1111111111111111111101101', + '1111111111111110010', + '111111111111111100011', + '11111111111111111111100110', + '111111111111111111111100000', + '111111111111111111111100001', + '11111111111111111111100111', + '111111111111111111111100010', + '111111111111111111110010', + '111111111111111100100', + '111111111111111100101', + '11111111111111111111101000', + '11111111111111111111101001', + '1111111111111111111111111101', + '111111111111111111111100011', + '111111111111111111111100100', + '111111111111111111111100101', + '11111111111111101100', + '111111111111111111110011', + '11111111111111101101', + '111111111111111100110', + '1111111111111111101001', + '111111111111111100111', + '111111111111111101000', + '11111111111111111110011', + '1111111111111111101010', + '1111111111111111101011', + '1111111111111111111101110', + '1111111111111111111101111', + '111111111111111111110100', + '111111111111111111110101', + '11111111111111111111101010', + '11111111111111111110100', + '11111111111111111111101011', + '111111111111111111111100110', + '11111111111111111111101100', + '11111111111111111111101101', + '111111111111111111111100111', + '111111111111111111111101000', + '111111111111111111111101001', + '111111111111111111111101010', + '111111111111111111111101011', + '1111111111111111111111111110', + '111111111111111111111101100', + '111111111111111111111101101', + '111111111111111111111101110', + '111111111111111111111101111', + '111111111111111111111110000', + '11111111111111111111101110', + '111111111111111111111111111111' +]); + +// ### String literal representation ### +// +// Literal **strings** can represent header names or header values. There's two variant of the +// string encoding: +// +// String literal with Huffman encoding: +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 1 | Value Length Prefix (7) | +// +---+---+---+---+---+---+---+---+ +// | Value Length (0-N bytes) | +// +---+---+---+---+---+---+---+---+ +// ... +// +---+---+---+---+---+---+---+---+ +// | Huffman Encoded Data |Padding| +// +---+---+---+---+---+---+---+---+ +// +// String literal without Huffman encoding: +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | Value Length Prefix (7) | +// +---+---+---+---+---+---+---+---+ +// | Value Length (0-N bytes) | +// +---+---+---+---+---+---+---+---+ +// ... +// +---+---+---+---+---+---+---+---+ +// | Field Bytes Without Encoding | +// +---+---+---+---+---+---+---+---+ + +HeaderSetCompressor.string = function writeString(str) { + str = new Buffer(str, 'utf8'); + + var huffman = HuffmanTable.huffmanTable.encode(str); + if (huffman.length < str.length) { + var length = HeaderSetCompressor.integer(huffman.length, 7); + length[0][0] |= 128; + return length.concat(huffman); + } + + else { + length = HeaderSetCompressor.integer(str.length, 7); + return length.concat(str); + } +}; + +HeaderSetDecompressor.string = function readString(buffer) { + var huffman = buffer[buffer.cursor] & 128; + var length = HeaderSetDecompressor.integer(buffer, 7); + var encoded = buffer.slice(buffer.cursor, buffer.cursor + length); + buffer.cursor += length; + return (huffman ? HuffmanTable.huffmanTable.decode(encoded) : encoded).toString('utf8'); +}; + +// ### Header represenations ### + +// The JavaScript object representation is described near the +// `HeaderSetDecompressor.prototype._execute()` method definition. +// +// **All binary header representations** start with a prefix signaling the representation type and +// an index represented using prefix coded integers: +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 1 | Index (7+) | Indexed Representation +// +---+---------------------------+ +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | 1 | Index (6+) | +// +---+---+---+-------------------+ Literal w/ Indexing +// | Value Length (8+) | +// +-------------------------------+ w/ Indexed Name +// | Value String (Length octets) | +// +-------------------------------+ +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | 1 | 0 | +// +---+---+---+-------------------+ +// | Name Length (8+) | +// +-------------------------------+ Literal w/ Indexing +// | Name String (Length octets) | +// +-------------------------------+ w/ New Name +// | Value Length (8+) | +// +-------------------------------+ +// | Value String (Length octets) | +// +-------------------------------+ +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | 0 | 0 | 0 | Index (4+) | +// +---+---+---+-------------------+ Literal w/o Incremental Indexing +// | Value Length (8+) | +// +-------------------------------+ w/ Indexed Name +// | Value String (Length octets) | +// +-------------------------------+ +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | 0 | 0 | 0 | 0 | +// +---+---+---+-------------------+ +// | Name Length (8+) | +// +-------------------------------+ Literal w/o Incremental Indexing +// | Name String (Length octets) | +// +-------------------------------+ w/ New Name +// | Value Length (8+) | +// +-------------------------------+ +// | Value String (Length octets) | +// +-------------------------------+ +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | 0 | 0 | 1 | Index (4+) | +// +---+---+---+-------------------+ Literal never indexed +// | Value Length (8+) | +// +-------------------------------+ w/ Indexed Name +// | Value String (Length octets) | +// +-------------------------------+ +// +// 0 1 2 3 4 5 6 7 +// +---+---+---+---+---+---+---+---+ +// | 0 | 0 | 0 | 1 | 0 | +// +---+---+---+-------------------+ +// | Name Length (8+) | +// +-------------------------------+ Literal never indexed +// | Name String (Length octets) | +// +-------------------------------+ w/ New Name +// | Value Length (8+) | +// +-------------------------------+ +// | Value String (Length octets) | +// +-------------------------------+ +// +// The **Indexed Representation** consists of the 1-bit prefix and the Index that is represented as +// a 7-bit prefix coded integer and nothing else. +// +// After the first bits, **all literal representations** specify the header name, either as a +// pointer to the Header Table (Index) or a string literal. When the string literal representation +// is used, the Index is set to 0 and the string literal starts at the second byte. +// +// For **all literal representations**, the specification of the header value comes next. It is +// always represented as a string. + +var representations = { + indexed : { prefix: 7, pattern: 0x80 }, + literalIncremental : { prefix: 6, pattern: 0x40 }, + contextUpdate : { prefix: 0, pattern: 0x20 }, + literalNeverIndexed : { prefix: 4, pattern: 0x10 }, + literal : { prefix: 4, pattern: 0x00 } +}; + +HeaderSetCompressor.header = function writeHeader(header) { + var representation, buffers = []; + + if (header.contextUpdate) { + representation = representations.contextUpdate; + } else if (typeof header.value === 'number') { + representation = representations.indexed; + } else if (header.index) { + representation = representations.literalIncremental; + } else if (header.mustNeverIndex) { + representation = representations.literalNeverIndexed; + } else { + representation = representations.literal; + } + + if (representation === representations.contextUpdate) { + buffers.push(HeaderSetCompressor.integer(header.newMaxSize, 5)); + } + + else if (representation === representations.indexed) { + buffers.push(HeaderSetCompressor.integer(header.value + 1, representation.prefix)); + } + + else { + if (typeof header.name === 'number') { + buffers.push(HeaderSetCompressor.integer(header.name + 1, representation.prefix)); + } else { + buffers.push(HeaderSetCompressor.integer(0, representation.prefix)); + buffers.push(HeaderSetCompressor.string(header.name)); + } + buffers.push(HeaderSetCompressor.string(header.value)); + } + + buffers[0][0][0] |= representation.pattern; + + return Array.prototype.concat.apply([], buffers); // array of arrays of buffers -> array of buffers +}; + +HeaderSetDecompressor.header = function readHeader(buffer) { + var representation, header = {}; + + var firstByte = buffer[buffer.cursor]; + if (firstByte & 0x80) { + representation = representations.indexed; + } else if (firstByte & 0x40) { + representation = representations.literalIncremental; + } else if (firstByte & 0x20) { + representation = representations.contextUpdate; + } else if (firstByte & 0x10) { + representation = representations.literalNeverIndexed; + } else { + representation = representations.literal; + } + + header.value = header.name = -1; + header.index = false; + header.contextUpdate = false; + header.newMaxSize = 0; + header.mustNeverIndex = false; + + if (representation === representations.contextUpdate) { + header.contextUpdate = true; + header.newMaxSize = HeaderSetDecompressor.integer(buffer, 5); + } + + else if (representation === representations.indexed) { + header.value = header.name = HeaderSetDecompressor.integer(buffer, representation.prefix) - 1; + } + + else { + header.name = HeaderSetDecompressor.integer(buffer, representation.prefix) - 1; + if (header.name === -1) { + header.name = HeaderSetDecompressor.string(buffer); + } + header.value = HeaderSetDecompressor.string(buffer); + header.index = (representation === representations.literalIncremental); + header.mustNeverIndex = (representation === representations.literalNeverIndexed); + } + + return header; +}; + +// Integration with HTTP/2 +// ======================= + +// This section describes the interaction between the compressor/decompressor and the rest of the +// HTTP/2 implementation. The `Compressor` and the `Decompressor` makes up a layer between the +// [framer](framer.html) and the [connection handling component](connection.html). They let most +// frames pass through, except HEADERS and PUSH_PROMISE frames. They convert the frames between +// these two representations: +// +// { { +// type: 'HEADERS', type: 'HEADERS', +// flags: {}, flags: {}, +// stream: 1, <===> stream: 1, +// headers: { data: Buffer +// N1: 'V1', } +// N2: ['V1', 'V2', ...], +// // ... +// } +// } +// +// There are possibly several binary frame that belong to a single non-binary frame. + +var MAX_HTTP_PAYLOAD_SIZE = 16384; + +// The Compressor class +// -------------------- + +// The Compressor transform stream is basically stateless. +util.inherits(Compressor, TransformStream); +function Compressor(log, type) { + TransformStream.call(this, { objectMode: true }); + + this._log = log.child({ component: 'compressor' }); + + assert((type === 'REQUEST') || (type === 'RESPONSE')); + this._table = new HeaderTable(this._log); + + this.tableSizeChangePending = false; + this.lowestTableSizePending = 0; + this.tableSizeSetting = DEFAULT_HEADER_TABLE_LIMIT; +} + +// Changing the header table size +Compressor.prototype.setTableSizeLimit = function setTableSizeLimit(size) { + this._table.setSizeLimit(size); + if (!this.tableSizeChangePending || size < this.lowestTableSizePending) { + this.lowestTableSizePending = size; + } + this.tableSizeSetting = size; + this.tableSizeChangePending = true; +}; + +// `compress` takes a header set, and compresses it using a new `HeaderSetCompressor` stream +// instance. This means that from now on, the advantages of streaming header encoding are lost, +// but the API becomes simpler. +Compressor.prototype.compress = function compress(headers) { + var compressor = new HeaderSetCompressor(this._log, this._table); + + if (this.tableSizeChangePending) { + if (this.lowestTableSizePending < this.tableSizeSetting) { + compressor.send({contextUpdate: true, newMaxSize: this.lowestTableSizePending, + name: "", value: "", index: 0}); + } + compressor.send({contextUpdate: true, newMaxSize: this.tableSizeSetting, + name: "", value: "", index: 0}); + this.tableSizeChangePending = false; + } + var colonHeaders = []; + var nonColonHeaders = []; + + // To ensure we send colon headers first + for (var name in headers) { + if (name.trim()[0] === ':') { + colonHeaders.push(name); + } else { + nonColonHeaders.push(name); + } + } + + function compressHeader(name) { + var value = headers[name]; + name = String(name).toLowerCase(); + + // * To allow for better compression efficiency, the Cookie header field MAY be split into + // separate header fields, each with one or more cookie-pairs. + if (name == 'cookie') { + if (!(value instanceof Array)) { + value = [value]; + } + value = Array.prototype.concat.apply([], value.map(function(cookie) { + return String(cookie).split(';').map(trim); + })); + } + + if (value instanceof Array) { + for (var i = 0; i < value.length; i++) { + compressor.write([name, String(value[i])]); + } + } else { + compressor.write([name, String(value)]); + } + } + + colonHeaders.forEach(compressHeader); + nonColonHeaders.forEach(compressHeader); + + compressor.end(); + + var chunk, chunks = []; + while (chunk = compressor.read()) { + chunks.push(chunk); + } + return concat(chunks); +}; + +// When a `frame` arrives +Compressor.prototype._transform = function _transform(frame, encoding, done) { + // * and it is a HEADERS or PUSH_PROMISE frame + // * it generates a header block using the compress method + // * cuts the header block into `chunks` that are not larger than `MAX_HTTP_PAYLOAD_SIZE` + // * for each chunk, it pushes out a chunk frame that is identical to the original, except + // the `data` property which holds the given chunk, the type of the frame which is always + // CONTINUATION except for the first frame, and the END_HEADERS/END_PUSH_STREAM flag that + // marks the last frame and the END_STREAM flag which is always false before the end + if (frame.type === 'HEADERS' || frame.type === 'PUSH_PROMISE') { + var buffer = this.compress(frame.headers); + + // This will result in CONTINUATIONs from a PUSH_PROMISE being 4 bytes shorter than they could + // be, but that's not the end of the world, and it prevents us from going over MAX_HTTP_PAYLOAD_SIZE + // on the initial PUSH_PROMISE frame. + var adjustment = frame.type === 'PUSH_PROMISE' ? 4 : 0; + var chunks = cut(buffer, MAX_HTTP_PAYLOAD_SIZE - adjustment); + + for (var i = 0; i < chunks.length; i++) { + var chunkFrame; + var first = (i === 0); + var last = (i === chunks.length - 1); + + if (first) { + chunkFrame = util._extend({}, frame); + chunkFrame.flags = util._extend({}, frame.flags); + chunkFrame.flags['END_' + frame.type] = last; + } else { + chunkFrame = { + type: 'CONTINUATION', + flags: { END_HEADERS: last }, + stream: frame.stream + }; + } + chunkFrame.data = chunks[i]; + + this.push(chunkFrame); + } + } + + // * otherwise, the frame is forwarded without taking any action + else { + this.push(frame); + } + + done(); +}; + +// The Decompressor class +// ---------------------- + +// The Decompressor is a stateful transform stream, since it has to collect multiple frames first, +// and the decoding comes after unifying the payload of those frames. +// +// If there's a frame in progress, `this._inProgress` is `true`. The frames are collected in +// `this._frames`, and the type of the frame and the stream identifier is stored in `this._type` +// and `this._stream` respectively. +util.inherits(Decompressor, TransformStream); +function Decompressor(log, type) { + TransformStream.call(this, { objectMode: true }); + + this._log = log.child({ component: 'compressor' }); + + assert((type === 'REQUEST') || (type === 'RESPONSE')); + this._table = new HeaderTable(this._log); + + this._inProgress = false; + this._base = undefined; +} + +// Changing the header table size +Decompressor.prototype.setTableSizeLimit = function setTableSizeLimit(size) { + this._table.setSizeLimit(size); +}; + +// `decompress` takes a full header block, and decompresses it using a new `HeaderSetDecompressor` +// stream instance. This means that from now on, the advantages of streaming header decoding are +// lost, but the API becomes simpler. +Decompressor.prototype.decompress = function decompress(block) { + var decompressor = new HeaderSetDecompressor(this._log, this._table); + decompressor.end(block); + + var seenNonColonHeader = false; + var headers = {}; + var pair; + while (pair = decompressor.read()) { + var name = pair[0]; + var value = pair[1]; + var isColonHeader = (name.trim()[0] === ':'); + if (seenNonColonHeader && isColonHeader) { + this.emit('error', 'PROTOCOL_ERROR'); + return headers; + } + seenNonColonHeader = !isColonHeader; + if (name in headers) { + if (headers[name] instanceof Array) { + headers[name].push(value); + } else { + headers[name] = [headers[name], value]; + } + } else { + headers[name] = value; + } + } + + // * If there are multiple Cookie header fields after decompression, these MUST be concatenated + // into a single octet string using the two octet delimiter of 0x3B, 0x20 (the ASCII + // string "; "). + if (('cookie' in headers) && (headers['cookie'] instanceof Array)) { + headers['cookie'] = headers['cookie'].join('; '); + } + + return headers; +}; + +// When a `frame` arrives +Decompressor.prototype._transform = function _transform(frame, encoding, done) { + // * and the collection process is already `_inProgress`, the frame is simply stored, except if + // it's an illegal frame + if (this._inProgress) { + if ((frame.type !== 'CONTINUATION') || (frame.stream !== this._base.stream)) { + this._log.error('A series of HEADER frames were not continuous'); + this.emit('error', 'PROTOCOL_ERROR'); + return; + } + this._frames.push(frame); + } + + // * and the collection process is not `_inProgress`, but the new frame's type is HEADERS or + // PUSH_PROMISE, a new collection process begins + else if ((frame.type === 'HEADERS') || (frame.type === 'PUSH_PROMISE')) { + this._inProgress = true; + this._base = util._extend({}, frame); + this._frames = [frame]; + } + + // * otherwise, the frame is forwarded without taking any action + else { + this.push(frame); + } + + // * When the frame signals that it's the last in the series, the header block chunks are + // concatenated, the headers are decompressed, and a new frame gets pushed out with the + // decompressed headers. + if (this._inProgress && (frame.flags.END_HEADERS || frame.flags.END_PUSH_PROMISE)) { + var buffer = concat(this._frames.map(function(frame) { + return frame.data; + })); + try { + var headers = this.decompress(buffer); + } catch(error) { + this._log.error({ err: error }, 'Header decompression error'); + this.emit('error', 'COMPRESSION_ERROR'); + return; + } + this.push(util._extend(this._base, { headers: headers })); + this._inProgress = false; + } + + done(); +}; + +// Helper functions +// ================ + +// Concatenate an array of buffers into a new buffer +function concat(buffers) { + var size = 0; + for (var i = 0; i < buffers.length; i++) { + size += buffers[i].length; + } + + var concatenated = new Buffer(size); + for (var cursor = 0, j = 0; j < buffers.length; cursor += buffers[j].length, j++) { + buffers[j].copy(concatenated, cursor); + } + + return concatenated; +} + +// Cut `buffer` into chunks not larger than `size` +function cut(buffer, size) { + var chunks = []; + var cursor = 0; + do { + var chunkSize = Math.min(size, buffer.length - cursor); + chunks.push(buffer.slice(cursor, cursor + chunkSize)); + cursor += chunkSize; + } while(cursor < buffer.length); + return chunks; +} + +function trim(string) { + return string.trim(); +} diff --git a/testing/xpcshell/node-http2/lib/protocol/connection.js b/testing/xpcshell/node-http2/lib/protocol/connection.js new file mode 100644 index 000000000..2b86b7f1c --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/connection.js @@ -0,0 +1,619 @@ +var assert = require('assert'); + +// The Connection class +// ==================== + +// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport +// stream (TCP stream). It operates by sending and receiving frames and is implemented as a +// [Flow](flow.html) subclass. + +var Flow = require('./flow').Flow; + +exports.Connection = Connection; + +// Public API +// ---------- + +// * **new Connection(log, firstStreamId, settings)**: create a new Connection +// +// * **Event: 'error' (type)**: signals a connection level error made by the other end +// +// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error +// code other than NO_ERROR +// +// * **Event: 'stream' (stream)**: signals that there's an incoming stream +// +// * **createStream(): stream**: initiate a new stream +// +// * **set(settings, callback)**: change the value of one or more settings according to the +// key-value pairs of `settings`. The callback is called after the peer acknowledged the changes. +// +// * **ping([callback])**: send a ping and call callback when the answer arrives +// +// * **close([error])**: close the stream with an error code + +// Constructor +// ----------- + +// The main aspects of managing the connection are: +function Connection(log, firstStreamId, settings) { + // * initializing the base class + Flow.call(this, 0); + + // * logging: every method uses the common logger object + this._log = log.child({ component: 'connection' }); + + // * stream management + this._initializeStreamManagement(firstStreamId); + + // * lifecycle management + this._initializeLifecycleManagement(); + + // * flow control + this._initializeFlowControl(); + + // * settings management + this._initializeSettingsManagement(settings); + + // * multiplexing + this._initializeMultiplexing(); +} +Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } }); + +// Overview +// -------- + +// | ^ | ^ +// v | v | +// +--------------+ +--------------+ +// +---| stream1 |---| stream2 |---- .... ---+ +// | | +----------+ | | +----------+ | | +// | | | stream1. | | | | stream2. | | | +// | +-| upstream |-+ +-| upstream |-+ | +// | +----------+ +----------+ | +// | | ^ | ^ | +// | v | v | | +// | +-----+-------------+-----+-------- .... | +// | ^ | | | | +// | | v | | | +// | +--------------+ | | | +// | | stream0 | | | | +// | | connection | | | | +// | | management | multiplexing | +// | +--------------+ flow control | +// | | ^ | +// | _read() | | _write() | +// | v | | +// | +------------+ +-----------+ | +// | |output queue| |input queue| | +// +----------------+------------+-+-----------+-----------------+ +// | ^ +// read() | | write() +// v | + +// Stream management +// ----------------- + +var Stream = require('./stream').Stream; + +// Initialization: +Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) { + // * streams are stored in two data structures: + // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames. + // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames. + this._streamIds = []; + this._streamPriorities = []; + + // * The next outbound stream ID and the last inbound stream id + this._nextStreamId = firstStreamId; + this._lastIncomingStream = 0; + + // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID + this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } }; + + // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can + // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. + this._streamSlotsFree = Infinity; + this._streamLimit = Infinity; + this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit); +}; + +// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It +// broadcasts the message by creating an event on it. +Connection.prototype._writeControlFrame = function _writeControlFrame(frame) { + if ((frame.type === 'SETTINGS') || (frame.type === 'PING') || + (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') || + (frame.type === 'ALTSVC')) { + this._log.debug({ frame: frame }, 'Receiving connection level frame'); + this.emit(frame.type, frame); + } else { + this._log.error({ frame: frame }, 'Invalid connection level frame'); + this.emit('error', 'PROTOCOL_ERROR'); + } +}; + +// Methods to manage the stream slot pool: +Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) { + var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit); + this._streamSlotsFree += newStreamLimit - this._streamLimit; + this._streamLimit = newStreamLimit; + if (wakeup) { + this.emit('wakeup'); + } +}; + +Connection.prototype._changeStreamCount = function _changeStreamCount(change) { + if (change) { + this._log.trace({ free: this._streamSlotsFree, change: change }, + 'Changing active stream count.'); + var wakeup = (this._streamSlotsFree === 0) && (change < 0); + this._streamSlotsFree -= change; + if (wakeup) { + this.emit('wakeup'); + } + } +}; + +// Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of +// an outbound stream) consists of three steps: +// +// 1. var stream = new Stream(this._log, this); +// 2. this._allocateId(stream, id); +// 2. this._allocatePriority(stream); + +// Allocating an ID to a stream +Connection.prototype._allocateId = function _allocateId(stream, id) { + // * initiated stream without definite ID + if (id === undefined) { + id = this._nextStreamId; + this._nextStreamId += 2; + } + + // * incoming stream with a legitim ID (larger than any previous and different parity than ours) + else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) { + this._lastIncomingStream = id; + } + + // * incoming stream with invalid ID + else { + this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream }, + 'Invalid incoming stream ID.'); + this.emit('error', 'PROTOCOL_ERROR'); + return undefined; + } + + assert(!(id in this._streamIds)); + + // * adding to `this._streamIds` + this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.'); + this._streamIds[id] = stream; + stream.id = id; + this.emit('new_stream', stream, id); + + // * forwarding connection errors from streams + stream.on('connectionError', this.emit.bind(this, 'error')); + + return id; +}; + +// Allocating a priority to a stream, and managing priority changes +Connection.prototype._allocatePriority = function _allocatePriority(stream) { + this._log.trace({ s: stream }, 'Allocating priority for stream.'); + this._insert(stream, stream._priority); + stream.on('priority', this._reprioritize.bind(this, stream)); + stream.upstream.on('readable', this.emit.bind(this, 'wakeup')); + this.emit('wakeup'); +}; + +Connection.prototype._insert = function _insert(stream, priority) { + if (priority in this._streamPriorities) { + this._streamPriorities[priority].push(stream); + } else { + this._streamPriorities[priority] = [stream]; + } +}; + +Connection.prototype._reprioritize = function _reprioritize(stream, priority) { + var bucket = this._streamPriorities[stream._priority]; + var index = bucket.indexOf(stream); + assert(index !== -1); + bucket.splice(index, 1); + if (bucket.length === 0) { + delete this._streamPriorities[stream._priority]; + } + + this._insert(stream, priority); +}; + +// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to +// a previously nonexistent stream. +Connection.prototype._createIncomingStream = function _createIncomingStream(id) { + this._log.debug({ stream_id: id }, 'New incoming stream.'); + + var stream = new Stream(this._log, this); + this._allocateId(stream, id); + this._allocatePriority(stream); + this.emit('stream', stream, id); + + return stream; +}; + +// Creating an *outbound* stream +Connection.prototype.createStream = function createStream() { + this._log.trace('Creating new outbound stream.'); + + // * Receiving is enabled immediately, and an ID gets assigned to the stream + var stream = new Stream(this._log, this); + this._allocatePriority(stream); + + return stream; +}; + +// Multiplexing +// ------------ + +Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() { + this.on('window_update', this.emit.bind(this, 'wakeup')); + this._sendScheduled = false; + this._firstFrameReceived = false; +}; + +// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented +// by child classes. It reads frames from streams and pushes them to the output buffer. +Connection.prototype._send = function _send(immediate) { + // * Do not do anything if the connection is already closed + if (this._closed) { + return; + } + + // * Collapsing multiple calls in a turn into a single deferred call + if (immediate) { + this._sendScheduled = false; + } else { + if (!this._sendScheduled) { + this._sendScheduled = true; + setImmediate(this._send.bind(this, true)); + } + return; + } + + this._log.trace('Starting forwarding frames from streams.'); + + // * Looping through priority `bucket`s in priority order. +priority_loop: + for (var priority in this._streamPriorities) { + var bucket = this._streamPriorities[priority]; + var nextBucket = []; + + // * Forwarding frames from buckets with round-robin scheduling. + // 1. pulling out frame + // 2. if there's no frame, skip this stream + // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip + // this stream + // 4. adding stream to the bucket of the next round + // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already) + // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream + // 7. forwarding the frame, changing `streamCount` as appropriate + // 8. stepping to the next stream if there's still more frame needed in the output buffer + // 9. switching to the bucket of the next round + while (bucket.length > 0) { + for (var index = 0; index < bucket.length; index++) { + var stream = bucket[index]; + var frame = stream.upstream.read((this._window > 0) ? this._window : -1); + + if (!frame) { + continue; + } else if (frame.count_change > this._streamSlotsFree) { + stream.upstream.unshift(frame); + continue; + } + + nextBucket.push(stream); + + if (frame.stream === undefined) { + frame.stream = stream.id || this._allocateId(stream); + } + + if (frame.type === 'PUSH_PROMISE') { + this._allocatePriority(frame.promised_stream); + frame.promised_stream = this._allocateId(frame.promised_stream); + } + + this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame'); + var moreNeeded = this.push(frame); + this._changeStreamCount(frame.count_change); + + assert(moreNeeded !== null); // The frame shouldn't be unforwarded + if (moreNeeded === false) { + break priority_loop; + } + } + + bucket = nextBucket; + nextBucket = []; + } + } + + // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event + if (moreNeeded === undefined) { + this.once('wakeup', this._send.bind(this)); + } + + this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.'); +}; + +// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be +// implemented by child classes. It forwards the given frame to the appropriate stream: +Connection.prototype._receive = function _receive(frame, done) { + this._log.trace({ frame: frame }, 'Forwarding incoming frame'); + + // * first frame needs to be checked by the `_onFirstFrameReceived` method + if (!this._firstFrameReceived) { + this._firstFrameReceived = true; + this._onFirstFrameReceived(frame); + } + + // Do some sanity checking here before we create a stream + if ((frame.type == 'SETTINGS' || + frame.type == 'PING' || + frame.type == 'GOAWAY') && + frame.stream != 0) { + // Got connection-level frame on a stream - EEP! + this.close('PROTOCOL_ERROR'); + return; + } else if ((frame.type == 'DATA' || + frame.type == 'HEADERS' || + frame.type == 'PRIORITY' || + frame.type == 'RST_STREAM' || + frame.type == 'PUSH_PROMISE' || + frame.type == 'CONTINUATION') && + frame.stream == 0) { + // Got stream-level frame on connection - EEP! + this.close('PROTOCOL_ERROR'); + return; + } + // WINDOW_UPDATE can be on either stream or connection + + // * gets the appropriate stream from the stream registry + var stream = this._streamIds[frame.stream]; + + // * or creates one if it's not in `this.streams` + if (!stream) { + stream = this._createIncomingStream(frame.stream); + } + + // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream + if (frame.type === 'PUSH_PROMISE') { + frame.promised_stream = this._createIncomingStream(frame.promised_stream); + } + + frame.count_change = this._changeStreamCount.bind(this); + + // * and writes it to the `stream`'s `upstream` + stream.upstream.write(frame); + + done(); +}; + +// Settings management +// ------------------- + +var defaultSettings = { +}; + +// Settings management initialization: +Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) { + // * Setting up the callback queue for setting acknowledgements + this._settingsAckCallbacks = []; + + // * Sending the initial settings. + this._log.debug({ settings: settings }, + 'Sending the first SETTINGS frame as part of the connection header.'); + this.set(settings || defaultSettings); + + // * Forwarding SETTINGS frames to the `_receiveSettings` method + this.on('SETTINGS', this._receiveSettings); + this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize); +}; + +// * Checking that the first frame the other endpoint sends is SETTINGS +Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) { + if ((frame.stream === 0) && (frame.type === 'SETTINGS')) { + this._log.debug('Receiving the first SETTINGS frame as part of the connection header.'); + } else { + this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.'); + this.emit('error', 'PROTOCOL_ERROR'); + } +}; + +// Handling of incoming SETTINGS frames. +Connection.prototype._receiveSettings = function _receiveSettings(frame) { + // * If it's an ACK, call the appropriate callback + if (frame.flags.ACK) { + var callback = this._settingsAckCallbacks.shift(); + if (callback) { + callback(); + } + } + + // * If it's a setting change request, then send an ACK and change the appropriate settings + else { + if (!this._closed) { + this.push({ + type: 'SETTINGS', + flags: { ACK: true }, + stream: 0, + settings: {} + }); + } + for (var name in frame.settings) { + this.emit('RECEIVING_' + name, frame.settings[name]); + } + } +}; + +Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) { + if ((value < 0x4000) || (value >= 0x01000000)) { + this._log.fatal('Received invalid value for max frame size: ' + value); + this.emit('error'); + } +}; + +// Changing one or more settings value and sending out a SETTINGS frame +Connection.prototype.set = function set(settings, callback) { + // * Calling the callback and emitting event when the change is acknowledges + var self = this; + this._settingsAckCallbacks.push(function() { + for (var name in settings) { + self.emit('ACKNOWLEDGED_' + name, settings[name]); + } + if (callback) { + callback(); + } + }); + + // * Sending out the SETTINGS frame + this.push({ + type: 'SETTINGS', + flags: { ACK: false }, + stream: 0, + settings: settings + }); + for (var name in settings) { + this.emit('SENDING_' + name, settings[name]); + } +}; + +// Lifecycle management +// -------------------- + +// The main responsibilities of lifecycle management code: +// +// * keeping the connection alive by +// * sending PINGs when the connection is idle +// * answering PINGs +// * ending the connection + +Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() { + this._pings = {}; + this.on('PING', this._receivePing); + this.on('GOAWAY', this._receiveGoaway); + this._closed = false; +}; + +// Generating a string of length 16 with random hexadecimal digits +Connection.prototype._generatePingId = function _generatePingId() { + do { + var id = ''; + for (var i = 0; i < 16; i++) { + id += Math.floor(Math.random()*16).toString(16); + } + } while(id in this._pings); + return id; +}; + +// Sending a ping and calling `callback` when the answer arrives +Connection.prototype.ping = function ping(callback) { + var id = this._generatePingId(); + var data = new Buffer(id, 'hex'); + this._pings[id] = callback; + + this._log.debug({ data: data }, 'Sending PING.'); + this.push({ + type: 'PING', + flags: { + ACK: false + }, + stream: 0, + data: data + }); +}; + +// Answering pings +Connection.prototype._receivePing = function _receivePing(frame) { + if (frame.flags.ACK) { + var id = frame.data.toString('hex'); + if (id in this._pings) { + this._log.debug({ data: frame.data }, 'Receiving answer for a PING.'); + var callback = this._pings[id]; + if (callback) { + callback(); + } + delete this._pings[id]; + } else { + this._log.warn({ data: frame.data }, 'Unsolicited PING answer.'); + } + + } else { + this._log.debug({ data: frame.data }, 'Answering PING.'); + this.push({ + type: 'PING', + flags: { + ACK: true + }, + stream: 0, + data: frame.data + }); + } +}; + +// Terminating the connection +Connection.prototype.close = function close(error) { + if (this._closed) { + this._log.warn('Trying to close an already closed connection'); + return; + } + + this._log.debug({ error: error }, 'Closing the connection'); + this.push({ + type: 'GOAWAY', + flags: {}, + stream: 0, + last_stream: this._lastIncomingStream, + error: error || 'NO_ERROR' + }); + this.push(null); + this._closed = true; +}; + +Connection.prototype._receiveGoaway = function _receiveGoaway(frame) { + this._log.debug({ error: frame.error }, 'Other end closed the connection'); + this.push(null); + this._closed = true; + if (frame.error !== 'NO_ERROR') { + this.emit('peerError', frame.error); + } +}; + +// Flow control +// ------------ + +Connection.prototype._initializeFlowControl = function _initializeFlowControl() { + // Handling of initial window size of individual streams. + this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE; + this.on('new_stream', function(stream) { + stream.upstream.setInitialWindow(this._initialStreamWindowSize); + }); + this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize); + this._streamIds[0].upstream.setInitialWindow = function noop() {}; +}; + +// The initial connection flow control window is 65535 bytes. +var INITIAL_STREAM_WINDOW_SIZE = 65535; + +// A SETTINGS frame can alter the initial flow control window size for all current streams. When the +// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all +// stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by +// the difference between the new value and the old value. +Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) { + if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) { + this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.'); + this.emit('error', 'FLOW_CONTROL_ERROR'); + } else { + this._log.debug({ size: size }, 'Changing stream initial window size.'); + this._initialStreamWindowSize = size; + this._streamIds.forEach(function(stream) { + stream.upstream.setInitialWindow(size); + }); + } +}; diff --git a/testing/xpcshell/node-http2/lib/protocol/endpoint.js b/testing/xpcshell/node-http2/lib/protocol/endpoint.js new file mode 100644 index 000000000..a218db040 --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/endpoint.js @@ -0,0 +1,262 @@ +var assert = require('assert'); + +var Serializer = require('./framer').Serializer; +var Deserializer = require('./framer').Deserializer; +var Compressor = require('./compressor').Compressor; +var Decompressor = require('./compressor').Decompressor; +var Connection = require('./connection').Connection; +var Duplex = require('stream').Duplex; +var Transform = require('stream').Transform; + +exports.Endpoint = Endpoint; + +// The Endpoint class +// ================== + +// Public API +// ---------- + +// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint. +// +// - `log`: bunyan logger of the parent +// - `role`: 'CLIENT' or 'SERVER' +// - `settings`: initial HTTP/2 settings +// - `filters`: a map of functions that filter the traffic between components (for debugging or +// intentional failure injection). +// +// Filter functions get three arguments: +// 1. `frame`: the current frame +// 2. `forward(frame)`: function that can be used to forward a frame to the next component +// 3. `done()`: callback to signal the end of the filter process +// +// Valid filter names and their position in the stack: +// - `beforeSerialization`: after compression, before serialization +// - `beforeCompression`: after multiplexing, before compression +// - `afterDeserialization`: after deserialization, before decompression +// - `afterDecompression`: after decompression, before multiplexing +// +// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection +// +// * **Event: 'error' (type)**: signals an error +// +// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection) +// +// * **close([error])**: close the connection with an error code + +// Constructor +// ----------- + +// The process of initialization: +function Endpoint(log, role, settings, filters) { + Duplex.call(this); + + // * Initializing logging infrastructure + this._log = log.child({ component: 'endpoint', e: this }); + + // * First part of the handshake process: sending and receiving the client connection header + // prelude. + assert((role === 'CLIENT') || role === 'SERVER'); + if (role === 'CLIENT') { + this._writePrelude(); + } else { + this._readPrelude(); + } + + // * Initialization of component. This includes the second part of the handshake process: + // sending the first SETTINGS frame. This is done by the connection class right after + // initialization. + this._initializeDataFlow(role, settings, filters || {}); + + // * Initialization of management code. + this._initializeManagement(); + + // * Initializing error handling. + this._initializeErrorHandling(); +} +Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } }); + +// Handshake +// --------- + +var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'); + +// Writing the client header is simple and synchronous. +Endpoint.prototype._writePrelude = function _writePrelude() { + this._log.debug('Sending the client connection header prelude.'); + this.push(CLIENT_PRELUDE); +}; + +// The asynchronous process of reading the client header: +Endpoint.prototype._readPrelude = function _readPrelude() { + // * progress in the header is tracker using a `cursor` + var cursor = 0; + + // * `_write` is temporarily replaced by the comparator function + this._write = function _temporalWrite(chunk, encoding, done) { + // * which compares the stored header with the current `chunk` byte by byte and emits the + // 'error' event if there's a byte that doesn't match + var offset = cursor; + while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) { + if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) { + this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk }, + 'Client connection header prelude does not match.'); + this._error('handshake', 'PROTOCOL_ERROR'); + return; + } + cursor += 1; + } + + // * if the whole header is over, and there were no error then restore the original `_write` + // and call it with the remaining part of the current chunk + if (cursor === CLIENT_PRELUDE.length) { + this._log.debug('Successfully received the client connection header prelude.'); + delete this._write; + chunk = chunk.slice(cursor - offset); + this._write(chunk, encoding, done); + } + }; +}; + +// Data flow +// --------- + +// +---------------------------------------------+ +// | | +// | +-------------------------------------+ | +// | | +---------+ +---------+ +---------+ | | +// | | | stream1 | | stream2 | | ... | | | +// | | +---------+ +---------+ +---------+ | | +// | | connection | | +// | +-------------------------------------+ | +// | | ^ | +// | pipe | | pipe | +// | v | | +// | +------------------+------------------+ | +// | | compressor | decompressor | | +// | +------------------+------------------+ | +// | | ^ | +// | pipe | | pipe | +// | v | | +// | +------------------+------------------+ | +// | | serializer | deserializer | | +// | +------------------+------------------+ | +// | | ^ | +// | _read() | | _write() | +// | v | | +// | +------------+ +-----------+ | +// | |output queue| |input queue| | +// +------+------------+-----+-----------+-------+ +// | ^ +// read() | | write() +// v | + +function createTransformStream(filter) { + var transform = new Transform({ objectMode: true }); + var push = transform.push.bind(transform); + transform._transform = function(frame, encoding, done) { + filter(frame, push, done); + }; + return transform; +} + +function pipeAndFilter(stream1, stream2, filter) { + if (filter) { + stream1.pipe(createTransformStream(filter)).pipe(stream2); + } else { + stream1.pipe(stream2); + } +} + +Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) { + var firstStreamId, compressorRole, decompressorRole; + if (role === 'CLIENT') { + firstStreamId = 1; + compressorRole = 'REQUEST'; + decompressorRole = 'RESPONSE'; + } else { + firstStreamId = 2; + compressorRole = 'RESPONSE'; + decompressorRole = 'REQUEST'; + } + + this._serializer = new Serializer(this._log); + this._deserializer = new Deserializer(this._log); + this._compressor = new Compressor(this._log, compressorRole); + this._decompressor = new Decompressor(this._log, decompressorRole); + this._connection = new Connection(this._log, firstStreamId, settings); + + pipeAndFilter(this._connection, this._compressor, filters.beforeCompression); + pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization); + pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization); + pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression); + + this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE', + this._decompressor.setTableSizeLimit.bind(this._decompressor)); + this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE', + this._compressor.setTableSizeLimit.bind(this._compressor)); +}; + +var noread = {}; +Endpoint.prototype._read = function _read() { + this._readableState.sync = true; + var moreNeeded = noread, chunk; + while (moreNeeded && (chunk = this._serializer.read())) { + moreNeeded = this.push(chunk); + } + if (moreNeeded === noread) { + this._serializer.once('readable', this._read.bind(this)); + } + this._readableState.sync = false; +}; + +Endpoint.prototype._write = function _write(chunk, encoding, done) { + this._deserializer.write(chunk, encoding, done); +}; + +// Management +// -------------- + +Endpoint.prototype._initializeManagement = function _initializeManagement() { + this._connection.on('stream', this.emit.bind(this, 'stream')); +}; + +Endpoint.prototype.createStream = function createStream() { + return this._connection.createStream(); +}; + +// Error handling +// -------------- + +Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() { + this._serializer.on('error', this._error.bind(this, 'serializer')); + this._deserializer.on('error', this._error.bind(this, 'deserializer')); + this._compressor.on('error', this._error.bind(this, 'compressor')); + this._decompressor.on('error', this._error.bind(this, 'decompressor')); + this._connection.on('error', this._error.bind(this, 'connection')); + + this._connection.on('peerError', this.emit.bind(this, 'peerError')); +}; + +Endpoint.prototype._error = function _error(component, error) { + this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection'); + this.close(error); + setImmediate(this.emit.bind(this, 'error', error)); +}; + +Endpoint.prototype.close = function close(error) { + this._connection.close(error); +}; + +// Bunyan serializers +// ------------------ + +exports.serializers = {}; + +var nextId = 0; +exports.serializers.e = function(endpoint) { + if (!('id' in endpoint)) { + endpoint.id = nextId; + nextId += 1; + } + return endpoint.id; +}; diff --git a/testing/xpcshell/node-http2/lib/protocol/flow.js b/testing/xpcshell/node-http2/lib/protocol/flow.js new file mode 100644 index 000000000..4ec5649be --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/flow.js @@ -0,0 +1,353 @@ +var assert = require('assert'); + +// The Flow class +// ============== + +// Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be +// subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html). +// [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex + +var Duplex = require('stream').Duplex; + +exports.Flow = Flow; + +// Public API +// ---------- + +// * **Event: 'error' (type)**: signals an error +// +// * **setInitialWindow(size)**: the initial flow control window size can be changed *any time* +// ([as described in the standard][1]) using this method +// +// [1]: https://tools.ietf.org/html/rfc7540#section-6.9.2 + +// API for child classes +// --------------------- + +// * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames +// with the given `flowControlId` (or every update frame if not given) +// +// * **_send()**: called when more frames should be pushed. The child class is expected to override +// this (instead of the `_read` method of the Duplex class). +// +// * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is +// expected to override this (instead of the `_write` method of the Duplex class). +// +// * **push(frame): bool**: schedules `frame` for sending. +// +// Returns `true` if it needs more frames in the output queue, `false` if the output queue is +// full, and `null` if did not push the frame into the output queue (instead, it pushed it into +// the flow control queue). +// +// * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA +// frames, length of the payload for DATA frames) of the returned frame will be under `limit`. +// Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the +// same thing as [in the original API](https://nodejs.org/api/stream.html#stream_stream_read_0). +// +// * **getLastQueuedFrame(): frame**: returns the last frame in output buffers +// +// * **_log**: the Flow class uses the `_log` object of the parent + +// Constructor +// ----------- + +// When a HTTP/2.0 connection is first established, new streams are created with an initial flow +// control window size of 65535 bytes. +var INITIAL_WINDOW_SIZE = 65535; + +// `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched. +function Flow(flowControlId) { + Duplex.call(this, { objectMode: true }); + + this._window = this._initialWindow = INITIAL_WINDOW_SIZE; + this._flowControlId = flowControlId; + this._queue = []; + this._ended = false; + this._received = 0; + this._blocked = false; +} +Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } }); + +// Incoming frames +// --------------- + +// `_receive` is called when there's an incoming frame. +Flow.prototype._receive = function _receive(frame, callback) { + throw new Error('The _receive(frame, callback) method has to be overridden by the child class!'); +}; + +// `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s +// to the flow. It emits the 'receiving' event and notifies the window size tracking code if the +// incoming frame is a WINDOW_UPDATE. +// [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 +Flow.prototype._write = function _write(frame, encoding, callback) { + var sentToUs = (this._flowControlId === undefined) || (frame.stream === this._flowControlId); + + if (sentToUs && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) { + this._ended = true; + } + + if ((frame.type === 'DATA') && (frame.data.length > 0)) { + this._receive(frame, function() { + this._received += frame.data.length; + if (!this._restoreWindowTimer) { + this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this)); + } + callback(); + }.bind(this)); + } + + else { + this._receive(frame, callback); + } + + if (sentToUs && (frame.type === 'WINDOW_UPDATE')) { + this._updateWindow(frame); + } +}; + +// `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends +// a WINDOW_UPDATE that restores the flow control window of the remote end. +// TODO: push this directly into the output queue. No need to wait for DATA frames in the queue. +Flow.prototype._restoreWindow = function _restoreWindow() { + delete this._restoreWindowTimer; + if (!this._ended && (this._received > 0)) { + this.push({ + type: 'WINDOW_UPDATE', + flags: {}, + stream: this._flowControlId, + window_size: this._received + }); + this._received = 0; + } +}; + +// Outgoing frames - sending procedure +// ----------------------------------- + +// flow +// +-------------------------------------------------+ +// | | +// +--------+ +---------+ | +// read() | output | _read() | flow | _send() | +// <----------| |<----------| control |<------------- | +// | buffer | | buffer | | +// +--------+ +---------+ | +// | input | | +// ---------->| |-----------------------------------> | +// write() | buffer | _write() _receive() | +// +--------+ | +// | | +// +-------------------------------------------------+ + +// `_send` is called when more frames should be pushed to the output buffer. +Flow.prototype._send = function _send() { + throw new Error('The _send() method has to be overridden by the child class!'); +}; + +// `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more +// items in the output queue. +// [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1 +Flow.prototype._read = function _read() { + // * if the flow control queue is empty, then let the user push more frames + if (this._queue.length === 0) { + this._send(); + } + + // * if there are items in the flow control queue, then let's put them into the output queue (to + // the extent it is possible with respect to the window size and output queue feedback) + else if (this._window > 0) { + this._blocked = false; + this._readableState.sync = true; // to avoid reentrant calls + do { + var moreNeeded = this._push(this._queue[0]); + if (moreNeeded !== null) { + this._queue.shift(); + } + } while (moreNeeded && (this._queue.length > 0)); + this._readableState.sync = false; + + assert((!moreNeeded) || // * output queue is full + (this._queue.length === 0) || // * flow control queue is empty + (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update + } + + // * otherwise, come back when the flow control window is positive + else if (!this._blocked) { + this._parentPush({ + type: 'BLOCKED', + flags: {}, + stream: this._flowControlId + }); + this.once('window_update', this._read); + this._blocked = true; + } +}; + +var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383 + +// `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control +// size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will +// be under `limit`. +Flow.prototype.read = function read(limit) { + if (limit === 0) { + return Duplex.prototype.read.call(this, 0); + } else if (limit === -1) { + limit = 0; + } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) { + limit = MAX_PAYLOAD_SIZE; + } + + // * Looking at the first frame in the queue without pulling it out if possible. + var frame = this._readableState.buffer[0]; + if (!frame && !this._readableState.ended) { + this._read(); + frame = this._readableState.buffer[0]; + } + + if (frame && (frame.type === 'DATA')) { + // * If the frame is DATA, then there's two special cases: + // * if the limit is 0, we shouldn't return anything + // * if the size of the frame is larger than limit, then the frame should be split + if (limit === 0) { + return Duplex.prototype.read.call(this, 0); + } + + else if (frame.data.length > limit) { + this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit }, + 'Splitting out forwardable part of a DATA frame.'); + this.unshift({ + type: 'DATA', + flags: {}, + stream: frame.stream, + data: frame.data.slice(0, limit) + }); + frame.data = frame.data.slice(limit); + } + } + + return Duplex.prototype.read.call(this); +}; + +// `_parentPush` pushes the given `frame` into the output queue +Flow.prototype._parentPush = function _parentPush(frame) { + this._log.trace({ frame: frame }, 'Pushing frame into the output queue'); + + if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) { + this._log.trace({ window: this._window, by: frame.data.length }, + 'Decreasing flow control window size.'); + this._window -= frame.data.length; + assert(this._window >= 0); + } + + return Duplex.prototype.push.call(this, frame); +}; + +// `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size. +// It is capable of splitting DATA frames into smaller parts, if the window size is not enough to +// push the whole frame. The return value is similar to `push` except that it returns `null` if it +// did not push the whole frame to the output queue (but maybe it did push part of the frame). +Flow.prototype._push = function _push(frame) { + var data = frame && (frame.type === 'DATA') && frame.data; + var maxFrameLength = (this._window < 16384) ? this._window : 16384; + + if (!data || (data.length <= maxFrameLength)) { + return this._parentPush(frame); + } + + else if (this._window <= 0) { + return null; + } + + else { + this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window }, + 'Splitting out forwardable part of a DATA frame.'); + frame.data = data.slice(maxFrameLength); + this._parentPush({ + type: 'DATA', + flags: {}, + stream: frame.stream, + data: data.slice(0, maxFrameLength) + }); + return null; + } +}; + +// Push `frame` into the flow control queue, or if it's empty, then directly into the output queue +Flow.prototype.push = function push(frame) { + if (frame === null) { + this._log.debug('Enqueueing outgoing End Of Stream'); + } else { + this._log.debug({ frame: frame }, 'Enqueueing outgoing frame'); + } + + var moreNeeded = null; + if (this._queue.length === 0) { + moreNeeded = this._push(frame); + } + + if (moreNeeded === null) { + this._queue.push(frame); + } + + return moreNeeded; +}; + +// `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the +// [Stream](stream.html) class to mark the last frame with END_STREAM flag. +Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() { + var readableQueue = this._readableState.buffer; + return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1]; +}; + +// Outgoing frames - managing the window size +// ------------------------------------------ + +// Flow control window size is manipulated using the `_increaseWindow` method. +// +// * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled +// again once disabled. Any attempt to re-enable flow control MUST be rejected with a +// FLOW_CONTROL_ERROR error code. +// * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken +// depends on it being a stream or the connection itself. + +var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1; + +Flow.prototype._increaseWindow = function _increaseWindow(size) { + if ((this._window === Infinity) && (size !== Infinity)) { + this._log.error('Trying to increase flow control window after flow control was turned off.'); + this.emit('error', 'FLOW_CONTROL_ERROR'); + } else { + this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.'); + this._window += size; + if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) { + this._log.error('Flow control window grew too large.'); + this.emit('error', 'FLOW_CONTROL_ERROR'); + } else { + if (size != 0) { + this.emit('window_update'); + } + } + } +}; + +// The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It +// modifies the flow control window: +// +// * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the +// END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL +// flag set is ignored. +// * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount +// specified in the frame. +Flow.prototype._updateWindow = function _updateWindow(frame) { + this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size); +}; + +// A SETTINGS frame can alter the initial flow control window size for all current streams. When the +// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by +// calling the `setInitialWindow` method. The window size has to be modified by the difference +// between the new value and the old value. +Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) { + this._increaseWindow(initialWindow - this._initialWindow); + this._initialWindow = initialWindow; +}; diff --git a/testing/xpcshell/node-http2/lib/protocol/framer.js b/testing/xpcshell/node-http2/lib/protocol/framer.js new file mode 100644 index 000000000..244e60ae1 --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/framer.js @@ -0,0 +1,1165 @@ +// The framer consists of two [Transform Stream][1] subclasses that operate in [object mode][2]: +// the Serializer and the Deserializer +// [1]: https://nodejs.org/api/stream.html#stream_class_stream_transform +// [2]: https://nodejs.org/api/stream.html#stream_new_stream_readable_options +var assert = require('assert'); + +var Transform = require('stream').Transform; + +exports.Serializer = Serializer; +exports.Deserializer = Deserializer; + +var logData = Boolean(process.env.HTTP2_LOG_DATA); + +var MAX_PAYLOAD_SIZE = 16384; +var WINDOW_UPDATE_PAYLOAD_SIZE = 4; + +// Serializer +// ---------- +// +// Frame Objects +// * * * * * * * --+--------------------------- +// | | +// v v Buffers +// [] -----> Payload Ser. --[buffers]--> Header Ser. --> * * * * +// empty adds payload adds header +// array buffers buffer + +function Serializer(log) { + this._log = log.child({ component: 'serializer' }); + Transform.call(this, { objectMode: true }); +} +Serializer.prototype = Object.create(Transform.prototype, { constructor: { value: Serializer } }); + +// When there's an incoming frame object, it first generates the frame type specific part of the +// frame (payload), and then then adds the header part which holds fields that are common to all +// frame types (like the length of the payload). +Serializer.prototype._transform = function _transform(frame, encoding, done) { + this._log.trace({ frame: frame }, 'Outgoing frame'); + + assert(frame.type in Serializer, 'Unknown frame type: ' + frame.type); + + var buffers = []; + Serializer[frame.type](frame, buffers); + var length = Serializer.commonHeader(frame, buffers); + + assert(length <= MAX_PAYLOAD_SIZE, 'Frame too large!'); + + for (var i = 0; i < buffers.length; i++) { + if (logData) { + this._log.trace({ data: buffers[i] }, 'Outgoing data'); + } + this.push(buffers[i]); + } + + done(); +}; + +// Deserializer +// ------------ +// +// Buffers +// * * * * --------+------------------------- +// | | +// v v Frame Objects +// {} -----> Header Des. --{frame}--> Payload Des. --> * * * * * * * +// empty adds parsed adds parsed +// object header properties payload properties + +function Deserializer(log, role) { + this._role = role; + this._log = log.child({ component: 'deserializer' }); + Transform.call(this, { objectMode: true }); + this._next(COMMON_HEADER_SIZE); +} +Deserializer.prototype = Object.create(Transform.prototype, { constructor: { value: Deserializer } }); + +// The Deserializer is stateful, and it's two main alternating states are: *waiting for header* and +// *waiting for payload*. The state is stored in the boolean property `_waitingForHeader`. +// +// When entering a new state, a `_buffer` is created that will hold the accumulated data (header or +// payload). The `_cursor` is used to track the progress. +Deserializer.prototype._next = function(size) { + this._cursor = 0; + this._buffer = new Buffer(size); + this._waitingForHeader = !this._waitingForHeader; + if (this._waitingForHeader) { + this._frame = {}; + } +}; + +// Parsing an incoming buffer is an iterative process because it can hold multiple frames if it's +// large enough. A `cursor` is used to track the progress in parsing the incoming `chunk`. +Deserializer.prototype._transform = function _transform(chunk, encoding, done) { + var cursor = 0; + + if (logData) { + this._log.trace({ data: chunk }, 'Incoming data'); + } + + while(cursor < chunk.length) { + // The content of an incoming buffer is first copied to `_buffer`. If it can't hold the full + // chunk, then only a part of it is copied. + var toCopy = Math.min(chunk.length - cursor, this._buffer.length - this._cursor); + chunk.copy(this._buffer, this._cursor, cursor, cursor + toCopy); + this._cursor += toCopy; + cursor += toCopy; + + // When `_buffer` is full, it's content gets parsed either as header or payload depending on + // the actual state. + + // If it's header then the parsed data is stored in a temporary variable and then the + // deserializer waits for the specified length payload. + if ((this._cursor === this._buffer.length) && this._waitingForHeader) { + var payloadSize = Deserializer.commonHeader(this._buffer, this._frame); + if (payloadSize <= MAX_PAYLOAD_SIZE) { + this._next(payloadSize); + } else { + this.emit('error', 'FRAME_SIZE_ERROR'); + return; + } + } + + // If it's payload then the the frame object is finalized and then gets pushed out. + // Unknown frame types are ignored. + // + // Note: If we just finished the parsing of a header and the payload length is 0, this branch + // will also run. + if ((this._cursor === this._buffer.length) && !this._waitingForHeader) { + if (this._frame.type) { + var error = Deserializer[this._frame.type](this._buffer, this._frame, this._role); + if (error) { + this._log.error('Incoming frame parsing error: ' + error); + this.emit('error', error); + } else { + this._log.trace({ frame: this._frame }, 'Incoming frame'); + this.push(this._frame); + } + } else { + this._log.error('Unknown type incoming frame'); + // Ignore it other than logging + } + this._next(COMMON_HEADER_SIZE); + } + } + + done(); +}; + +// [Frame Header](https://tools.ietf.org/html/rfc7540#section-4.1) +// -------------------------------------------------------------- +// +// HTTP/2 frames share a common base format consisting of a 9-byte header followed by 0 to 2^24 - 1 +// bytes of data. +// +// Additional size limits can be set by specific application uses. HTTP limits the frame size to +// 16,384 octets by default, though this can be increased by a receiver. +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Length (24) | +// +---------------+---------------+---------------+ +// | Type (8) | Flags (8) | +// +-+-----------------------------+---------------+---------------+ +// |R| Stream Identifier (31) | +// +-+-------------------------------------------------------------+ +// | Frame Data (0...) ... +// +---------------------------------------------------------------+ +// +// The fields of the frame header are defined as: +// +// * Length: +// The length of the frame data expressed as an unsigned 24-bit integer. The 9 bytes of the frame +// header are not included in this value. +// +// * Type: +// The 8-bit type of the frame. The frame type determines how the remainder of the frame header +// and data are interpreted. Implementations MUST ignore unsupported and unrecognized frame types. +// +// * Flags: +// An 8-bit field reserved for frame-type specific boolean flags. +// +// Flags are assigned semantics specific to the indicated frame type. Flags that have no defined +// semantics for a particular frame type MUST be ignored, and MUST be left unset (0) when sending. +// +// * R: +// A reserved 1-bit field. The semantics of this bit are undefined and the bit MUST remain unset +// (0) when sending and MUST be ignored when receiving. +// +// * Stream Identifier: +// A 31-bit stream identifier. The value 0 is reserved for frames that are associated with the +// connection as a whole as opposed to an individual stream. +// +// The structure and content of the remaining frame data is dependent entirely on the frame type. + +var COMMON_HEADER_SIZE = 9; + +var frameTypes = []; + +var frameFlags = {}; + +var genericAttributes = ['type', 'flags', 'stream']; + +var typeSpecificAttributes = {}; + +Serializer.commonHeader = function writeCommonHeader(frame, buffers) { + var headerBuffer = new Buffer(COMMON_HEADER_SIZE); + + var size = 0; + for (var i = 0; i < buffers.length; i++) { + size += buffers[i].length; + } + headerBuffer.writeUInt8(0, 0); + headerBuffer.writeUInt16BE(size, 1); + + var typeId = frameTypes.indexOf(frame.type); // If we are here then the type is valid for sure + headerBuffer.writeUInt8(typeId, 3); + + var flagByte = 0; + for (var flag in frame.flags) { + var position = frameFlags[frame.type].indexOf(flag); + assert(position !== -1, 'Unknown flag for frame type ' + frame.type + ': ' + flag); + if (frame.flags[flag]) { + flagByte |= (1 << position); + } + } + headerBuffer.writeUInt8(flagByte, 4); + + assert((0 <= frame.stream) && (frame.stream < 0x7fffffff), frame.stream); + headerBuffer.writeUInt32BE(frame.stream || 0, 5); + + buffers.unshift(headerBuffer); + + return size; +}; + +Deserializer.commonHeader = function readCommonHeader(buffer, frame) { + if (buffer.length < 9) { + return 'FRAME_SIZE_ERROR'; + } + + var totallyWastedByte = buffer.readUInt8(0); + var length = buffer.readUInt16BE(1); + // We do this just for sanity checking later on, to make sure no one sent us a + // frame that's super large. + length += totallyWastedByte << 16; + + frame.type = frameTypes[buffer.readUInt8(3)]; + if (!frame.type) { + // We are required to ignore unknown frame types + return length; + } + + frame.flags = {}; + var flagByte = buffer.readUInt8(4); + var definedFlags = frameFlags[frame.type]; + for (var i = 0; i < definedFlags.length; i++) { + frame.flags[definedFlags[i]] = Boolean(flagByte & (1 << i)); + } + + frame.stream = buffer.readUInt32BE(5) & 0x7fffffff; + + return length; +}; + +// Frame types +// =========== + +// Every frame type is registered in the following places: +// +// * `frameTypes`: a register of frame type codes (used by `commonHeader()`) +// * `frameFlags`: a register of valid flags for frame types (used by `commonHeader()`) +// * `typeSpecificAttributes`: a register of frame specific frame object attributes (used by +// logging code and also serves as documentation for frame objects) + +// [DATA Frames](https://tools.ietf.org/html/rfc7540#section-6.1) +// ------------------------------------------------------------ +// +// DATA frames (type=0x0) convey arbitrary, variable-length sequences of octets associated with a +// stream. +// +// The DATA frame defines the following flags: +// +// * END_STREAM (0x1): +// Bit 1 being set indicates that this frame is the last that the endpoint will send for the +// identified stream. +// * PADDED (0x08): +// Bit 4 being set indicates that the Pad Length field is present. + +frameTypes[0x0] = 'DATA'; + +frameFlags.DATA = ['END_STREAM', 'RESERVED2', 'RESERVED4', 'PADDED']; + +typeSpecificAttributes.DATA = ['data']; + +Serializer.DATA = function writeData(frame, buffers) { + buffers.push(frame.data); +}; + +Deserializer.DATA = function readData(buffer, frame) { + var dataOffset = 0; + var paddingLength = 0; + if (frame.flags.PADDED) { + if (buffer.length < 1) { + // We must have at least one byte for padding control, but we don't. Bad peer! + return 'FRAME_SIZE_ERROR'; + } + paddingLength = (buffer.readUInt8(dataOffset) & 0xff); + dataOffset = 1; + } + + if (paddingLength) { + if (paddingLength >= (buffer.length - 1)) { + // We don't have enough room for the padding advertised - bad peer! + return 'FRAME_SIZE_ERROR'; + } + frame.data = buffer.slice(dataOffset, -1 * paddingLength); + } else { + frame.data = buffer.slice(dataOffset); + } +}; + +// [HEADERS](https://tools.ietf.org/html/rfc7540#section-6.2) +// -------------------------------------------------------------- +// +// The HEADERS frame (type=0x1) allows the sender to create a stream. +// +// The HEADERS frame defines the following flags: +// +// * END_STREAM (0x1): +// Bit 1 being set indicates that this frame is the last that the endpoint will send for the +// identified stream. +// * END_HEADERS (0x4): +// The END_HEADERS bit indicates that this frame contains the entire payload necessary to provide +// a complete set of headers. +// * PADDED (0x08): +// Bit 4 being set indicates that the Pad Length field is present. +// * PRIORITY (0x20): +// Bit 6 being set indicates that the Exlusive Flag (E), Stream Dependency, and Weight fields are +// present. + +frameTypes[0x1] = 'HEADERS'; + +frameFlags.HEADERS = ['END_STREAM', 'RESERVED2', 'END_HEADERS', 'PADDED', 'RESERVED5', 'PRIORITY']; + +typeSpecificAttributes.HEADERS = ['priorityDependency', 'priorityWeight', 'exclusiveDependency', 'headers', 'data']; + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |Pad Length? (8)| +// +-+-------------+---------------+-------------------------------+ +// |E| Stream Dependency? (31) | +// +-+-------------+-----------------------------------------------+ +// | Weight? (8) | +// +-+-------------+-----------------------------------------------+ +// | Header Block Fragment (*) ... +// +---------------------------------------------------------------+ +// | Padding (*) ... +// +---------------------------------------------------------------+ +// +// The payload of a HEADERS frame contains a Headers Block + +Serializer.HEADERS = function writeHeadersPriority(frame, buffers) { + if (frame.flags.PRIORITY) { + var buffer = new Buffer(5); + assert((0 <= frame.priorityDependency) && (frame.priorityDependency <= 0x7fffffff), frame.priorityDependency); + buffer.writeUInt32BE(frame.priorityDependency, 0); + if (frame.exclusiveDependency) { + buffer[0] |= 0x80; + } + assert((0 <= frame.priorityWeight) && (frame.priorityWeight <= 0xff), frame.priorityWeight); + buffer.writeUInt8(frame.priorityWeight, 4); + buffers.push(buffer); + } + buffers.push(frame.data); +}; + +Deserializer.HEADERS = function readHeadersPriority(buffer, frame) { + var minFrameLength = 0; + if (frame.flags.PADDED) { + minFrameLength += 1; + } + if (frame.flags.PRIORITY) { + minFrameLength += 5; + } + if (buffer.length < minFrameLength) { + // Peer didn't send enough data - bad peer! + return 'FRAME_SIZE_ERROR'; + } + + var dataOffset = 0; + var paddingLength = 0; + if (frame.flags.PADDED) { + paddingLength = (buffer.readUInt8(dataOffset) & 0xff); + dataOffset = 1; + } + + if (frame.flags.PRIORITY) { + var dependencyData = new Buffer(4); + buffer.copy(dependencyData, 0, dataOffset, dataOffset + 4); + dataOffset += 4; + frame.exclusiveDependency = !!(dependencyData[0] & 0x80); + dependencyData[0] &= 0x7f; + frame.priorityDependency = dependencyData.readUInt32BE(0); + frame.priorityWeight = buffer.readUInt8(dataOffset); + dataOffset += 1; + } + + if (paddingLength) { + if ((buffer.length - dataOffset) < paddingLength) { + // Not enough data left to satisfy the advertised padding - bad peer! + return 'FRAME_SIZE_ERROR'; + } + frame.data = buffer.slice(dataOffset, -1 * paddingLength); + } else { + frame.data = buffer.slice(dataOffset); + } +}; + +// [PRIORITY](https://tools.ietf.org/html/rfc7540#section-6.3) +// ------------------------------------------------------- +// +// The PRIORITY frame (type=0x2) specifies the sender-advised priority of a stream. +// +// The PRIORITY frame does not define any flags. + +frameTypes[0x2] = 'PRIORITY'; + +frameFlags.PRIORITY = []; + +typeSpecificAttributes.PRIORITY = ['priorityDependency', 'priorityWeight', 'exclusiveDependency']; + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |E| Stream Dependency? (31) | +// +-+-------------+-----------------------------------------------+ +// | Weight? (8) | +// +-+-------------+ +// +// The payload of a PRIORITY frame contains an exclusive bit, a 31-bit dependency, and an 8-bit weight + +Serializer.PRIORITY = function writePriority(frame, buffers) { + var buffer = new Buffer(5); + assert((0 <= frame.priorityDependency) && (frame.priorityDependency <= 0x7fffffff), frame.priorityDependency); + buffer.writeUInt32BE(frame.priorityDependency, 0); + if (frame.exclusiveDependency) { + buffer[0] |= 0x80; + } + assert((0 <= frame.priorityWeight) && (frame.priorityWeight <= 0xff), frame.priorityWeight); + buffer.writeUInt8(frame.priorityWeight, 4); + + buffers.push(buffer); +}; + +Deserializer.PRIORITY = function readPriority(buffer, frame) { + if (buffer.length < 5) { + // PRIORITY frames are 5 bytes long. Bad peer! + return 'FRAME_SIZE_ERROR'; + } + var dependencyData = new Buffer(4); + buffer.copy(dependencyData, 0, 0, 4); + frame.exclusiveDependency = !!(dependencyData[0] & 0x80); + dependencyData[0] &= 0x7f; + frame.priorityDependency = dependencyData.readUInt32BE(0); + frame.priorityWeight = buffer.readUInt8(4); +}; + +// [RST_STREAM](https://tools.ietf.org/html/rfc7540#section-6.4) +// ----------------------------------------------------------- +// +// The RST_STREAM frame (type=0x3) allows for abnormal termination of a stream. +// +// No type-flags are defined. + +frameTypes[0x3] = 'RST_STREAM'; + +frameFlags.RST_STREAM = []; + +typeSpecificAttributes.RST_STREAM = ['error']; + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Error Code (32) | +// +---------------------------------------------------------------+ +// +// The RST_STREAM frame contains a single unsigned, 32-bit integer identifying the error +// code (see Error Codes). The error code indicates why the stream is being terminated. + +Serializer.RST_STREAM = function writeRstStream(frame, buffers) { + var buffer = new Buffer(4); + var code = errorCodes.indexOf(frame.error); + assert((0 <= code) && (code <= 0xffffffff), code); + buffer.writeUInt32BE(code, 0); + buffers.push(buffer); +}; + +Deserializer.RST_STREAM = function readRstStream(buffer, frame) { + if (buffer.length < 4) { + // RST_STREAM is 4 bytes long. Bad peer! + return 'FRAME_SIZE_ERROR'; + } + frame.error = errorCodes[buffer.readUInt32BE(0)]; + if (!frame.error) { + // Unknown error codes are considered equivalent to INTERNAL_ERROR + frame.error = 'INTERNAL_ERROR'; + } +}; + +// [SETTINGS](https://tools.ietf.org/html/rfc7540#section-6.5) +// ------------------------------------------------------- +// +// The SETTINGS frame (type=0x4) conveys configuration parameters that affect how endpoints +// communicate. +// +// The SETTINGS frame defines the following flag: + +// * ACK (0x1): +// Bit 1 being set indicates that this frame acknowledges receipt and application of the peer's +// SETTINGS frame. +frameTypes[0x4] = 'SETTINGS'; + +frameFlags.SETTINGS = ['ACK']; + +typeSpecificAttributes.SETTINGS = ['settings']; + +// The payload of a SETTINGS frame consists of zero or more settings. Each setting consists of a +// 16-bit identifier, and an unsigned 32-bit value. +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Identifier(16) | Value (32) | +// +-----------------+---------------------------------------------+ +// ...Value | +// +---------------------------------+ +// +// Each setting in a SETTINGS frame replaces the existing value for that setting. Settings are +// processed in the order in which they appear, and a receiver of a SETTINGS frame does not need to +// maintain any state other than the current value of settings. Therefore, the value of a setting +// is the last value that is seen by a receiver. This permits the inclusion of the same settings +// multiple times in the same SETTINGS frame, though doing so does nothing other than waste +// connection capacity. + +Serializer.SETTINGS = function writeSettings(frame, buffers) { + var settings = [], settingsLeft = Object.keys(frame.settings); + definedSettings.forEach(function(setting, id) { + if (setting.name in frame.settings) { + settingsLeft.splice(settingsLeft.indexOf(setting.name), 1); + var value = frame.settings[setting.name]; + settings.push({ id: id, value: setting.flag ? Boolean(value) : value }); + } + }); + assert(settingsLeft.length === 0, 'Unknown settings: ' + settingsLeft.join(', ')); + + var buffer = new Buffer(settings.length * 6); + for (var i = 0; i < settings.length; i++) { + buffer.writeUInt16BE(settings[i].id & 0xffff, i*6); + buffer.writeUInt32BE(settings[i].value, i*6 + 2); + } + + buffers.push(buffer); +}; + +Deserializer.SETTINGS = function readSettings(buffer, frame, role) { + frame.settings = {}; + + // Receipt of a SETTINGS frame with the ACK flag set and a length + // field value other than 0 MUST be treated as a connection error + // (Section 5.4.1) of type FRAME_SIZE_ERROR. + if(frame.flags.ACK && buffer.length != 0) { + return 'FRAME_SIZE_ERROR'; + } + + if (buffer.length % 6 !== 0) { + return 'PROTOCOL_ERROR'; + } + for (var i = 0; i < buffer.length / 6; i++) { + var id = buffer.readUInt16BE(i*6) & 0xffff; + var setting = definedSettings[id]; + if (setting) { + if (role == 'CLIENT' && setting.name == 'SETTINGS_ENABLE_PUSH') { + return 'SETTINGS frame on client got SETTINGS_ENABLE_PUSH'; + } + var value = buffer.readUInt32BE(i*6 + 2); + frame.settings[setting.name] = setting.flag ? Boolean(value & 0x1) : value; + } + } +}; + +// The following settings are defined: +var definedSettings = []; + +// * SETTINGS_HEADER_TABLE_SIZE (1): +// Allows the sender to inform the remote endpoint of the size of the header compression table +// used to decode header blocks. +definedSettings[1] = { name: 'SETTINGS_HEADER_TABLE_SIZE', flag: false }; + +// * SETTINGS_ENABLE_PUSH (2): +// This setting can be use to disable server push. An endpoint MUST NOT send a PUSH_PROMISE frame +// if it receives this setting set to a value of 0. The default value is 1, which indicates that +// push is permitted. +definedSettings[2] = { name: 'SETTINGS_ENABLE_PUSH', flag: true }; + +// * SETTINGS_MAX_CONCURRENT_STREAMS (3): +// indicates the maximum number of concurrent streams that the sender will allow. +definedSettings[3] = { name: 'SETTINGS_MAX_CONCURRENT_STREAMS', flag: false }; + +// * SETTINGS_INITIAL_WINDOW_SIZE (4): +// indicates the sender's initial stream window size (in bytes) for new streams. +definedSettings[4] = { name: 'SETTINGS_INITIAL_WINDOW_SIZE', flag: false }; + +// * SETTINGS_MAX_FRAME_SIZE (5): +// indicates the maximum size of a frame the receiver will allow. +definedSettings[5] = { name: 'SETTINGS_MAX_FRAME_SIZE', flag: false }; + +// [PUSH_PROMISE](https://tools.ietf.org/html/rfc7540#section-6.6) +// --------------------------------------------------------------- +// +// The PUSH_PROMISE frame (type=0x5) is used to notify the peer endpoint in advance of streams the +// sender intends to initiate. +// +// The PUSH_PROMISE frame defines the following flags: +// +// * END_PUSH_PROMISE (0x4): +// The END_PUSH_PROMISE bit indicates that this frame contains the entire payload necessary to +// provide a complete set of headers. + +frameTypes[0x5] = 'PUSH_PROMISE'; + +frameFlags.PUSH_PROMISE = ['RESERVED1', 'RESERVED2', 'END_PUSH_PROMISE', 'PADDED']; + +typeSpecificAttributes.PUSH_PROMISE = ['promised_stream', 'headers', 'data']; + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |Pad Length? (8)| +// +-+-------------+-----------------------------------------------+ +// |X| Promised-Stream-ID (31) | +// +-+-------------------------------------------------------------+ +// | Header Block Fragment (*) ... +// +---------------------------------------------------------------+ +// | Padding (*) ... +// +---------------------------------------------------------------+ +// +// The PUSH_PROMISE frame includes the unsigned 31-bit identifier of +// the stream the endpoint plans to create along with a minimal set of headers that provide +// additional context for the stream. + +Serializer.PUSH_PROMISE = function writePushPromise(frame, buffers) { + var buffer = new Buffer(4); + + var promised_stream = frame.promised_stream; + assert((0 <= promised_stream) && (promised_stream <= 0x7fffffff), promised_stream); + buffer.writeUInt32BE(promised_stream, 0); + + buffers.push(buffer); + buffers.push(frame.data); +}; + +Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame) { + if (buffer.length < 4) { + return 'FRAME_SIZE_ERROR'; + } + var dataOffset = 0; + var paddingLength = 0; + if (frame.flags.PADDED) { + if (buffer.length < 5) { + return 'FRAME_SIZE_ERROR'; + } + paddingLength = (buffer.readUInt8(dataOffset) & 0xff); + dataOffset = 1; + } + frame.promised_stream = buffer.readUInt32BE(dataOffset) & 0x7fffffff; + dataOffset += 4; + if (paddingLength) { + if ((buffer.length - dataOffset) < paddingLength) { + return 'FRAME_SIZE_ERROR'; + } + frame.data = buffer.slice(dataOffset, -1 * paddingLength); + } else { + frame.data = buffer.slice(dataOffset); + } +}; + +// [PING](https://tools.ietf.org/html/rfc7540#section-6.7) +// ----------------------------------------------- +// +// The PING frame (type=0x6) is a mechanism for measuring a minimal round-trip time from the +// sender, as well as determining whether an idle connection is still functional. +// +// The PING frame defines one type-specific flag: +// +// * ACK (0x1): +// Bit 1 being set indicates that this PING frame is a PING response. + +frameTypes[0x6] = 'PING'; + +frameFlags.PING = ['ACK']; + +typeSpecificAttributes.PING = ['data']; + +// In addition to the frame header, PING frames MUST contain 8 additional octets of opaque data. + +Serializer.PING = function writePing(frame, buffers) { + buffers.push(frame.data); +}; + +Deserializer.PING = function readPing(buffer, frame) { + if (buffer.length !== 8) { + return 'FRAME_SIZE_ERROR'; + } + frame.data = buffer; +}; + +// [GOAWAY](https://tools.ietf.org/html/rfc7540#section-6.8) +// --------------------------------------------------- +// +// The GOAWAY frame (type=0x7) informs the remote peer to stop creating streams on this connection. +// +// The GOAWAY frame does not define any flags. + +frameTypes[0x7] = 'GOAWAY'; + +frameFlags.GOAWAY = []; + +typeSpecificAttributes.GOAWAY = ['last_stream', 'error']; + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |X| Last-Stream-ID (31) | +// +-+-------------------------------------------------------------+ +// | Error Code (32) | +// +---------------------------------------------------------------+ +// +// The last stream identifier in the GOAWAY frame contains the highest numbered stream identifier +// for which the sender of the GOAWAY frame has received frames on and might have taken some action +// on. +// +// The GOAWAY frame also contains a 32-bit error code (see Error Codes) that contains the reason for +// closing the connection. + +Serializer.GOAWAY = function writeGoaway(frame, buffers) { + var buffer = new Buffer(8); + + var last_stream = frame.last_stream; + assert((0 <= last_stream) && (last_stream <= 0x7fffffff), last_stream); + buffer.writeUInt32BE(last_stream, 0); + + var code = errorCodes.indexOf(frame.error); + assert((0 <= code) && (code <= 0xffffffff), code); + buffer.writeUInt32BE(code, 4); + + buffers.push(buffer); +}; + +Deserializer.GOAWAY = function readGoaway(buffer, frame) { + if (buffer.length !== 8) { + // GOAWAY must have 8 bytes + return 'FRAME_SIZE_ERROR'; + } + frame.last_stream = buffer.readUInt32BE(0) & 0x7fffffff; + frame.error = errorCodes[buffer.readUInt32BE(4)]; + if (!frame.error) { + // Unknown error types are to be considered equivalent to INTERNAL ERROR + frame.error = 'INTERNAL_ERROR'; + } +}; + +// [WINDOW_UPDATE](https://tools.ietf.org/html/rfc7540#section-6.9) +// ----------------------------------------------------------------- +// +// The WINDOW_UPDATE frame (type=0x8) is used to implement flow control. +// +// The WINDOW_UPDATE frame does not define any flags. + +frameTypes[0x8] = 'WINDOW_UPDATE'; + +frameFlags.WINDOW_UPDATE = []; + +typeSpecificAttributes.WINDOW_UPDATE = ['window_size']; + +// The payload of a WINDOW_UPDATE frame is a 32-bit value indicating the additional number of bytes +// that the sender can transmit in addition to the existing flow control window. The legal range +// for this field is 1 to 2^31 - 1 (0x7fffffff) bytes; the most significant bit of this value is +// reserved. + +Serializer.WINDOW_UPDATE = function writeWindowUpdate(frame, buffers) { + var buffer = new Buffer(4); + + var window_size = frame.window_size; + assert((0 < window_size) && (window_size <= 0x7fffffff), window_size); + buffer.writeUInt32BE(window_size, 0); + + buffers.push(buffer); +}; + +Deserializer.WINDOW_UPDATE = function readWindowUpdate(buffer, frame) { + if (buffer.length !== WINDOW_UPDATE_PAYLOAD_SIZE) { + return 'FRAME_SIZE_ERROR'; + } + frame.window_size = buffer.readUInt32BE(0) & 0x7fffffff; + if (frame.window_size === 0) { + return 'PROTOCOL_ERROR'; + } +}; + +// [CONTINUATION](https://tools.ietf.org/html/rfc7540#section-6.10) +// ------------------------------------------------------------ +// +// The CONTINUATION frame (type=0x9) is used to continue a sequence of header block fragments. +// +// The CONTINUATION frame defines the following flag: +// +// * END_HEADERS (0x4): +// The END_HEADERS bit indicates that this frame ends the sequence of header block fragments +// necessary to provide a complete set of headers. + +frameTypes[0x9] = 'CONTINUATION'; + +frameFlags.CONTINUATION = ['RESERVED1', 'RESERVED2', 'END_HEADERS']; + +typeSpecificAttributes.CONTINUATION = ['headers', 'data']; + +Serializer.CONTINUATION = function writeContinuation(frame, buffers) { + buffers.push(frame.data); +}; + +Deserializer.CONTINUATION = function readContinuation(buffer, frame) { + frame.data = buffer; +}; + +// [ALTSVC](https://tools.ietf.org/html/rfc7838#section-4) +// ------------------------------------------------------------ +// +// The ALTSVC frame (type=0xA) advertises the availability of an alternative service to the client. +// +// The ALTSVC frame does not define any flags. + +frameTypes[0xA] = 'ALTSVC'; + +frameFlags.ALTSVC = []; + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Origin-Len (16) | Origin? (*) ... +// +-------------------------------+----------------+--------------+ +// | Alt-Svc-Field-Value (*) ... +// +---------------------------------------------------------------+ +// +// The ALTSVC frame contains the following fields: +// +// Origin-Len: An unsigned, 16-bit integer indicating the length, in +// octets, of the Origin field. +// +// Origin: An OPTIONAL sequence of characters containing ASCII +// serialisation of an origin ([RFC6454](https://tools.ietf.org/html/rfc6454), +// Section 6.2) that the alternate service is applicable to. +// +// Alt-Svc-Field-Value: A sequence of octets (length determined by +// subtracting the length of all preceding fields from the frame +// length) containing a value identical to the Alt-Svc field value +// defined in (Section 3)[https://tools.ietf.org/html/rfc7838#section-3] +// (ABNF production "Alt-Svc"). + +typeSpecificAttributes.ALTSVC = ['maxAge', 'port', 'protocolID', 'host', + 'origin']; + +function istchar(c) { + return ('!#$&\'*+-.^_`|~1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.indexOf(c) > -1); +} + +function hexencode(s) { + var t = ''; + for (var i = 0; i < s.length; i++) { + if (!istchar(s[i])) { + t += '%'; + t += new Buffer(s[i]).toString('hex'); + } else { + t += s[i]; + } + } + return t; +} + +Serializer.ALTSVC = function writeAltSvc(frame, buffers) { + var buffer = new Buffer(2); + buffer.writeUInt16BE(frame.origin.length, 0); + buffers.push(buffer); + buffers.push(new Buffer(frame.origin, 'ascii')); + + var fieldValue = hexencode(frame.protocolID) + '="' + frame.host + ':' + frame.port + '"'; + if (frame.maxAge !== 86400) { // 86400 is the default + fieldValue += "; ma=" + frame.maxAge; + } + + buffers.push(new Buffer(fieldValue, 'ascii')); +}; + +function stripquotes(s) { + var start = 0; + var end = s.length; + while ((start < end) && (s[start] === '"')) { + start++; + } + while ((end > start) && (s[end - 1] === '"')) { + end--; + } + if (start >= end) { + return ""; + } + return s.substring(start, end); +} + +function splitNameValue(nvpair) { + var eq = -1; + var inQuotes = false; + + for (var i = 0; i < nvpair.length; i++) { + if (nvpair[i] === '"') { + inQuotes = !inQuotes; + continue; + } + if (inQuotes) { + continue; + } + if (nvpair[i] === '=') { + eq = i; + break; + } + } + + if (eq === -1) { + return {'name': nvpair, 'value': null}; + } + + var name = stripquotes(nvpair.substring(0, eq).trim()); + var value = stripquotes(nvpair.substring(eq + 1).trim()); + return {'name': name, 'value': value}; +} + +function splitHeaderParameters(hv) { + return parseHeaderValue(hv, ';', splitNameValue); +} + +function parseHeaderValue(hv, separator, callback) { + var start = 0; + var inQuotes = false; + var values = []; + + for (var i = 0; i < hv.length; i++) { + if (hv[i] === '"') { + inQuotes = !inQuotes; + continue; + } + if (inQuotes) { + // Just skip this + continue; + } + if (hv[i] === separator) { + var newValue = hv.substring(start, i).trim(); + if (newValue.length > 0) { + newValue = callback(newValue); + values.push(newValue); + } + start = i + 1; + } + } + + var newValue = hv.substring(start).trim(); + if (newValue.length > 0) { + newValue = callback(newValue); + values.push(newValue); + } + + return values; +} + +function rsplit(s, delim, count) { + var nsplits = 0; + var end = s.length; + var rval = []; + for (var i = s.length - 1; i >= 0; i--) { + if (s[i] === delim) { + var t = s.substring(i + 1, end); + end = i; + rval.unshift(t); + nsplits++; + if (nsplits === count) { + break; + } + } + } + if (end !== 0) { + rval.unshift(s.substring(0, end)); + } + return rval; +} + +function ishex(c) { + return ('0123456789ABCDEFabcdef'.indexOf(c) > -1); +} + +function unescape(s) { + var i = 0; + var t = ''; + while (i < s.length) { + if (s[i] != '%' || !ishex(s[i + 1]) || !ishex(s[i + 2])) { + t += s[i]; + } else { + ++i; + var hexvalue = ''; + if (i < s.length) { + hexvalue += s[i]; + ++i; + } + if (i < s.length) { + hexvalue += s[i]; + } + if (hexvalue.length > 0) { + t += new Buffer(hexvalue, 'hex').toString(); + } else { + t += '%'; + } + } + + ++i; + } + return t; +} + +Deserializer.ALTSVC = function readAltSvc(buffer, frame) { + if (buffer.length < 2) { + return 'FRAME_SIZE_ERROR'; + } + var originLength = buffer.readUInt16BE(0); + if ((buffer.length - 2) < originLength) { + return 'FRAME_SIZE_ERROR'; + } + frame.origin = buffer.toString('ascii', 2, 2 + originLength); + var fieldValue = buffer.toString('ascii', 2 + originLength); + var values = parseHeaderValue(fieldValue, ',', splitHeaderParameters); + if (values.length > 1) { + // TODO - warn that we only use one here + } + if (values.length === 0) { + // Well that's a malformed frame. Just ignore it. + return; + } + + var chosenAltSvc = values[0]; + frame.maxAge = 86400; // Default + for (var i = 0; i < chosenAltSvc.length; i++) { + if (i === 0) { + // This corresponds to the protocolID="<host>:<port>" item + frame.protocolID = unescape(chosenAltSvc[i].name); + var hostport = rsplit(chosenAltSvc[i].value, ':', 1); + frame.host = hostport[0]; + frame.port = parseInt(hostport[1], 10); + } else if (chosenAltSvc[i].name == 'ma') { + frame.maxAge = parseInt(chosenAltSvc[i].value, 10); + } + // Otherwise, we just ignore this + } +}; + +// BLOCKED +// ------------------------------------------------------------ +// +// The BLOCKED frame (type=0xB) indicates that the sender is unable to send data +// due to a closed flow control window. +// +// The BLOCKED frame does not define any flags and contains no payload. + +frameTypes[0xB] = 'BLOCKED'; + +frameFlags.BLOCKED = []; + +typeSpecificAttributes.BLOCKED = []; + +Serializer.BLOCKED = function writeBlocked(frame, buffers) { +}; + +Deserializer.BLOCKED = function readBlocked(buffer, frame) { +}; + +// [Error Codes](https://tools.ietf.org/html/rfc7540#section-7) +// ------------------------------------------------------------ + +var errorCodes = [ + 'NO_ERROR', + 'PROTOCOL_ERROR', + 'INTERNAL_ERROR', + 'FLOW_CONTROL_ERROR', + 'SETTINGS_TIMEOUT', + 'STREAM_CLOSED', + 'FRAME_SIZE_ERROR', + 'REFUSED_STREAM', + 'CANCEL', + 'COMPRESSION_ERROR', + 'CONNECT_ERROR', + 'ENHANCE_YOUR_CALM', + 'INADEQUATE_SECURITY', + 'HTTP_1_1_REQUIRED' +]; + +// Logging +// ------- + +// [Bunyan serializers](https://github.com/trentm/node-bunyan#serializers) to improve logging output +// for debug messages emitted in this component. +exports.serializers = {}; + +// * `frame` serializer: it transforms data attributes from Buffers to hex strings and filters out +// flags that are not present. +var frameCounter = 0; +exports.serializers.frame = function(frame) { + if (!frame) { + return null; + } + + if ('id' in frame) { + return frame.id; + } + + frame.id = frameCounter; + frameCounter += 1; + + var logEntry = { id: frame.id }; + genericAttributes.concat(typeSpecificAttributes[frame.type]).forEach(function(name) { + logEntry[name] = frame[name]; + }); + + if (frame.data instanceof Buffer) { + if (logEntry.data.length > 50) { + logEntry.data = frame.data.slice(0, 47).toString('hex') + '...'; + } else { + logEntry.data = frame.data.toString('hex'); + } + + if (!('length' in logEntry)) { + logEntry.length = frame.data.length; + } + } + + if (frame.promised_stream instanceof Object) { + logEntry.promised_stream = 'stream-' + frame.promised_stream.id; + } + + logEntry.flags = Object.keys(frame.flags || {}).filter(function(name) { + return frame.flags[name] === true; + }); + + return logEntry; +}; + +// * `data` serializer: it simply transforms a buffer to a hex string. +exports.serializers.data = function(data) { + return data.toString('hex'); +}; diff --git a/testing/xpcshell/node-http2/lib/protocol/index.js b/testing/xpcshell/node-http2/lib/protocol/index.js new file mode 100644 index 000000000..0f3720e2c --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/index.js @@ -0,0 +1,91 @@ +// This is an implementation of the [HTTP/2][http2] +// framing layer for [node.js][node]. +// +// The main building blocks are [node.js streams][node-stream] that are connected through pipes. +// +// The main components are: +// +// * [Endpoint](endpoint.html): represents an HTTP/2 endpoint (client or server). It's +// responsible for the the first part of the handshake process (sending/receiving the +// [connection header][http2-connheader]) and manages other components (framer, compressor, +// connection, streams) that make up a client or server. +// +// * [Connection](connection.html): multiplexes the active HTTP/2 streams, manages connection +// lifecycle and settings, and responsible for enforcing the connection level limits (flow +// control, initiated stream limit) +// +// * [Stream](stream.html): implementation of the [HTTP/2 stream concept][http2-stream]. +// Implements the [stream state machine][http2-streamstate] defined by the standard, provides +// management methods and events for using the stream (sending/receiving headers, data, etc.), +// and enforces stream level constraints (flow control, sending only legal frames). +// +// * [Flow](flow.html): implements flow control for Connection and Stream as parent class. +// +// * [Compressor and Decompressor](compressor.html): compression and decompression of HEADER and +// PUSH_PROMISE frames +// +// * [Serializer and Deserializer](framer.html): the lowest layer in the stack that transforms +// between the binary and the JavaScript object representation of HTTP/2 frames +// +// [http2]: https://tools.ietf.org/html/rfc7540 +// [http2-connheader]: https://tools.ietf.org/html/rfc7540#section-3.5 +// [http2-stream]: https://tools.ietf.org/html/rfc7540#section-5 +// [http2-streamstate]: https://tools.ietf.org/html/rfc7540#section-5.1 +// [node]: https://nodejs.org/ +// [node-stream]: https://nodejs.org/api/stream.html +// [node-https]: https://nodejs.org/api/https.html +// [node-http]: https://nodejs.org/api/http.html + +exports.VERSION = 'h2'; + +exports.Endpoint = require('./endpoint').Endpoint; + +/* Bunyan serializers exported by submodules that are worth adding when creating a logger. */ +exports.serializers = {}; +var modules = ['./framer', './compressor', './flow', './connection', './stream', './endpoint']; +modules.map(require).forEach(function(module) { + for (var name in module.serializers) { + exports.serializers[name] = module.serializers[name]; + } +}); + +/* + Stream API Endpoint API + Stream data + + | ^ | ^ + | | | | + | | | | + +-----------|------------|---------------------------------------+ + | | | Endpoint | + | | | | + | +-------|------------|-----------------------------------+ | + | | | | Connection | | + | | v | | | + | | +-----------------------+ +-------------------- | | + | | | Stream | | Stream ... | | + | | +-----------------------+ +-------------------- | | + | | | ^ | ^ | | + | | v | v | | | + | | +------------+--+--------+--+------------+- ... | | + | | | ^ | | + | | | | | | + | +-----------------------|--------|-----------------------+ | + | | | | + | v | | + | +--------------------------+ +--------------------------+ | + | | Compressor | | Decompressor | | + | +--------------------------+ +--------------------------+ | + | | ^ | + | v | | + | +--------------------------+ +--------------------------+ | + | | Serializer | | Deserializer | | + | +--------------------------+ +--------------------------+ | + | | ^ | + +---------------------------|--------|---------------------------+ + | | + v | + + Raw data + +*/ diff --git a/testing/xpcshell/node-http2/lib/protocol/stream.js b/testing/xpcshell/node-http2/lib/protocol/stream.js new file mode 100644 index 000000000..6d520b949 --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/stream.js @@ -0,0 +1,659 @@ +var assert = require('assert'); + +// The Stream class +// ================ + +// Stream is a [Duplex stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex) +// subclass that implements the [HTTP/2 Stream](https://tools.ietf.org/html/rfc7540#section-5) +// concept. It has two 'sides': one that is used by the user to send/receive data (the `stream` +// object itself) and one that is used by a Connection to read/write frames to/from the other peer +// (`stream.upstream`). + +var Duplex = require('stream').Duplex; + +exports.Stream = Stream; + +// Public API +// ---------- + +// * **new Stream(log, connection)**: create a new Stream +// +// * **Event: 'headers' (headers)**: signals incoming headers +// +// * **Event: 'promise' (stream, headers)**: signals an incoming push promise +// +// * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0 +// (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. +// +// * **Event: 'error' (type)**: signals an error +// +// * **headers(headers)**: send headers +// +// * **promise(headers): Stream**: promise a stream +// +// * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer +// too, but once it is set locally, it can not be changed remotely. +// +// * **reset(error)**: reset the stream with an error code +// +// * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames +// that are to be sent/arrived to/from the peer and are related to this stream. +// +// Headers are always in the [regular node.js header format][1]. +// [1]: https://nodejs.org/api/http.html#http_message_headers + +// Constructor +// ----------- + +// The main aspects of managing the stream are: +function Stream(log, connection) { + Duplex.call(this); + + // * logging + this._log = log.child({ component: 'stream', s: this }); + + // * receiving and sending stream management commands + this._initializeManagement(); + + // * sending and receiving frames to/from the upstream connection + this._initializeDataFlow(); + + // * maintaining the state of the stream (idle, open, closed, etc.) and error detection + this._initializeState(); + + this.connection = connection; +} + +Stream.prototype = Object.create(Duplex.prototype, { constructor: { value: Stream } }); + +// Managing the stream +// ------------------- + +// the default stream priority is 2^30 +var DEFAULT_PRIORITY = Math.pow(2, 30); +var MAX_PRIORITY = Math.pow(2, 31) - 1; + +// PUSH_PROMISE and HEADERS are forwarded to the user through events. +Stream.prototype._initializeManagement = function _initializeManagement() { + this._resetSent = false; + this._priority = DEFAULT_PRIORITY; + this._letPeerPrioritize = true; +}; + +Stream.prototype.promise = function promise(headers) { + var stream = new Stream(this._log, this.connection); + stream._priority = Math.min(this._priority + 1, MAX_PRIORITY); + this._pushUpstream({ + type: 'PUSH_PROMISE', + flags: {}, + stream: this.id, + promised_stream: stream, + headers: headers + }); + return stream; +}; + +Stream.prototype._onPromise = function _onPromise(frame) { + this.emit('promise', frame.promised_stream, frame.headers); +}; + +Stream.prototype.headers = function headers(headers) { + this._pushUpstream({ + type: 'HEADERS', + flags: {}, + stream: this.id, + headers: headers + }); +}; + +Stream.prototype._onHeaders = function _onHeaders(frame) { + if (frame.priority !== undefined) { + this.priority(frame.priority, true); + } + this.emit('headers', frame.headers); +}; + +Stream.prototype.priority = function priority(priority, peer) { + if ((peer && this._letPeerPrioritize) || !peer) { + if (!peer) { + this._letPeerPrioritize = false; + + var lastFrame = this.upstream.getLastQueuedFrame(); + if (lastFrame && ((lastFrame.type === 'HEADERS') || (lastFrame.type === 'PRIORITY'))) { + lastFrame.priority = priority; + } else { + this._pushUpstream({ + type: 'PRIORITY', + flags: {}, + stream: this.id, + priority: priority + }); + } + } + + this._log.debug({ priority: priority }, 'Changing priority'); + this.emit('priority', priority); + this._priority = priority; + } +}; + +Stream.prototype._onPriority = function _onPriority(frame) { + this.priority(frame.priority, true); +}; + +// Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for +// any stream. +Stream.prototype.reset = function reset(error) { + if (!this._resetSent) { + this._resetSent = true; + this._pushUpstream({ + type: 'RST_STREAM', + flags: {}, + stream: this.id, + error: error + }); + } +}; + +// Specify an alternate service for the origin of this stream +Stream.prototype.altsvc = function altsvc(host, port, protocolID, maxAge, origin) { + var stream; + if (origin) { + stream = 0; + } else { + stream = this.id; + } + this._pushUpstream({ + type: 'ALTSVC', + flags: {}, + stream: stream, + host: host, + port: port, + protocolID: protocolID, + origin: origin, + maxAge: maxAge + }); +}; + +// Data flow +// --------- + +// The incoming and the generated outgoing frames are received/transmitted on the `this.upstream` +// [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read +// and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by +// the user to write or read the body of the request. +// [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex + +// upstream side stream user side +// +// +------------------------------------+ +// | | +// +------------------+ | +// | upstream | | +// | | | +// +--+ | +--| +// read() | | _send() | _write() | | write(buf) +// <--------------|B |<--------------|--------------| B|<------------ +// | | | | | +// frames +--+ | +--| buffers +// | | | | | +// -------------->|B |---------------|------------->| B|------------> +// write(frame) | | _receive() | _read() | | read() +// +--+ | +--| +// | | | +// | | | +// +------------------+ | +// | | +// +------------------------------------+ +// +// B: input or output buffer + +var Flow = require('./flow').Flow; + +Stream.prototype._initializeDataFlow = function _initializeDataFlow() { + this.id = undefined; + + this._ended = false; + + this.upstream = new Flow(); + this.upstream._log = this._log; + this.upstream._send = this._send.bind(this); + this.upstream._receive = this._receive.bind(this); + this.upstream.write = this._writeUpstream.bind(this); + this.upstream.on('error', this.emit.bind(this, 'error')); + + this.on('finish', this._finishing); +}; + +Stream.prototype._pushUpstream = function _pushUpstream(frame) { + this.upstream.push(frame); + this._transition(true, frame); +}; + +// Overriding the upstream's `write` allows us to act immediately instead of waiting for the input +// queue to empty. This is important in case of control frames. +Stream.prototype._writeUpstream = function _writeUpstream(frame) { + this._log.debug({ frame: frame }, 'Receiving frame'); + + var moreNeeded = Flow.prototype.write.call(this.upstream, frame); + + // * Transition to a new state if that's the effect of receiving the frame + this._transition(false, frame); + + // * If it's a control frame. Call the appropriate handler method. + if (frame.type === 'HEADERS') { + if (this._processedHeaders && !frame.flags['END_STREAM']) { + this.emit('error', 'PROTOCOL_ERROR'); + } + this._processedHeaders = true; + this._onHeaders(frame); + } else if (frame.type === 'PUSH_PROMISE') { + this._onPromise(frame); + } else if (frame.type === 'PRIORITY') { + this._onPriority(frame); + } else if (frame.type === 'ALTSVC') { + // TODO + } else if (frame.type === 'BLOCKED') { + // TODO + } + + // * If it's an invalid stream level frame, emit error + else if ((frame.type !== 'DATA') && + (frame.type !== 'WINDOW_UPDATE') && + (frame.type !== 'RST_STREAM')) { + this._log.error({ frame: frame }, 'Invalid stream level frame'); + this.emit('error', 'PROTOCOL_ERROR'); + } + + return moreNeeded; +}; + +// The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame. +Stream.prototype._receive = function _receive(frame, ready) { + // * If it's a DATA frame, then push the payload into the output buffer on the other side. + // Call ready when the other side is ready to receive more. + if (!this._ended && (frame.type === 'DATA')) { + var moreNeeded = this.push(frame.data); + if (!moreNeeded) { + this._receiveMore = ready; + } + } + + // * Any frame may signal the end of the stream with the END_STREAM flag + if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) { + this.push(null); + this._ended = true; + } + + // * Postpone calling `ready` if `push()` returned a falsy value + if (this._receiveMore !== ready) { + ready(); + } +}; + +// The `_read` method is called when the user side is ready to receive more data. If there's a +// pending write on the upstream, then call its pending ready callback to receive more frames. +Stream.prototype._read = function _read() { + if (this._receiveMore) { + var receiveMore = this._receiveMore; + delete this._receiveMore; + receiveMore(); + } +}; + +// The `write` method gets called when there's a write request from the user. +Stream.prototype._write = function _write(buffer, encoding, ready) { + // * Chunking is done by the upstream Flow. + var moreNeeded = this._pushUpstream({ + type: 'DATA', + flags: {}, + stream: this.id, + data: buffer + }); + + // * Call ready when upstream is ready to receive more frames. + if (moreNeeded) { + ready(); + } else { + this._sendMore = ready; + } +}; + +// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames. +// If there's a pending write on the user side, then call its pending ready callback to receive more +// writes. +Stream.prototype._send = function _send() { + if (this._sendMore) { + var sendMore = this._sendMore; + delete this._sendMore; + sendMore(); + } +}; + +// When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM` +// flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag, +// then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an +// existing frame is a nice optimization. +var emptyBuffer = new Buffer(0); +Stream.prototype._finishing = function _finishing() { + var endFrame = { + type: 'DATA', + flags: { END_STREAM: true }, + stream: this.id, + data: emptyBuffer + }; + var lastFrame = this.upstream.getLastQueuedFrame(); + if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) { + this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.'); + lastFrame.flags.END_STREAM = true; + this._transition(true, endFrame); + } else { + this._pushUpstream(endFrame); + } +}; + +// [Stream States](https://tools.ietf.org/html/rfc7540#section-5.1) +// ---------------- +// +// +--------+ +// PP | | PP +// ,--------| idle |--------. +// / | | \ +// v +--------+ v +// +----------+ | +----------+ +// | | | H | | +// ,---| reserved | | | reserved |---. +// | | (local) | v | (remote) | | +// | +----------+ +--------+ +----------+ | +// | | ES | | ES | | +// | | H ,-------| open |-------. | H | +// | | / | | \ | | +// | v v +--------+ v v | +// | +----------+ | +----------+ | +// | | half | | | half | | +// | | closed | | R | closed | | +// | | (remote) | | | (local) | | +// | +----------+ | +----------+ | +// | | v | | +// | | ES / R +--------+ ES / R | | +// | `----------->| |<-----------' | +// | R | closed | R | +// `-------------------->| |<--------------------' +// +--------+ + +// Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing frame +Stream.prototype._initializeState = function _initializeState() { + this.state = 'IDLE'; + this._initiated = undefined; + this._closedByUs = undefined; + this._closedWithRst = undefined; + this._processedHeaders = false; +}; + +// Only `_setState` should change `this.state` directly. It also logs the state change and notifies +// interested parties using the 'state' event. +Stream.prototype._setState = function transition(state) { + assert(this.state !== state); + this._log.debug({ from: this.state, to: state }, 'State transition'); + this.state = state; + this.emit('state', state); +}; + +// A state is 'active' if the stream in that state counts towards the concurrency limit. Streams +// that are in the "open" state, or either of the "half closed" states count toward this limit. +function activeState(state) { + return ((state === 'HALF_CLOSED_LOCAL') || (state === 'HALF_CLOSED_REMOTE') || (state === 'OPEN')); +} + +// `_transition` is called every time there's an incoming or outgoing frame. It manages state +// transitions, and detects stream errors. A stream error is always caused by a frame that is not +// allowed in the current state. +Stream.prototype._transition = function transition(sending, frame) { + var receiving = !sending; + var connectionError; + var streamError; + + var DATA = false, HEADERS = false, PRIORITY = false, ALTSVC = false, BLOCKED = false; + var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false; + switch(frame.type) { + case 'DATA' : DATA = true; break; + case 'HEADERS' : HEADERS = true; break; + case 'PRIORITY' : PRIORITY = true; break; + case 'RST_STREAM' : RST_STREAM = true; break; + case 'PUSH_PROMISE' : PUSH_PROMISE = true; break; + case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break; + case 'ALTSVC' : ALTSVC = true; break; + case 'BLOCKED' : BLOCKED = true; break; + } + + var previousState = this.state; + + switch (this.state) { + // All streams start in the **idle** state. In this state, no frames have been exchanged. + // + // * Sending or receiving a HEADERS frame causes the stream to become "open". + // + // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen. + case 'IDLE': + if (HEADERS) { + this._setState('OPEN'); + if (frame.flags.END_STREAM) { + this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE'); + } + this._initiated = sending; + } else if (sending && RST_STREAM) { + this._setState('CLOSED'); + } else if (PRIORITY) { + /* No state change */ + } else { + connectionError = 'PROTOCOL_ERROR'; + } + break; + + // A stream in the **reserved (local)** state is one that has been promised by sending a + // PUSH_PROMISE frame. + // + // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed + // (remote)" state. + // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This + // releases the stream reservation. + // * An endpoint may receive PRIORITY frame in this state. + // * An endpoint MUST NOT send any other type of frame in this state. + case 'RESERVED_LOCAL': + if (sending && HEADERS) { + this._setState('HALF_CLOSED_REMOTE'); + } else if (RST_STREAM) { + this._setState('CLOSED'); + } else if (PRIORITY) { + /* No state change */ + } else { + connectionError = 'PROTOCOL_ERROR'; + } + break; + + // A stream in the **reserved (remote)** state has been reserved by a remote peer. + // + // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This + // releases the stream reservation. + // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)". + // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream. + // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR. + case 'RESERVED_REMOTE': + if (RST_STREAM) { + this._setState('CLOSED'); + } else if (receiving && HEADERS) { + this._setState('HALF_CLOSED_LOCAL'); + } else if (BLOCKED || PRIORITY) { + /* No state change */ + } else { + connectionError = 'PROTOCOL_ERROR'; + } + break; + + // The **open** state is where both peers can send frames. In this state, sending peers observe + // advertised stream level flow control limits. + // + // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes + // the stream to transition into one of the "half closed" states: an endpoint sending a + // END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint + // receiving a END_STREAM flag causes the stream state to become "half closed (remote)". + // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition + // immediately to "closed". + case 'OPEN': + if (frame.flags.END_STREAM) { + this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE'); + } else if (RST_STREAM) { + this._setState('CLOSED'); + } else { + /* No state change */ + } + break; + + // A stream that is **half closed (local)** cannot be used for sending frames. + // + // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM + // flag is received, or when either peer sends a RST_STREAM frame. + // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream. + // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag. + case 'HALF_CLOSED_LOCAL': + if (RST_STREAM || (receiving && frame.flags.END_STREAM)) { + this._setState('CLOSED'); + } else if (BLOCKED || ALTSVC || receiving || PRIORITY || (sending && WINDOW_UPDATE)) { + /* No state change */ + } else { + connectionError = 'PROTOCOL_ERROR'; + } + break; + + // A stream that is **half closed (remote)** is no longer being used by the peer to send frames. + // In this state, an endpoint is no longer obligated to maintain a receiver flow control window + // if it performs flow control. + // + // * If an endpoint receives additional frames for a stream that is in this state it MUST + // respond with a stream error of type STREAM_CLOSED. + // * A stream can transition from this state to "closed" by sending a frame that contains a + // END_STREAM flag, or when either peer sends a RST_STREAM frame. + // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream. + // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream. + case 'HALF_CLOSED_REMOTE': + if (RST_STREAM || (sending && frame.flags.END_STREAM)) { + this._setState('CLOSED'); + } else if (BLOCKED || ALTSVC || sending || PRIORITY || (receiving && WINDOW_UPDATE)) { + /* No state change */ + } else { + connectionError = 'PROTOCOL_ERROR'; + } + break; + + // The **closed** state is the terminal state. + // + // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame + // after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST + // treat that as a stream error of type STREAM_CLOSED. + // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short + // period after a frame containing an END_STREAM flag is sent. Until the remote peer receives + // and processes the frame bearing the END_STREAM flag, it might send either frame type. + // Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY + // choose to treat WINDOW_UPDATE frames that arrive a significant time after sending + // END_STREAM as a connection error of type PROTOCOL_ERROR. + // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives + // the RST_STREAM might have already sent - or enqueued for sending - frames on the stream + // that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that + // it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose + // to limit the period over which it ignores frames and treat frames that arrive after this + // time as being in error. + // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE + // causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM + // can be used to close any of those streams. + case 'CLOSED': + if (PRIORITY || (sending && RST_STREAM) || + (receiving && WINDOW_UPDATE) || + (receiving && this._closedByUs && + (this._closedWithRst || RST_STREAM || ALTSVC))) { + /* No state change */ + } else { + streamError = 'STREAM_CLOSED'; + } + break; + } + + // Noting that the connection was closed by the other endpoint. It may be important in edge cases. + // For example, when the peer tries to cancel a promised stream, but we already sent every data + // on it, then the stream is in CLOSED state, yet we want to ignore the incoming RST_STREAM. + if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) { + this._closedByUs = sending; + this._closedWithRst = RST_STREAM; + } + + // Sending/receiving a PUSH_PROMISE + // + // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state + // for the reserved stream transitions to "reserved (local)". + // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer. + // The state of the stream becomes "reserved (remote)". + if (PUSH_PROMISE && !connectionError && !streamError) { + /* This assertion must hold, because _transition is called immediately when a frame is written + to the stream. If it would be called when a frame gets out of the input queue, the state + of the reserved could have been changed by then. */ + assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state); + frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE'); + frame.promised_stream._initiated = sending; + } + + // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1) + if (this._initiated) { + var change = (activeState(this.state) - activeState(previousState)); + if (sending) { + frame.count_change = change; + } else { + frame.count_change(change); + } + } else if (sending) { + frame.count_change = 0; + } + + // Common error handling. + if (connectionError || streamError) { + var info = { + error: connectionError, + frame: frame, + state: this.state, + closedByUs: this._closedByUs, + closedWithRst: this._closedWithRst + }; + + // * When sending something invalid, throwing an exception, since it is probably a bug. + if (sending) { + this._log.error(info, 'Sending illegal frame.'); + return this.emit('error', new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.')); + } + + // * In case of a serious problem, emitting and error and letting someone else handle it + // (e.g. closing the connection) + // * When receiving something invalid, sending an RST_STREAM using the `reset` method. + // This will automatically cause a transition to the CLOSED state. + else { + this._log.error(info, 'Received illegal frame.'); + if (connectionError) { + this.emit('connectionError', connectionError); + } else { + this.reset(streamError); + this.emit('error', streamError); + } + } + } +}; + +// Bunyan serializers +// ------------------ + +exports.serializers = {}; + +var nextId = 0; +exports.serializers.s = function(stream) { + if (!('_id' in stream)) { + stream._id = nextId; + nextId += 1; + } + return stream._id; +}; |