diff --git a/doc/api/http2.md b/doc/api/http2.md index 9e9c3f8d3f0232..16f3c1d2fe4542 100644 --- a/doc/api/http2.md +++ b/doc/api/http2.md @@ -4410,6 +4410,9 @@ added: v8.4.0 This method adds HTTP trailing headers (a header but at the end of the message) to the response. +Trailers must be added before calling [`response.end()`][]; trailers added +afterwards are silently dropped. + Attempting to set a header field name or value that contains invalid characters will result in a [`TypeError`][] being thrown. diff --git a/lib/internal/http2/compat.js b/lib/internal/http2/compat.js index 7b9524ef855988..d9c123ee846b90 100644 --- a/lib/internal/http2/compat.js +++ b/lib/internal/http2/compat.js @@ -52,6 +52,8 @@ const { validateObject, } = require('internal/validators'); const { + kAutoEmptyTrailers, + kDisableAutoTrailers, kSocket, kRequest, kProxySocket, @@ -300,16 +302,23 @@ function onStreamCloseRequest() { req.emit('close'); } -function onStreamTimeout(kind) { - return function onStreamTimeout() { - const obj = this[kind]; - obj.emit('timeout'); - }; +// Shared between all Http2ServerRequest instances created without explicit +// options; the Readable constructor only reads from it. +const kDefaultRequestOptions = { autoDestroy: false }; + +function onStreamTimeoutRequest() { + this[kRequest].emit('timeout'); +} + +function onStreamTimeoutResponse() { + this[kResponse].emit('timeout'); } class Http2ServerRequest extends Readable { constructor(stream, headers, options, rawHeaders) { - super({ autoDestroy: false, ...options }); + super(options === undefined ? + kDefaultRequestOptions : + { autoDestroy: false, ...options }); this[kState] = { closed: false, didRead: false, @@ -332,7 +341,7 @@ class Http2ServerRequest extends Readable { stream.on('error', onStreamError); stream.on('aborted', onStreamAbortedRequest); stream.on('close', onStreamCloseRequest); - stream.on('timeout', onStreamTimeout(kRequest)); + stream.on('timeout', onStreamTimeoutRequest); this.on('pause', onRequestPause); this.on('resume', onRequestResume); } @@ -472,6 +481,8 @@ class Http2ServerResponse extends Stream { this[kState] = { closed: false, ending: false, + finishing: false, + hasTrailers: false, destroyed: false, headRequest: false, sendDate: true, @@ -488,7 +499,7 @@ class Http2ServerResponse extends Stream { stream.on('aborted', onStreamAbortedResponse); stream.on('close', onStreamCloseResponse); stream.on('wantTrailers', onStreamTrailersReady); - stream.on('timeout', onStreamTimeout(kResponse)); + stream.on('timeout', onStreamTimeoutResponse); } // User land modules such as finalhandler just check truthiness of this @@ -583,6 +594,15 @@ class Http2ServerResponse extends Stream { name = name.trim().toLowerCase(); assertValidHeader(name, value); this[kTrailers][name] = value; + const state = this[kState]; + if (!state.hasTrailers) { + state.hasTrailers = true; + // If the response headers were already flushed with auto-empty + // trailers enabled, tell the stream to hand the trailers back to JS. + const stream = this[kStream]; + if (stream.headersSent) + stream[kDisableAutoTrailers](); + } } addTrailers(headers) { @@ -838,6 +858,12 @@ class Http2ServerResponse extends Stream { return this; } + // If the headers have not been flushed yet, they will be flushed below + // as part of ending the response. In that case there is no further + // opportunity to add trailers, so the trailers round trip can be + // skipped entirely when none have been registered. + state.finishing = true; + if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); @@ -895,9 +921,18 @@ class Http2ServerResponse extends Stream { const state = this[kState]; const headers = this[kHeaders]; headers[HTTP2_HEADER_STATUS] = state.statusCode; + // Only wait for trailers if some have been registered, or if the + // headers are flushed before the response is ended (in which case + // trailers may still be added during streaming). Trailers added after + // end() are dropped, matching HTTP/1 addTrailers() semantics. + const waitForTrailers = state.hasTrailers || !state.finishing; const options = { endStream: state.ending, - waitForTrailers: true, + waitForTrailers, + // When no trailers have been registered yet, let the native side + // finish the stream on its own if none show up by the time the last + // DATA frame is sent (see setTrailer()). + [kAutoEmptyTrailers]: waitForTrailers && !state.hasTrailers, sendDate: state.sendDate, }; this[kStream].respond(headers, options); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 3fe6380732a482..d65e9fe5a7623c 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -150,6 +150,8 @@ const { getStreamState, isPayloadMeaningless, kAuthority, + kAutoEmptyTrailers, + kDisableAutoTrailers, kSensitiveHeaders, kStrictSingleValueFields, kSocket, @@ -334,6 +336,7 @@ const { STREAM_OPTION_EMPTY_PAYLOAD, STREAM_OPTION_GET_TRAILERS, + STREAM_OPTION_AUTO_EMPTY_TRAILERS, } = constants; const STREAM_FLAGS_PENDING = 0x0; @@ -559,29 +562,19 @@ function sessionListenerRemoved(name) { } // Also keep track of listeners for the Http2Stream instances, as some events -// are emitted on those objects. -function streamListenerAdded(name) { - const session = this[kSession]; - if (!session) return; - switch (name) { - case 'priority': - session[kNativeFields][kSessionPriorityListenerCount]++; - break; - case 'frameError': - session[kNativeFields][kSessionFrameErrorListenerCount]++; - break; - } -} - -function streamListenerRemoved(name) { - const session = this[kSession]; +// are emitted on those objects. Instead of subscribing to 'newListener' and +// 'removeListener' (which makes every listener add/remove on every stream +// emit an extra tracking event), Http2Stream overrides the EventEmitter +// methods and updates the counts directly. +function trackStreamListener(stream, name, delta) { + const session = stream[kSession]; if (!session) return; switch (name) { case 'priority': - session[kNativeFields][kSessionPriorityListenerCount]--; + session[kNativeFields][kSessionPriorityListenerCount] += delta; break; case 'frameError': - session[kNativeFields][kSessionFrameErrorListenerCount]--; + session[kNativeFields][kSessionFrameErrorListenerCount] += delta; break; } } @@ -595,6 +588,20 @@ function onPing(payload) { session.emit('ping', payload); } +function streamNaturalCloseSettled(stream) { + return (stream._readableState.endEmitted || + !!stream._readableState.errored) && + (stream._writableState.finished || + !!stream._writableState.errored); +} + +// Shared 'end'/'finish'/'error' listener for the natural-close path of +// onStreamClose(). Named and reused to avoid per-stream closures. +function maybeDestroyNaturalClose() { + if (!this.destroyed && streamNaturalCloseSettled(this)) + this.destroy(); +} + // Fired by C++ when nghttp2's on_stream_close fires. `peerReset` is // true when the peer sent a RST_STREAM frame - peer RST_STREAM(NO_ERROR) // is otherwise indistinguishable from a clean END_STREAM exchange at @@ -655,22 +662,14 @@ function onStreamClose(code, peerReset) { // errored readable won't fire 'end' - and on a Duplex a writable // error propagates to readable.errored, blocking 'end' too. Treat // either side's errored state as settled. - const readDone = () => stream._readableState.endEmitted || - !!stream._readableState.errored; - const writeDone = () => stream._writableState.finished || - !!stream._writableState.errored; - if (readDone() && writeDone()) { + if (streamNaturalCloseSettled(stream)) { stream.destroy(); return true; } - const maybeDestroy = () => { - if (!stream.destroyed && readDone() && writeDone()) - stream.destroy(); - }; - stream.once('end', maybeDestroy); - stream.once('finish', maybeDestroy); - stream.once('error', maybeDestroy); + stream.on('end', maybeDestroyNaturalClose); + stream.on('finish', maybeDestroyNaturalClose); + stream.on('error', maybeDestroyNaturalClose); if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER && !stream[kState].didRead && stream.readableFlowing === null) { @@ -2015,12 +2014,14 @@ function streamOnPause() { this[kHandle].readStop(); } +function streamOnFinishMaybeDestroy() { + this[kMaybeDestroy](); +} + function afterShutdown(status) { const stream = this.handle[kOwner]; if (stream) { - stream.on('finish', () => { - stream[kMaybeDestroy](); - }); + stream.on('finish', streamOnFinishMaybeDestroy); } // Currently this status value is unused this.callback(); @@ -2045,6 +2046,47 @@ function shutdownWritable(callback) { return afterShutdown.call(req, 0); } +// Completes one of the two halves of a dispatched write (the write callback +// itself and the end-of-stream check); the stream machinery callback runs +// once both have finished. The state lives on stream[kState] because only a +// single write may be in flight at any given time. +function finishWrite(stream) { + const state = stream[kState]; + if (--state.writePending !== 0) + return; + const cb = state.writeCb; + state.writeCb = null; + const err = aggregateTwoErrors(state.endErr, state.writeErr); + state.writeErr = null; + state.endErr = null; + // writeGeneric does not destroy on error and + // we cannot enable autoDestroy, + // so make sure to destroy on error. + if (err) { + stream.destroy(err); + } + cb(err); +} + +// Runs on the tick after a write was dispatched: if the write turned out to +// be the last chunk of an ending writable, shut the writable side down right +// away so the final DATA frame can include the END_STREAM flag. +function endCheckNT(stream) { + const state = stream[kState]; + if (state.writeErr || + !stream._writableState.ending || + stream._writableState.buffered.length || + (state.flags & STREAM_FLAGS_HAS_TRAILERS)) { + finishWrite(stream); + return; + } + debugStreamObj(stream, 'shutting down writable on last write'); + shutdownWritable.call(stream, (err) => { + state.endErr = err; + finishWrite(stream); + }); +} + function finishSendTrailers(stream, headersList) { // The stream might be destroyed and in that case // there is nothing to do. @@ -2132,7 +2174,7 @@ function finishCloseStream(code) { // An Http2Stream is a Duplex stream that is backed by a // node::http2::Http2Stream handle implementing StreamBase. class Http2Stream extends Duplex { - constructor(session, options) { + constructor(session, options, hasHandle = false) { options.allowHalfOpen = true; options.decodeStrings = false; options.autoDestroy = false; @@ -2144,7 +2186,10 @@ class Http2Stream extends Duplex { // been assigned. this.cork(); this[kSession] = session; - session[kState].pendingStreams.add(this); + // Streams constructed with their native handle already available (e.g. + // server streams) are initialized immediately and never become pending. + if (!hasHandle) + session[kState].pendingStreams.add(this); // Allow our logic for determining whether any reads have happened to // work in all situations. This is similar to what we do in _http_incoming. @@ -2159,6 +2204,14 @@ class Http2Stream extends Duplex { writeQueueSize: 0, trailersReady: false, endAfterHeaders: false, + writeCb: null, + writeErr: null, + endErr: null, + writePending: 0, + endInProgress: false, + endCheckOwed: false, + shutdownWritableCalled: false, + fd: -1, }; // Fields used by the compat API to avoid megamorphisms. @@ -2166,9 +2219,53 @@ class Http2Stream extends Duplex { this[kProxySocket] = null; this.on('pause', streamOnPause); + } + + addListener(name, listener) { + const ret = super.addListener(name, listener); + if (name === 'priority' || name === 'frameError') + trackStreamListener(this, name, 1); + return ret; + } + + on(name, listener) { + const ret = super.on(name, listener); + if (name === 'priority' || name === 'frameError') + trackStreamListener(this, name, 1); + return ret; + } - this.on('newListener', streamListenerAdded); - this.on('removeListener', streamListenerRemoved); + prependListener(name, listener) { + const ret = super.prependListener(name, listener); + if (name === 'priority' || name === 'frameError') + trackStreamListener(this, name, 1); + return ret; + } + + removeListener(name, listener) { + if (name === 'priority' || name === 'frameError') { + const before = this.listenerCount(name); + const ret = super.removeListener(name, listener); + if (this.listenerCount(name) !== before) + trackStreamListener(this, name, -1); + return ret; + } + return super.removeListener(name, listener); + } + + removeAllListeners(name) { + let priority = 0; + let frameError = 0; + if (name === undefined || name === 'priority') + priority = this.listenerCount('priority'); + if (name === undefined || name === 'frameError') + frameError = this.listenerCount('frameError'); + const ret = super.removeAllListeners(name); + if (priority !== 0) + trackStreamListener(this, 'priority', -priority); + if (frameError !== 0) + trackStreamListener(this, 'frameError', -frameError); + return ret; } [kUpdateTimer]() { @@ -2346,45 +2443,40 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - let req; + // The stream machinery dispatches at most one _write()/_writev() at a + // time, so the coordination state between the write callback and the + // end-of-stream check below can live on the stream state instead of + // being captured by per-write closures. + const state = this[kState]; + state.writeCb = cb; + state.writeErr = null; + state.endErr = null; + + if (state.flags & STREAM_FLAGS_HAS_TRAILERS) { + // Trailers are pending, so the writable side cannot be shut down + // early anyway; there is no point in scheduling the end check. + state.writePending = 1; + } else if (state.endInProgress) { + // This write was dispatched from inside end(), which makes it the + // final chunk; end() runs the end-of-stream check synchronously once + // the stream machinery has settled, avoiding a nextTick per write. + state.writePending = 2; + state.endCheckOwed = true; + } else { + state.writePending = 2; + // Shutdown write stream right after last chunk is sent + // so final DATA frame can include END_STREAM flag + process.nextTick(endCheckNT, this); + } - let waitingForWriteCallback = true; - let waitingForEndCheck = true; - let writeCallbackErr; - let endCheckCallbackErr; - const done = () => { - if (waitingForEndCheck || waitingForWriteCallback) return; - const err = aggregateTwoErrors(endCheckCallbackErr, writeCallbackErr); - // writeGeneric does not destroy on error and - // we cannot enable autoDestroy, - // so make sure to destroy on error. - if (err) { - this.destroy(err); - } - cb(err); - }; + // This is invoked both as a method on the write req and as a plain + // call, so the stream has to be captured here. const writeCallback = (err) => { - waitingForWriteCallback = false; - writeCallbackErr = err; - done(); - }; - const endCheckCallback = (err) => { - waitingForEndCheck = false; - endCheckCallbackErr = err; - done(); + state.writeErr = err; + finishWrite(this); }; - // Shutdown write stream right after last chunk is sent - // so final DATA frame can include END_STREAM flag - process.nextTick(() => { - if (writeCallbackErr || - !this._writableState.ending || - this._writableState.buffered.length || - (this[kState].flags & STREAM_FLAGS_HAS_TRAILERS)) - return endCheckCallback(); - debugStreamObj(this, 'shutting down writable on last write'); - shutdownWritable.call(this, endCheckCallback); - }); + let req; if (writev) req = writevGeneric(this, data, writeCallback); else @@ -2410,6 +2502,25 @@ class Http2Stream extends Duplex { this[kWriteGeneric](true, data, '', cb); } + end(chunk, encoding, cb) { + const state = this[kState]; + // Any write dispatched while end() runs is the final chunk. Marking + // that lets [kWriteGeneric] hand its end-of-stream check back to us to + // run synchronously below (once the writable state has settled) + // instead of scheduling a nextTick for it. + state.endInProgress = true; + try { + super.end(chunk, encoding, cb); + } finally { + state.endInProgress = false; + if (state.endCheckOwed) { + state.endCheckOwed = false; + endCheckNT(this); + } + } + return this; + } + _final(cb) { if (this.pending) { this.once('ready', () => this._final(cb)); @@ -2439,6 +2550,17 @@ class Http2Stream extends Duplex { } } + // Called by the compat layer when trailers are registered after the + // response headers were already sent with auto-empty trailers enabled + // (see STREAM_OPTION_AUTO_EMPTY_TRAILERS); switches the stream back to + // JS-managed trailers. + [kDisableAutoTrailers]() { + const handle = this[kHandle]; + if (this.destroyed || handle === undefined) + return; + handle.disableAutoTrailers(); + } + sendTrailers(headers) { if (this.destroyed || this.closed) throw new ERR_HTTP2_INVALID_STREAM(); @@ -2561,10 +2683,16 @@ class Http2Stream extends Duplex { // gives the session the opportunity to clean itself up. The session // will destroy if it has been closed and there are no other open or // pending streams. Delay with setImmediate so we don't do it on the - // nghttp2 stack. - setImmediate(() => { - session[kMaybeDestroy](); - }); + // nghttp2 stack. When the session is not closed (or other streams are + // still around), [kMaybeDestroy] would be a no-op, so skip scheduling + // it altogether: session.close() runs its own check, and the native + // side notifies again through ongracefulclosecomplete once all pending + // data has been flushed. + if (session.closed && + sessionState.streams.size === 0 && + sessionState.pendingStreams.size === 0) { + setImmediate(sessionMaybeDestroy, session); + } if (err) { if (session[kType] === NGHTTP2_SESSION_CLIENT) { if (onClientStreamErrorChannel.hasSubscribers) { @@ -2615,6 +2743,8 @@ class Http2Stream extends Duplex { } } +Http2Stream.prototype.off = Http2Stream.prototype.removeListener; + // TODO(aduh95): remove this in future semver-major Http2Stream.prototype.priority = deprecate(function priority(options) { if (this.destroyed) @@ -2649,6 +2779,10 @@ function callStreamClose(stream) { stream.close(); } +function sessionMaybeDestroy(session) { + session[kMaybeDestroy](); +} + function prepareResponseHeaders(stream, headersParam, options) { let headers; let statusCode; @@ -2679,31 +2813,35 @@ function prepareResponseHeaders(stream, headersParam, options) { function prepareResponseHeadersObject(oldHeaders, options) { assertIsObject(oldHeaders, 'headers', ['Object', 'Array']); const headers = { __proto__: null }; + let statusCode; + let hasDate = false; if (oldHeaders !== null && oldHeaders !== undefined) { // This loop is here for performance reason. Do not change. + // The :status and date fields are picked up while copying so they do + // not have to be looked up again on the null-prototype copy. for (const key in oldHeaders) { if (ObjectHasOwn(oldHeaders, key)) { - headers[key] = oldHeaders[key]; + const value = oldHeaders[key]; + headers[key] = value; + if (key === HTTP2_HEADER_STATUS) + statusCode = value; + else if (key === HTTP2_HEADER_DATE) + hasDate = value != null; } } headers[kSensitiveHeaders] = oldHeaders[kSensitiveHeaders]; } - const statusCode = - headers[HTTP2_HEADER_STATUS] = - headers[HTTP2_HEADER_STATUS] | 0 || HTTP_STATUS_OK; + statusCode = headers[HTTP2_HEADER_STATUS] = statusCode | 0 || HTTP_STATUS_OK; - if (options.sendDate == null || options.sendDate) { - headers[HTTP2_HEADER_DATE] ??= utcDate(); + if (!hasDate && (options.sendDate == null || options.sendDate)) { + headers[HTTP2_HEADER_DATE] = utcDate(); } validatePreparedResponseHeaders(headers, statusCode); - return { - headers, - statusCode: headers[HTTP2_HEADER_STATUS], - }; + return { headers, statusCode }; } function prepareResponseHeadersArray(headers, options) { @@ -2961,12 +3099,14 @@ function afterOpen(session, options, headers, streamOptions, err, fd) { class ServerHttp2Stream extends Http2Stream { constructor(session, handle, id, options, headers) { - super(session, options); + super(session, options, true); handle.owner = this; this[kInit](id, handle); this[kProtocol] = headers[HTTP2_HEADER_SCHEME]; this[kAuthority] = getAuthority(headers); - this.once('finish', autoDrainReadable); + // 'finish' is only emitted once, so a regular listener is safe here and + // avoids allocating a once() wrapper for every stream. + this.on('finish', autoDrainReadable); } // True if the remote peer accepts push streams @@ -3086,20 +3226,27 @@ class ServerHttp2Stream extends Http2Stream { const state = this[kState]; assertIsObject(options, 'options'); - options = { ...options }; + // The options are only read, never mutated, so the user-provided object + // can be used directly instead of copying it. + options ??= kEmptyObject; debugStreamObj(this, 'initiating response'); this[kUpdateTimer](); - options.endStream = !!options.endStream; + const endStream = !!options.endStream; let streamOptions = 0; - if (options.endStream) + if (endStream) streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD; if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; state.flags |= STREAM_FLAGS_HAS_TRAILERS; + // Internal mode used by the compat layer: if no trailers have been + // registered by the time the final DATA frame is sent, the native + // side finishes the stream without calling back into JS at all. + if (options[kAutoEmptyTrailers]) + streamOptions |= STREAM_OPTION_AUTO_EMPTY_TRAILERS; } const { @@ -3112,12 +3259,11 @@ class ServerHttp2Stream extends Http2Stream { // Close the writable side if the endStream option is set or status // is one of known codes with no payload, or it's a head request - if (!!options.endStream || + if (endStream || statusCode === HTTP_STATUS_NO_CONTENT || statusCode === HTTP_STATUS_RESET_CONTENT || statusCode === HTTP_STATUS_NOT_MODIFIED || this.headRequest === true) { - options.endStream = true; this.end(); } @@ -3307,7 +3453,7 @@ ServerHttp2Stream.prototype[kProceed] = ServerHttp2Stream.prototype.respond; class ClientHttp2Stream extends Http2Stream { constructor(session, handle, id, options) { - super(session, options); + super(session, options, id !== undefined); this[kState].flags |= STREAM_FLAGS_HEADERS_SENT; if (id !== undefined) this[kInit](id, handle); diff --git a/lib/internal/http2/util.js b/lib/internal/http2/util.js index 25adc8f9697d82..159ba5bd531524 100644 --- a/lib/internal/http2/util.js +++ b/lib/internal/http2/util.js @@ -38,6 +38,8 @@ const { } = require('internal/errors'); const kAuthority = Symbol('authority'); +const kAutoEmptyTrailers = Symbol('autoEmptyTrailers'); +const kDisableAutoTrailers = Symbol('disableAutoTrailers'); const kSensitiveHeaders = Symbol('sensitiveHeaders'); const kStrictSingleValueFields = Symbol('strictSingleValueFields'); const kSocket = Symbol('socket'); @@ -770,14 +772,16 @@ function buildNgHeaderString(arrayOrMap, let pseudoHeaders = ''; let count = 0; - const singles = new SafeSet(); + let singles; const sensitiveHeaders = arrayOrMap[kSensitiveHeaders] || emptyArray; - const neverIndex = sensitiveHeaders.map((v) => v.toLowerCase()); + const neverIndex = sensitiveHeaders.length === 0 ? + emptyArray : sensitiveHeaders.map((v) => v.toLowerCase()); function processHeader(key, value) { key = key.toLowerCase(); + const isSingleValueField = kSingleValueFields.has(key); const isStrictSingleValueField = strictSingleValueFields && - kSingleValueFields.has(key); + isSingleValueField; let isArray = ArrayIsArray(value); if (isArray) { switch (value.length) { @@ -795,11 +799,15 @@ function buildNgHeaderString(arrayOrMap, value = String(value); } if (isStrictSingleValueField) { - if (singles.has(key)) + if (singles === undefined) { + singles = [key]; + } else if (singles.includes(key)) { throw new ERR_HTTP2_HEADER_SINGLE_VALUE(key); - singles.add(key); + } else { + singles.push(key); + } } - const flags = neverIndex.includes(key) ? + const flags = neverIndex.length !== 0 && neverIndex.includes(key) ? kNeverIndexFlag : kNoHeaderFlags; if (key[0] === ':') { @@ -810,11 +818,15 @@ function buildNgHeaderString(arrayOrMap, count++; return; } - if (!checkIsHttpToken(key)) { - throw new ERR_INVALID_HTTP_TOKEN('Header name', key); - } - if (isIllegalConnectionSpecificHeader(key, value)) { - throw new ERR_HTTP2_INVALID_CONNECTION_HEADERS(key); + // Well-known single-value fields are all valid HTTP tokens and none of + // them is a connection-specific header, so both checks can be skipped. + if (!isSingleValueField) { + if (!checkIsHttpToken(key)) { + throw new ERR_INVALID_HTTP_TOKEN('Header name', key); + } + if (isIllegalConnectionSpecificHeader(key, value)) { + throw new ERR_HTTP2_INVALID_CONNECTION_HEADERS(key); + } } if (isArray) { for (let j = 0; j < value.length; ++j) { @@ -981,6 +993,8 @@ module.exports = { getStreamState, isPayloadMeaningless, kAuthority, + kAutoEmptyTrailers, + kDisableAutoTrailers, kSensitiveHeaders, kStrictSingleValueFields, kSocket, diff --git a/src/node_http2.cc b/src/node_http2.cc index fca840c0fa03ad..710afee2da845c 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -2308,6 +2308,9 @@ Http2Stream::Http2Stream(Http2Session* session, if (options & STREAM_OPTION_GET_TRAILERS) set_has_trailers(); + if (options & STREAM_OPTION_AUTO_EMPTY_TRAILERS) + set_auto_empty_trailers(); + PushStreamListener(&stream_listener_); if (options & STREAM_OPTION_EMPTY_PAYLOAD) @@ -2435,6 +2438,9 @@ int Http2Stream::SubmitResponse(const Http2Headers& headers, int options) { if (options & STREAM_OPTION_GET_TRAILERS) set_has_trailers(); + if (options & STREAM_OPTION_AUTO_EMPTY_TRAILERS) + set_auto_empty_trailers(); + if (!is_writable()) options |= STREAM_OPTION_EMPTY_PAYLOAD; @@ -2468,39 +2474,67 @@ int Http2Stream::SubmitInfo(const Http2Headers& headers) { } void Http2Stream::OnTrailers() { + CHECK(!this->is_destroyed()); + set_has_trailers(false); + if (!auto_empty_trailers()) { + EmitWantTrailers(); + return; + } + // The JS side has not registered any trailers, so the stream can be + // finished without calling into JS at all. The empty DATA frame cannot be + // submitted synchronously because OnTrailers() runs from inside the data + // source read callback for the final DATA frame; defer it to the next + // turn of the event loop, just like the JS sendTrailers() path does. + Debug(this, "auto-submitting empty trailers"); + env()->SetImmediate( + [self = BaseObjectPtr(this)](Environment* env) { + if (self->is_destroyed()) return; + // Hand control back to the JS side if trailers were registered in + // the meantime or if submitting the empty DATA frame failed. + if (!self->auto_empty_trailers() || self->SubmitEmptyTrailers() != 0) + self->EmitWantTrailers(); + }); +} + +void Http2Stream::EmitWantTrailers() { Debug(this, "let javascript know we are ready for trailers"); CHECK(!this->is_destroyed()); Isolate* isolate = env()->isolate(); HandleScope scope(isolate); Local context = env()->context(); Context::Scope context_scope(context); - set_has_trailers(false); MakeCallback(env()->http2session_on_stream_trailers_function(), 0, nullptr); } -// Submit informational headers for a stream. +// Sending an empty trailers frame poses problems in Safari, Edge & IE. +// Instead we can just send an empty data frame with NGHTTP2_FLAG_END_STREAM +// to indicate that the stream is ready to be closed. +int Http2Stream::SubmitEmptyTrailers() { + CHECK(!this->is_destroyed()); + Http2Scope h2scope(this); + Debug(this, "sending empty trailers"); + Http2Stream::Provider::Stream prov(this, 0); + int ret = nghttp2_submit_data( + session_->session(), + NGHTTP2_FLAG_END_STREAM, + id_, + *prov); + CHECK_NE(ret, NGHTTP2_ERR_NOMEM); + return ret; +} + +// Submit trailing headers for a stream. int Http2Stream::SubmitTrailers(const Http2Headers& headers) { CHECK(!this->is_destroyed()); + if (headers.length() == 0) + return SubmitEmptyTrailers(); Http2Scope h2scope(this); Debug(this, "sending %d trailers", headers.length()); - int ret; - // Sending an empty trailers frame poses problems in Safari, Edge & IE. - // Instead we can just send an empty data frame with NGHTTP2_FLAG_END_STREAM - // to indicate that the stream is ready to be closed. - if (headers.length() == 0) { - Http2Stream::Provider::Stream prov(this, 0); - ret = nghttp2_submit_data( - session_->session(), - NGHTTP2_FLAG_END_STREAM, - id_, - *prov); - } else { - ret = nghttp2_submit_trailer( - session_->session(), - id_, - headers.data(), - headers.length()); - } + int ret = nghttp2_submit_trailer( + session_->session(), + id_, + headers.data(), + headers.length()); CHECK_NE(ret, NGHTTP2_ERR_NOMEM); return ret; } @@ -3110,6 +3144,15 @@ void Http2Stream::Trailers(const FunctionCallbackInfo& args) { stream->SubmitTrailers(Http2Headers(env, headers))); } +// Called by the JS layer when trailers are registered after the response +// headers were already submitted with STREAM_OPTION_AUTO_EMPTY_TRAILERS set, +// so that the trailers are handed back to JS instead of being auto-emptied. +void Http2Stream::DisableAutoTrailers(const FunctionCallbackInfo& args) { + Http2Stream* stream; + ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); + stream->set_auto_empty_trailers(false); +} + // Grab the numeric id of the Http2Stream void Http2Stream::GetID(const FunctionCallbackInfo& args) { Http2Stream* stream; @@ -3547,6 +3590,10 @@ void Initialize(Local target, SetProtoMethod(isolate, stream, "pushPromise", Http2Stream::PushPromise); SetProtoMethod(isolate, stream, "info", Http2Stream::Info); SetProtoMethod(isolate, stream, "trailers", Http2Stream::Trailers); + SetProtoMethod(isolate, + stream, + "disableAutoTrailers", + Http2Stream::DisableAutoTrailers); SetProtoMethod(isolate, stream, "respond", Http2Stream::Respond); SetProtoMethod(isolate, stream, "rstStream", Http2Stream::RstStream); SetProtoMethod(isolate, stream, "refreshState", Http2Stream::RefreshState); diff --git a/src/node_http2.h b/src/node_http2.h index c9957cb559b323..670c5b5db81f70 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -57,6 +57,10 @@ constexpr int STREAM_OPTION_EMPTY_PAYLOAD = 0x1; // Stream might have trailing headers constexpr int STREAM_OPTION_GET_TRAILERS = 0x2; +// Stream may finish with an empty DATA frame carrying END_STREAM without +// calling back into JS, unless trailers are registered before then +constexpr int STREAM_OPTION_AUTO_EMPTY_TRAILERS = 0x4; + // Http2Stream internal states constexpr int kStreamStateNone = 0x0; constexpr int kStreamStateShut = 0x1; @@ -66,6 +70,7 @@ constexpr int kStreamStateClosed = 0x8; constexpr int kStreamStateDestroyed = 0x10; constexpr int kStreamStateTrailers = 0x20; constexpr int kStreamStatePeerReset = 0x40; +constexpr int kStreamStateAutoEmptyTrailers = 0x80; // Http2Session internal states constexpr int kSessionStateNone = 0x0; @@ -310,7 +315,9 @@ class Http2Stream : public AsyncWrap, // Submit trailing headers for this stream int SubmitTrailers(const Http2Headers& headers); + int SubmitEmptyTrailers(); void OnTrailers(); + void EmitWantTrailers(); // Submit a PRIORITY frame for this stream int SubmitPriority(const Http2Priority& priority, bool silent = false); @@ -368,6 +375,17 @@ class Http2Stream : public AsyncWrap, flags_ &= ~kStreamStateTrailers; } + bool auto_empty_trailers() const { + return flags_ & kStreamStateAutoEmptyTrailers; + } + + void set_auto_empty_trailers(bool on = true) { + if (on) + flags_ |= kStreamStateAutoEmptyTrailers; + else + flags_ &= ~kStreamStateAutoEmptyTrailers; + } + void set_closed() { flags_ |= kStreamStateClosed; } @@ -463,6 +481,8 @@ class Http2Stream : public AsyncWrap, static void RefreshState(const v8::FunctionCallbackInfo& args); static void Info(const v8::FunctionCallbackInfo& args); static void Trailers(const v8::FunctionCallbackInfo& args); + static void DisableAutoTrailers( + const v8::FunctionCallbackInfo& args); static void Respond(const v8::FunctionCallbackInfo& args); static void RstStream(const v8::FunctionCallbackInfo& args); @@ -1119,7 +1139,8 @@ class Origins { V(NGHTTP2_ERR_STREAM_CLOSED) \ V(NGHTTP2_ERR_NOMEM) \ V(STREAM_OPTION_EMPTY_PAYLOAD) \ - V(STREAM_OPTION_GET_TRAILERS) + V(STREAM_OPTION_GET_TRAILERS) \ + V(STREAM_OPTION_AUTO_EMPTY_TRAILERS) #define HTTP2_ERROR_CODES(V) \ V(NGHTTP2_NO_ERROR) \ diff --git a/test/parallel/test-http2-compat-serverresponse-trailers-streaming.js b/test/parallel/test-http2-compat-serverresponse-trailers-streaming.js new file mode 100644 index 00000000000000..b15cc4680b00bc --- /dev/null +++ b/test/parallel/test-http2-compat-serverresponse-trailers-streaming.js @@ -0,0 +1,70 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const h2 = require('http2'); + +// Regression test for the auto-empty-trailers optimization: when the +// response headers are flushed before the response is ended (streaming +// mode), trailers registered afterwards must still be sent, and responses +// that never register trailers must complete normally. + +const server = h2.createServer(); +server.listen(0, common.mustCall(() => { + const port = server.address().port; + server.on('request', (request, response) => { + if (request.url === '/trailers') { + response.writeHead(200); + response.write('hello'); + // Trailers registered after the headers were already flushed. + response.setTrailer('x-checksum', 'abc'); + response.addTrailers({ 'x-count': 2 }); + response.end('world'); + } else { + response.writeHead(200); + response.write('no'); + response.end('trailers'); + } + }); + + const client = h2.connect(`http://localhost:${port}`); + + { + const request = client.request({ ':path': '/trailers' }); + let body = ''; + request.setEncoding('utf8'); + request.on('data', (chunk) => body += chunk); + request.on('trailers', common.mustCall((trailers) => { + assert.strictEqual(trailers['x-checksum'], 'abc'); + assert.strictEqual(trailers['x-count'], '2'); + })); + request.on('end', common.mustCall(() => { + assert.strictEqual(body, 'helloworld'); + maybeClose(); + })); + request.end(); + } + + { + const request = client.request({ ':path': '/plain' }); + let body = ''; + request.setEncoding('utf8'); + request.on('data', (chunk) => body += chunk); + request.on('trailers', common.mustNotCall()); + request.on('end', common.mustCall(() => { + assert.strictEqual(body, 'notrailers'); + maybeClose(); + })); + request.end(); + } + + let remaining = 2; + function maybeClose() { + if (--remaining === 0) { + client.close(); + server.close(); + } + } +}));