var assert = require('assert'); // The Connection class // ==================== // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport // stream (TCP stream). It operates by sending and receiving frames and is implemented as a // [Flow](flow.html) subclass. var Flow = require('./flow').Flow; exports.Connection = Connection; // Public API // ---------- // * **new Connection(log, firstStreamId, settings)**: create a new Connection // // * **Event: 'error' (type)**: signals a connection level error made by the other end // // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error // code other than NO_ERROR // // * **Event: 'stream' (stream)**: signals that there's an incoming stream // // * **createStream(): stream**: initiate a new stream // // * **set(settings, callback)**: change the value of one or more settings according to the // key-value pairs of `settings`. The callback is called after the peer acknowledged the changes. // // * **ping([callback])**: send a ping and call callback when the answer arrives // // * **close([error])**: close the stream with an error code // Constructor // ----------- // The main aspects of managing the connection are: function Connection(log, firstStreamId, settings) { // * initializing the base class Flow.call(this, 0); // * logging: every method uses the common logger object this._log = log.child({ component: 'connection' }); // * stream management this._initializeStreamManagement(firstStreamId); // * lifecycle management this._initializeLifecycleManagement(); // * flow control this._initializeFlowControl(); // * settings management this._initializeSettingsManagement(settings); // * multiplexing this._initializeMultiplexing(); } Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } }); // Overview // -------- // | ^ | ^ // v | v | // +--------------+ +--------------+ // +---| stream1 |---| stream2 |---- .... ---+ // | | +----------+ | | +----------+ | | // | | | stream1. | | | | stream2. | | | // | +-| upstream |-+ +-| upstream |-+ | // | +----------+ +----------+ | // | | ^ | ^ | // | v | v | | // | +-----+-------------+-----+-------- .... | // | ^ | | | | // | | v | | | // | +--------------+ | | | // | | stream0 | | | | // | | connection | | | | // | | management | multiplexing | // | +--------------+ flow control | // | | ^ | // | _read() | | _write() | // | v | | // | +------------+ +-----------+ | // | |output queue| |input queue| | // +----------------+------------+-+-----------+-----------------+ // | ^ // read() | | write() // v | // Stream management // ----------------- var Stream = require('./stream').Stream; // Initialization: Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) { // * streams are stored in two data structures: // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames. // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames. this._streamIds = []; this._streamPriorities = []; // * The next outbound stream ID and the last inbound stream id this._nextStreamId = firstStreamId; this._lastIncomingStream = 0; // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } }; // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. this._streamSlotsFree = Infinity; this._streamLimit = Infinity; this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit); }; // `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It // broadcasts the message by creating an event on it. Connection.prototype._writeControlFrame = function _writeControlFrame(frame) { if ((frame.type === 'SETTINGS') || (frame.type === 'PING') || (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') || (frame.type === 'ALTSVC')) { this._log.debug({ frame: frame }, 'Receiving connection level frame'); this.emit(frame.type, frame); } else { this._log.error({ frame: frame }, 'Invalid connection level frame'); this.emit('error', 'PROTOCOL_ERROR'); } }; // Methods to manage the stream slot pool: Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) { var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit); this._streamSlotsFree += newStreamLimit - this._streamLimit; this._streamLimit = newStreamLimit; if (wakeup) { this.emit('wakeup'); } }; Connection.prototype._changeStreamCount = function _changeStreamCount(change) { if (change) { this._log.trace({ free: this._streamSlotsFree, change: change }, 'Changing active stream count.'); var wakeup = (this._streamSlotsFree === 0) && (change < 0); this._streamSlotsFree -= change; if (wakeup) { this.emit('wakeup'); } } }; // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of // an outbound stream) consists of three steps: // // 1. var stream = new Stream(this._log, this); // 2. this._allocateId(stream, id); // 2. this._allocatePriority(stream); // Allocating an ID to a stream Connection.prototype._allocateId = function _allocateId(stream, id) { // * initiated stream without definite ID if (id === undefined) { id = this._nextStreamId; this._nextStreamId += 2; } // * incoming stream with a legitim ID (larger than any previous and different parity than ours) else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) { this._lastIncomingStream = id; } // * incoming stream with invalid ID else { this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream }, 'Invalid incoming stream ID.'); this.emit('error', 'PROTOCOL_ERROR'); return undefined; } assert(!(id in this._streamIds)); // * adding to `this._streamIds` this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.'); this._streamIds[id] = stream; stream.id = id; this.emit('new_stream', stream, id); // * forwarding connection errors from streams stream.on('connectionError', this.emit.bind(this, 'error')); return id; }; // Allocating a priority to a stream, and managing priority changes Connection.prototype._allocatePriority = function _allocatePriority(stream) { this._log.trace({ s: stream }, 'Allocating priority for stream.'); this._insert(stream, stream._priority); stream.on('priority', this._reprioritize.bind(this, stream)); stream.upstream.on('readable', this.emit.bind(this, 'wakeup')); this.emit('wakeup'); }; Connection.prototype._insert = function _insert(stream, priority) { if (priority in this._streamPriorities) { this._streamPriorities[priority].push(stream); } else { this._streamPriorities[priority] = [stream]; } }; Connection.prototype._reprioritize = function _reprioritize(stream, priority) { var bucket = this._streamPriorities[stream._priority]; var index = bucket.indexOf(stream); assert(index !== -1); bucket.splice(index, 1); if (bucket.length === 0) { delete this._streamPriorities[stream._priority]; } this._insert(stream, priority); }; // Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to // a previously nonexistent stream. Connection.prototype._createIncomingStream = function _createIncomingStream(id) { this._log.debug({ stream_id: id }, 'New incoming stream.'); var stream = new Stream(this._log, this); this._allocateId(stream, id); this._allocatePriority(stream); this.emit('stream', stream, id); return stream; }; // Creating an *outbound* stream Connection.prototype.createStream = function createStream() { this._log.trace('Creating new outbound stream.'); // * Receiving is enabled immediately, and an ID gets assigned to the stream var stream = new Stream(this._log, this); this._allocatePriority(stream); return stream; }; // Multiplexing // ------------ Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() { this.on('window_update', this.emit.bind(this, 'wakeup')); this._sendScheduled = false; this._firstFrameReceived = false; }; // The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented // by child classes. It reads frames from streams and pushes them to the output buffer. Connection.prototype._send = function _send(immediate) { // * Do not do anything if the connection is already closed if (this._closed) { return; } // * Collapsing multiple calls in a turn into a single deferred call if (immediate) { this._sendScheduled = false; } else { if (!this._sendScheduled) { this._sendScheduled = true; setImmediate(this._send.bind(this, true)); } return; } this._log.trace('Starting forwarding frames from streams.'); // * Looping through priority `bucket`s in priority order. priority_loop: for (var priority in this._streamPriorities) { var bucket = this._streamPriorities[priority]; var nextBucket = []; // * Forwarding frames from buckets with round-robin scheduling. // 1. pulling out frame // 2. if there's no frame, skip this stream // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip // this stream // 4. adding stream to the bucket of the next round // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already) // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream // 7. forwarding the frame, changing `streamCount` as appropriate // 8. stepping to the next stream if there's still more frame needed in the output buffer // 9. switching to the bucket of the next round while (bucket.length > 0) { for (var index = 0; index < bucket.length; index++) { var stream = bucket[index]; var frame = stream.upstream.read((this._window > 0) ? this._window : -1); if (!frame) { continue; } else if (frame.count_change > this._streamSlotsFree) { stream.upstream.unshift(frame); continue; } nextBucket.push(stream); if (frame.stream === undefined) { frame.stream = stream.id || this._allocateId(stream); } if (frame.type === 'PUSH_PROMISE') { this._allocatePriority(frame.promised_stream); frame.promised_stream = this._allocateId(frame.promised_stream); } this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame'); var moreNeeded = this.push(frame); this._changeStreamCount(frame.count_change); assert(moreNeeded !== null); // The frame shouldn't be unforwarded if (moreNeeded === false) { break priority_loop; } } bucket = nextBucket; nextBucket = []; } } // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event if (moreNeeded === undefined) { this.once('wakeup', this._send.bind(this)); } this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.'); }; // The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be // implemented by child classes. It forwards the given frame to the appropriate stream: Connection.prototype._receive = function _receive(frame, done) { this._log.trace({ frame: frame }, 'Forwarding incoming frame'); // * first frame needs to be checked by the `_onFirstFrameReceived` method if (!this._firstFrameReceived) { this._firstFrameReceived = true; this._onFirstFrameReceived(frame); } // Do some sanity checking here before we create a stream if ((frame.type == 'SETTINGS' || frame.type == 'PING' || frame.type == 'GOAWAY') && frame.stream != 0) { // Got connection-level frame on a stream - EEP! this.close('PROTOCOL_ERROR'); return; } else if ((frame.type == 'DATA' || frame.type == 'HEADERS' || frame.type == 'PRIORITY' || frame.type == 'RST_STREAM' || frame.type == 'PUSH_PROMISE' || frame.type == 'CONTINUATION') && frame.stream == 0) { // Got stream-level frame on connection - EEP! this.close('PROTOCOL_ERROR'); return; } // WINDOW_UPDATE can be on either stream or connection // * gets the appropriate stream from the stream registry var stream = this._streamIds[frame.stream]; // * or creates one if it's not in `this.streams` if (!stream) { stream = this._createIncomingStream(frame.stream); } // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream if (frame.type === 'PUSH_PROMISE') { frame.promised_stream = this._createIncomingStream(frame.promised_stream); } frame.count_change = this._changeStreamCount.bind(this); // * and writes it to the `stream`'s `upstream` stream.upstream.write(frame); done(); }; // Settings management // ------------------- var defaultSettings = { }; // Settings management initialization: Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) { // * Setting up the callback queue for setting acknowledgements this._settingsAckCallbacks = []; // * Sending the initial settings. this._log.debug({ settings: settings }, 'Sending the first SETTINGS frame as part of the connection header.'); this.set(settings || defaultSettings); // * Forwarding SETTINGS frames to the `_receiveSettings` method this.on('SETTINGS', this._receiveSettings); this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize); }; // * Checking that the first frame the other endpoint sends is SETTINGS Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) { if ((frame.stream === 0) && (frame.type === 'SETTINGS')) { this._log.debug('Receiving the first SETTINGS frame as part of the connection header.'); } else { this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.'); this.emit('error', 'PROTOCOL_ERROR'); } }; // Handling of incoming SETTINGS frames. Connection.prototype._receiveSettings = function _receiveSettings(frame) { // * If it's an ACK, call the appropriate callback if (frame.flags.ACK) { var callback = this._settingsAckCallbacks.shift(); if (callback) { callback(); } } // * If it's a setting change request, then send an ACK and change the appropriate settings else { if (!this._closed) { this.push({ type: 'SETTINGS', flags: { ACK: true }, stream: 0, settings: {} }); } for (var name in frame.settings) { this.emit('RECEIVING_' + name, frame.settings[name]); } } }; Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) { if ((value < 0x4000) || (value >= 0x01000000)) { this._log.fatal('Received invalid value for max frame size: ' + value); this.emit('error'); } }; // Changing one or more settings value and sending out a SETTINGS frame Connection.prototype.set = function set(settings, callback) { // * Calling the callback and emitting event when the change is acknowledges var self = this; this._settingsAckCallbacks.push(function() { for (var name in settings) { self.emit('ACKNOWLEDGED_' + name, settings[name]); } if (callback) { callback(); } }); // * Sending out the SETTINGS frame this.push({ type: 'SETTINGS', flags: { ACK: false }, stream: 0, settings: settings }); for (var name in settings) { this.emit('SENDING_' + name, settings[name]); } }; // Lifecycle management // -------------------- // The main responsibilities of lifecycle management code: // // * keeping the connection alive by // * sending PINGs when the connection is idle // * answering PINGs // * ending the connection Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() { this._pings = {}; this.on('PING', this._receivePing); this.on('GOAWAY', this._receiveGoaway); this._closed = false; }; // Generating a string of length 16 with random hexadecimal digits Connection.prototype._generatePingId = function _generatePingId() { do { var id = ''; for (var i = 0; i < 16; i++) { id += Math.floor(Math.random()*16).toString(16); } } while(id in this._pings); return id; }; // Sending a ping and calling `callback` when the answer arrives Connection.prototype.ping = function ping(callback) { var id = this._generatePingId(); var data = new Buffer(id, 'hex'); this._pings[id] = callback; this._log.debug({ data: data }, 'Sending PING.'); this.push({ type: 'PING', flags: { ACK: false }, stream: 0, data: data }); }; // Answering pings Connection.prototype._receivePing = function _receivePing(frame) { if (frame.flags.ACK) { var id = frame.data.toString('hex'); if (id in this._pings) { this._log.debug({ data: frame.data }, 'Receiving answer for a PING.'); var callback = this._pings[id]; if (callback) { callback(); } delete this._pings[id]; } else { this._log.warn({ data: frame.data }, 'Unsolicited PING answer.'); } } else { this._log.debug({ data: frame.data }, 'Answering PING.'); this.push({ type: 'PING', flags: { ACK: true }, stream: 0, data: frame.data }); } }; // Terminating the connection Connection.prototype.close = function close(error) { if (this._closed) { this._log.warn('Trying to close an already closed connection'); return; } this._log.debug({ error: error }, 'Closing the connection'); this.push({ type: 'GOAWAY', flags: {}, stream: 0, last_stream: this._lastIncomingStream, error: error || 'NO_ERROR' }); this.push(null); this._closed = true; }; Connection.prototype._receiveGoaway = function _receiveGoaway(frame) { this._log.debug({ error: frame.error }, 'Other end closed the connection'); this.push(null); this._closed = true; if (frame.error !== 'NO_ERROR') { this.emit('peerError', frame.error); } }; // Flow control // ------------ Connection.prototype._initializeFlowControl = function _initializeFlowControl() { // Handling of initial window size of individual streams. this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE; this.on('new_stream', function(stream) { stream.upstream.setInitialWindow(this._initialStreamWindowSize); }); this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize); this._streamIds[0].upstream.setInitialWindow = function noop() {}; }; // The initial connection flow control window is 65535 bytes. var INITIAL_STREAM_WINDOW_SIZE = 65535; // A SETTINGS frame can alter the initial flow control window size for all current streams. When the // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all // stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by // the difference between the new value and the old value. Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) { if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) { this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.'); this.emit('error', 'FLOW_CONTROL_ERROR'); } else { this._log.debug({ size: size }, 'Changing stream initial window size.'); this._initialStreamWindowSize = size; this._streamIds.forEach(function(stream) { stream.upstream.setInitialWindow(size); }); } };