diff options
Diffstat (limited to 'testing/xpcshell/node-http2/lib/protocol/endpoint.js')
-rw-r--r-- | testing/xpcshell/node-http2/lib/protocol/endpoint.js | 262 |
1 files changed, 262 insertions, 0 deletions
diff --git a/testing/xpcshell/node-http2/lib/protocol/endpoint.js b/testing/xpcshell/node-http2/lib/protocol/endpoint.js new file mode 100644 index 000000000..a218db040 --- /dev/null +++ b/testing/xpcshell/node-http2/lib/protocol/endpoint.js @@ -0,0 +1,262 @@ +var assert = require('assert'); + +var Serializer = require('./framer').Serializer; +var Deserializer = require('./framer').Deserializer; +var Compressor = require('./compressor').Compressor; +var Decompressor = require('./compressor').Decompressor; +var Connection = require('./connection').Connection; +var Duplex = require('stream').Duplex; +var Transform = require('stream').Transform; + +exports.Endpoint = Endpoint; + +// The Endpoint class +// ================== + +// Public API +// ---------- + +// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint. +// +// - `log`: bunyan logger of the parent +// - `role`: 'CLIENT' or 'SERVER' +// - `settings`: initial HTTP/2 settings +// - `filters`: a map of functions that filter the traffic between components (for debugging or +// intentional failure injection). +// +// Filter functions get three arguments: +// 1. `frame`: the current frame +// 2. `forward(frame)`: function that can be used to forward a frame to the next component +// 3. `done()`: callback to signal the end of the filter process +// +// Valid filter names and their position in the stack: +// - `beforeSerialization`: after compression, before serialization +// - `beforeCompression`: after multiplexing, before compression +// - `afterDeserialization`: after deserialization, before decompression +// - `afterDecompression`: after decompression, before multiplexing +// +// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection +// +// * **Event: 'error' (type)**: signals an error +// +// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection) +// +// * **close([error])**: close the connection with an error code + +// Constructor +// ----------- + +// The process of initialization: +function Endpoint(log, role, settings, filters) { + Duplex.call(this); + + // * Initializing logging infrastructure + this._log = log.child({ component: 'endpoint', e: this }); + + // * First part of the handshake process: sending and receiving the client connection header + // prelude. + assert((role === 'CLIENT') || role === 'SERVER'); + if (role === 'CLIENT') { + this._writePrelude(); + } else { + this._readPrelude(); + } + + // * Initialization of component. This includes the second part of the handshake process: + // sending the first SETTINGS frame. This is done by the connection class right after + // initialization. + this._initializeDataFlow(role, settings, filters || {}); + + // * Initialization of management code. + this._initializeManagement(); + + // * Initializing error handling. + this._initializeErrorHandling(); +} +Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } }); + +// Handshake +// --------- + +var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'); + +// Writing the client header is simple and synchronous. +Endpoint.prototype._writePrelude = function _writePrelude() { + this._log.debug('Sending the client connection header prelude.'); + this.push(CLIENT_PRELUDE); +}; + +// The asynchronous process of reading the client header: +Endpoint.prototype._readPrelude = function _readPrelude() { + // * progress in the header is tracker using a `cursor` + var cursor = 0; + + // * `_write` is temporarily replaced by the comparator function + this._write = function _temporalWrite(chunk, encoding, done) { + // * which compares the stored header with the current `chunk` byte by byte and emits the + // 'error' event if there's a byte that doesn't match + var offset = cursor; + while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) { + if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) { + this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk }, + 'Client connection header prelude does not match.'); + this._error('handshake', 'PROTOCOL_ERROR'); + return; + } + cursor += 1; + } + + // * if the whole header is over, and there were no error then restore the original `_write` + // and call it with the remaining part of the current chunk + if (cursor === CLIENT_PRELUDE.length) { + this._log.debug('Successfully received the client connection header prelude.'); + delete this._write; + chunk = chunk.slice(cursor - offset); + this._write(chunk, encoding, done); + } + }; +}; + +// Data flow +// --------- + +// +---------------------------------------------+ +// | | +// | +-------------------------------------+ | +// | | +---------+ +---------+ +---------+ | | +// | | | stream1 | | stream2 | | ... | | | +// | | +---------+ +---------+ +---------+ | | +// | | connection | | +// | +-------------------------------------+ | +// | | ^ | +// | pipe | | pipe | +// | v | | +// | +------------------+------------------+ | +// | | compressor | decompressor | | +// | +------------------+------------------+ | +// | | ^ | +// | pipe | | pipe | +// | v | | +// | +------------------+------------------+ | +// | | serializer | deserializer | | +// | +------------------+------------------+ | +// | | ^ | +// | _read() | | _write() | +// | v | | +// | +------------+ +-----------+ | +// | |output queue| |input queue| | +// +------+------------+-----+-----------+-------+ +// | ^ +// read() | | write() +// v | + +function createTransformStream(filter) { + var transform = new Transform({ objectMode: true }); + var push = transform.push.bind(transform); + transform._transform = function(frame, encoding, done) { + filter(frame, push, done); + }; + return transform; +} + +function pipeAndFilter(stream1, stream2, filter) { + if (filter) { + stream1.pipe(createTransformStream(filter)).pipe(stream2); + } else { + stream1.pipe(stream2); + } +} + +Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) { + var firstStreamId, compressorRole, decompressorRole; + if (role === 'CLIENT') { + firstStreamId = 1; + compressorRole = 'REQUEST'; + decompressorRole = 'RESPONSE'; + } else { + firstStreamId = 2; + compressorRole = 'RESPONSE'; + decompressorRole = 'REQUEST'; + } + + this._serializer = new Serializer(this._log); + this._deserializer = new Deserializer(this._log); + this._compressor = new Compressor(this._log, compressorRole); + this._decompressor = new Decompressor(this._log, decompressorRole); + this._connection = new Connection(this._log, firstStreamId, settings); + + pipeAndFilter(this._connection, this._compressor, filters.beforeCompression); + pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization); + pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization); + pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression); + + this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE', + this._decompressor.setTableSizeLimit.bind(this._decompressor)); + this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE', + this._compressor.setTableSizeLimit.bind(this._compressor)); +}; + +var noread = {}; +Endpoint.prototype._read = function _read() { + this._readableState.sync = true; + var moreNeeded = noread, chunk; + while (moreNeeded && (chunk = this._serializer.read())) { + moreNeeded = this.push(chunk); + } + if (moreNeeded === noread) { + this._serializer.once('readable', this._read.bind(this)); + } + this._readableState.sync = false; +}; + +Endpoint.prototype._write = function _write(chunk, encoding, done) { + this._deserializer.write(chunk, encoding, done); +}; + +// Management +// -------------- + +Endpoint.prototype._initializeManagement = function _initializeManagement() { + this._connection.on('stream', this.emit.bind(this, 'stream')); +}; + +Endpoint.prototype.createStream = function createStream() { + return this._connection.createStream(); +}; + +// Error handling +// -------------- + +Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() { + this._serializer.on('error', this._error.bind(this, 'serializer')); + this._deserializer.on('error', this._error.bind(this, 'deserializer')); + this._compressor.on('error', this._error.bind(this, 'compressor')); + this._decompressor.on('error', this._error.bind(this, 'decompressor')); + this._connection.on('error', this._error.bind(this, 'connection')); + + this._connection.on('peerError', this.emit.bind(this, 'peerError')); +}; + +Endpoint.prototype._error = function _error(component, error) { + this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection'); + this.close(error); + setImmediate(this.emit.bind(this, 'error', error)); +}; + +Endpoint.prototype.close = function close(error) { + this._connection.close(error); +}; + +// Bunyan serializers +// ------------------ + +exports.serializers = {}; + +var nextId = 0; +exports.serializers.e = function(endpoint) { + if (!('id' in endpoint)) { + endpoint.id = nextId; + nextId += 1; + } + return endpoint.id; +}; |