Pārlūkot izejas kodu

Reuse inbound request headers storage for server streams (#1971)

Motivation:

Each stream starts with headers. When the server accepts a stream it
will respond with headers and eventually, once the RPC has been
completed, respond with trailers. For each RPC on the server, three sets
of headers are allocated.

However, once the response headers are received we drop any ref to them
(as they are copied into metadata), rather then dropping it on the floor
we can hold onto the headers and reuse their already allocated storage
for both the initial and trailing metadata. Assuming there is enough
capacity this saves two allocations per RPC.

Modifications:

- Hold on to headers in the stream state machine, clearning and
  re-populating them when necessary for the initial and trailing resposne
  headers.
- Add a '_modifying' state.

Result:

Fewer allocations
George Barnett 1 gadu atpakaļ
vecāks
revīzija
00a4bacd7b
1 mainītis faili ar 178 papildinājumiem un 40 dzēšanām
  1. 178 40
      Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

+ 178 - 40
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -66,6 +66,7 @@ private enum GRPCStreamStateMachineState {
   case clientClosedServerIdle(ClientClosedServerIdleState)
   case clientClosedServerIdle(ClientClosedServerIdleState)
   case clientClosedServerOpen(ClientClosedServerOpenState)
   case clientClosedServerOpen(ClientClosedServerOpenState)
   case clientClosedServerClosed(ClientClosedServerClosedState)
   case clientClosedServerClosed(ClientClosedServerClosedState)
+  case _modifying
 
 
   struct ClientIdleServerIdleState {
   struct ClientIdleServerIdleState {
     let maximumPayloadSize: Int
     let maximumPayloadSize: Int
@@ -86,13 +87,18 @@ private enum GRPCStreamStateMachineState {
 
 
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
 
 
+    // Store the headers received from the remote peer, its storage can be reused when sending
+    // headers back to the remote peer.
+    var headers: HPACKHeaders
+
     init(
     init(
       previousState: ClientIdleServerIdleState,
       previousState: ClientIdleServerIdleState,
       compressor: Zlib.Compressor?,
       compressor: Zlib.Compressor?,
       outboundCompression: CompressionAlgorithm,
       outboundCompression: CompressionAlgorithm,
       framer: GRPCMessageFramer,
       framer: GRPCMessageFramer,
       decompressor: Zlib.Decompressor?,
       decompressor: Zlib.Decompressor?,
-      deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
+      deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?,
+      headers: HPACKHeaders
     ) {
     ) {
       self.maximumPayloadSize = previousState.maximumPayloadSize
       self.maximumPayloadSize = previousState.maximumPayloadSize
       self.compressor = compressor
       self.compressor = compressor
@@ -101,6 +107,7 @@ private enum GRPCStreamStateMachineState {
       self.decompressor = decompressor
       self.decompressor = decompressor
       self.deframer = deframer
       self.deframer = deframer
       self.inboundMessageBuffer = .init()
       self.inboundMessageBuffer = .init()
+      self.headers = headers
     }
     }
   }
   }
 
 
@@ -114,6 +121,10 @@ private enum GRPCStreamStateMachineState {
 
 
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
 
 
+    // Store the headers received from the remote peer, its storage can be reused when sending
+    // headers back to the remote peer.
+    var headers: HPACKHeaders
+
     init(
     init(
       previousState: ClientOpenServerIdleState,
       previousState: ClientOpenServerIdleState,
       deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
       deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
@@ -127,6 +138,7 @@ private enum GRPCStreamStateMachineState {
       self.decompressor = decompressor
       self.decompressor = decompressor
 
 
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
+      self.headers = previousState.headers
     }
     }
   }
   }
 
 
@@ -192,6 +204,10 @@ private enum GRPCStreamStateMachineState {
 
 
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
 
 
+    // Store the headers received from the remote peer, its storage can be reused when sending
+    // headers back to the remote peer.
+    var headers: HPACKHeaders
+
     /// This transition should only happen on the client-side.
     /// This transition should only happen on the client-side.
     /// It can happen if the request times out before the client outbound can be opened, or if the stream is
     /// It can happen if the request times out before the client outbound can be opened, or if the stream is
     /// unexpectedly closed for some other reason on the client before it can transition to open.
     /// unexpectedly closed for some other reason on the client before it can transition to open.
@@ -207,6 +223,7 @@ private enum GRPCStreamStateMachineState {
       self.decompressor = nil
       self.decompressor = nil
 
 
       self.inboundMessageBuffer = .init()
       self.inboundMessageBuffer = .init()
+      self.headers = [:]
     }
     }
 
 
     /// This transition should only happen on the server-side.
     /// This transition should only happen on the server-side.
@@ -215,7 +232,8 @@ private enum GRPCStreamStateMachineState {
     /// any more messages from the client anyways, as it's closed.
     /// any more messages from the client anyways, as it's closed.
     init(
     init(
       previousState: ClientIdleServerIdleState,
       previousState: ClientIdleServerIdleState,
-      compressionAlgorithm: CompressionAlgorithm
+      compressionAlgorithm: CompressionAlgorithm,
+      headers: HPACKHeaders
     ) {
     ) {
       self.maximumPayloadSize = previousState.maximumPayloadSize
       self.maximumPayloadSize = previousState.maximumPayloadSize
 
 
@@ -231,6 +249,7 @@ private enum GRPCStreamStateMachineState {
       // client: it's closed.
       // client: it's closed.
       self.deframer = nil
       self.deframer = nil
       self.inboundMessageBuffer = .init()
       self.inboundMessageBuffer = .init()
+      self.headers = headers
     }
     }
 
 
     init(previousState: ClientOpenServerIdleState) {
     init(previousState: ClientOpenServerIdleState) {
@@ -241,6 +260,7 @@ private enum GRPCStreamStateMachineState {
       self.deframer = previousState.deframer
       self.deframer = previousState.deframer
       self.decompressor = previousState.decompressor
       self.decompressor = previousState.decompressor
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
+      self.headers = previousState.headers
     }
     }
   }
   }
 
 
@@ -254,6 +274,10 @@ private enum GRPCStreamStateMachineState {
 
 
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
 
 
+    // Store the headers received from the remote peer, its storage can be reused when sending
+    // headers back to the remote peer.
+    var headers: HPACKHeaders
+
     init(previousState: ClientOpenServerOpenState) {
     init(previousState: ClientOpenServerOpenState) {
       self.framer = previousState.framer
       self.framer = previousState.framer
       self.compressor = previousState.compressor
       self.compressor = previousState.compressor
@@ -261,6 +285,7 @@ private enum GRPCStreamStateMachineState {
       self.deframer = previousState.deframer
       self.deframer = previousState.deframer
       self.decompressor = previousState.decompressor
       self.decompressor = previousState.decompressor
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
+      self.headers = previousState.headers
     }
     }
 
 
     /// This should be called from the server path, as the deframer will already be configured in this scenario.
     /// This should be called from the server path, as the deframer will already be configured in this scenario.
@@ -275,6 +300,7 @@ private enum GRPCStreamStateMachineState {
       self.decompressor = nil
       self.decompressor = nil
 
 
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
+      self.headers = previousState.headers
     }
     }
 
 
     /// This should only be called from the client path, as the deframer has not yet been set up.
     /// This should only be called from the client path, as the deframer has not yet been set up.
@@ -298,6 +324,7 @@ private enum GRPCStreamStateMachineState {
       self.deframer = NIOSingleStepByteToMessageProcessor(decoder)
       self.deframer = NIOSingleStepByteToMessageProcessor(decoder)
 
 
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
       self.inboundMessageBuffer = previousState.inboundMessageBuffer
+      self.headers = previousState.headers
     }
     }
   }
   }
 
 
@@ -537,6 +564,8 @@ struct GRPCStreamStateMachine {
       state.decompressor?.end()
       state.decompressor?.end()
     case .clientClosedServerClosed(let state):
     case .clientClosedServerClosed(let state):
       state.compressor?.end()
       state.compressor?.end()
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -632,7 +661,8 @@ extension GRPCStreamStateMachine {
           outboundCompression: outboundEncoding,
           outboundCompression: outboundEncoding,
           framer: GRPCMessageFramer(),
           framer: GRPCMessageFramer(),
           decompressor: nil,
           decompressor: nil,
-          deframer: nil
+          deframer: nil,
+          headers: [:]
         )
         )
       )
       )
       return self.makeClientHeaders(
       return self.makeClientHeaders(
@@ -650,6 +680,8 @@ extension GRPCStreamStateMachine {
       try self.invalidState(
       try self.invalidState(
         "Client is closed: can't send metadata."
         "Client is closed: can't send metadata."
       )
       )
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -657,19 +689,28 @@ extension GRPCStreamStateMachine {
     switch self.state {
     switch self.state {
     case .clientIdleServerIdle:
     case .clientIdleServerIdle:
       try self.invalidState("Client not yet open.")
       try self.invalidState("Client not yet open.")
+
     case .clientOpenServerIdle(var state):
     case .clientOpenServerIdle(var state):
+      self.state = ._modifying
       state.framer.append(message, promise: promise)
       state.framer.append(message, promise: promise)
       self.state = .clientOpenServerIdle(state)
       self.state = .clientOpenServerIdle(state)
+
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       state.framer.append(message, promise: promise)
       state.framer.append(message, promise: promise)
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
+
     case .clientOpenServerClosed:
     case .clientOpenServerClosed:
       // The server has closed, so it makes no sense to send the rest of the request.
       // The server has closed, so it makes no sense to send the rest of the request.
       ()
       ()
+
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
       try self.invalidState(
       try self.invalidState(
         "Client is closed, cannot send a message."
         "Client is closed, cannot send a message."
       )
       )
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -686,6 +727,8 @@ extension GRPCStreamStateMachine {
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
       // Client is already closed - nothing to do.
       // Client is already closed - nothing to do.
       ()
       ()
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -697,18 +740,21 @@ extension GRPCStreamStateMachine {
       try self.invalidState("Client is not open yet.")
       try self.invalidState("Client is not open yet.")
 
 
     case .clientOpenServerIdle(var state):
     case .clientOpenServerIdle(var state):
+      self.state = ._modifying
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerIdle(state)
       self.state = .clientOpenServerIdle(state)
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
         ?? .awaitMoreMessages
         ?? .awaitMoreMessages
 
 
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
         ?? .awaitMoreMessages
         ?? .awaitMoreMessages
 
 
     case .clientClosedServerIdle(var state):
     case .clientClosedServerIdle(var state):
+      self.state = ._modifying
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerIdle(state)
       self.state = .clientClosedServerIdle(state)
       if let request {
       if let request {
@@ -718,6 +764,7 @@ extension GRPCStreamStateMachine {
       }
       }
 
 
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
+      self.state = ._modifying
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerOpen(state)
       self.state = .clientClosedServerOpen(state)
       if let request {
       if let request {
@@ -729,6 +776,9 @@ extension GRPCStreamStateMachine {
     case .clientOpenServerClosed, .clientClosedServerClosed:
     case .clientOpenServerClosed, .clientClosedServerClosed:
       // No point in sending any more requests if the server is closed.
       // No point in sending any more requests if the server is closed.
       return .noMoreMessages
       return .noMoreMessages
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -973,6 +1023,8 @@ extension GRPCStreamStateMachine {
       try self.invalidState(
       try self.invalidState(
         "Server is closed, nothing could have been sent."
         "Server is closed, nothing could have been sent."
       )
       )
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -993,6 +1045,7 @@ extension GRPCStreamStateMachine {
       )
       )
 
 
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       if endStream {
       if endStream {
         // This is invalid as per the protocol specification, because the server
         // This is invalid as per the protocol specification, because the server
         // can only close by sending trailers, not by setting EOS when sending
         // can only close by sending trailers, not by setting EOS when sending
@@ -1016,6 +1069,7 @@ extension GRPCStreamStateMachine {
       return .readInbound
       return .readInbound
 
 
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
+      self.state = ._modifying
       if endStream {
       if endStream {
         self.state = .clientClosedServerClosed(.init(previousState: state))
         self.state = .clientClosedServerClosed(.init(previousState: state))
         return .endRPCAndForwardErrorStatus(
         return .endRPCAndForwardErrorStatus(
@@ -1042,31 +1096,43 @@ extension GRPCStreamStateMachine {
       try self.invalidState(
       try self.invalidState(
         "Cannot have received anything from a closed server."
         "Cannot have received anything from a closed server."
       )
       )
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
   private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
   private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
     switch self.state {
     switch self.state {
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       let message = state.inboundMessageBuffer.pop()
       let message = state.inboundMessageBuffer.pop()
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
       return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
       return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
+
     case .clientOpenServerClosed(var state):
     case .clientOpenServerClosed(var state):
+      self.state = ._modifying
       let message = state.inboundMessageBuffer.pop()
       let message = state.inboundMessageBuffer.pop()
       self.state = .clientOpenServerClosed(state)
       self.state = .clientOpenServerClosed(state)
       return message.map { .receiveMessage($0) } ?? .noMoreMessages
       return message.map { .receiveMessage($0) } ?? .noMoreMessages
+
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
+      self.state = ._modifying
       let message = state.inboundMessageBuffer.pop()
       let message = state.inboundMessageBuffer.pop()
       self.state = .clientClosedServerOpen(state)
       self.state = .clientClosedServerOpen(state)
       return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
       return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
+
     case .clientClosedServerClosed(var state):
     case .clientClosedServerClosed(var state):
+      self.state = ._modifying
       let message = state.inboundMessageBuffer.pop()
       let message = state.inboundMessageBuffer.pop()
       self.state = .clientClosedServerClosed(state)
       self.state = .clientClosedServerClosed(state)
       return message.map { .receiveMessage($0) } ?? .noMoreMessages
       return message.map { .receiveMessage($0) } ?? .noMoreMessages
+
     case .clientIdleServerIdle,
     case .clientIdleServerIdle,
       .clientOpenServerIdle,
       .clientOpenServerIdle,
       .clientClosedServerIdle:
       .clientClosedServerIdle:
       return .awaitMoreMessages
       return .awaitMoreMessages
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -1103,6 +1169,9 @@ extension GRPCStreamStateMachine {
 
 
     case .clientOpenServerClosed, .clientClosedServerClosed:
     case .clientOpenServerClosed, .clientClosedServerClosed:
       return .doNothing
       return .doNothing
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 }
 }
@@ -1111,14 +1180,16 @@ extension GRPCStreamStateMachine {
 
 
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 extension GRPCStreamStateMachine {
 extension GRPCStreamStateMachine {
-  private func makeResponseHeaders(
+  private func formResponseHeaders(
+    in headers: inout HPACKHeaders,
     outboundEncoding: CompressionAlgorithm?,
     outboundEncoding: CompressionAlgorithm?,
     configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
     configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
     customMetadata: Metadata
     customMetadata: Metadata
-  ) -> HPACKHeaders {
+  ) {
+    headers.removeAll(keepingCapacity: true)
+
     // Response headers always contain :status (HTTP Status 200) and content-type.
     // Response headers always contain :status (HTTP Status 200) and content-type.
     // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata.
     // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata.
-    var headers = HPACKHeaders()
     headers.reserveCapacity(4 + customMetadata.count)
     headers.reserveCapacity(4 + customMetadata.count)
 
 
     headers.add("200", forKey: .status)
     headers.add("200", forKey: .status)
@@ -1131,8 +1202,6 @@ extension GRPCStreamStateMachine {
     for metadataPair in customMetadata {
     for metadataPair in customMetadata {
       headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
       headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
     }
     }
-
-    return headers
   }
   }
 
 
   private mutating func serverSend(
   private mutating func serverSend(
@@ -1141,8 +1210,16 @@ extension GRPCStreamStateMachine {
   ) throws -> HPACKHeaders {
   ) throws -> HPACKHeaders {
     // Server sends initial metadata
     // Server sends initial metadata
     switch self.state {
     switch self.state {
-    case .clientOpenServerIdle(let state):
+    case .clientOpenServerIdle(var state):
+      self.state = ._modifying
       let outboundEncoding = state.outboundCompression
       let outboundEncoding = state.outboundCompression
+      self.formResponseHeaders(
+        in: &state.headers,
+        outboundEncoding: outboundEncoding,
+        configuration: configuration,
+        customMetadata: metadata
+      )
+
       self.state = .clientOpenServerOpen(
       self.state = .clientOpenServerOpen(
         .init(
         .init(
           previousState: state,
           previousState: state,
@@ -1153,19 +1230,21 @@ extension GRPCStreamStateMachine {
           decompressor: state.decompressor
           decompressor: state.decompressor
         )
         )
       )
       )
-      return self.makeResponseHeaders(
-        outboundEncoding: outboundEncoding,
-        configuration: configuration,
-        customMetadata: metadata
-      )
-    case .clientClosedServerIdle(let state):
+
+      return state.headers
+
+    case .clientClosedServerIdle(var state):
+      self.state = ._modifying
       let outboundEncoding = state.outboundCompression
       let outboundEncoding = state.outboundCompression
-      self.state = .clientClosedServerOpen(.init(previousState: state))
-      return self.makeResponseHeaders(
+      self.formResponseHeaders(
+        in: &state.headers,
         outboundEncoding: outboundEncoding,
         outboundEncoding: outboundEncoding,
         configuration: configuration,
         configuration: configuration,
         customMetadata: metadata
         customMetadata: metadata
       )
       )
+      self.state = .clientClosedServerOpen(.init(previousState: state))
+      return state.headers
+
     case .clientIdleServerIdle:
     case .clientIdleServerIdle:
       try self.invalidState(
       try self.invalidState(
         "Client cannot be idle if server is sending initial metadata: it must have opened."
         "Client cannot be idle if server is sending initial metadata: it must have opened."
@@ -1178,6 +1257,8 @@ extension GRPCStreamStateMachine {
       try self.invalidState(
       try self.invalidState(
         "Server has already sent initial metadata."
         "Server has already sent initial metadata."
       )
       )
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -1187,16 +1268,23 @@ extension GRPCStreamStateMachine {
       try self.invalidState(
       try self.invalidState(
         "Server must have sent initial metadata before sending a message."
         "Server must have sent initial metadata before sending a message."
       )
       )
+
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       state.framer.append(message, promise: promise)
       state.framer.append(message, promise: promise)
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
+
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
+      self.state = ._modifying
       state.framer.append(message, promise: promise)
       state.framer.append(message, promise: promise)
       self.state = .clientClosedServerOpen(state)
       self.state = .clientClosedServerOpen(state)
+
     case .clientOpenServerClosed, .clientClosedServerClosed:
     case .clientOpenServerClosed, .clientClosedServerClosed:
       try self.invalidState(
       try self.invalidState(
         "Server can't send a message if it's closed."
         "Server can't send a message if it's closed."
       )
       )
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -1206,18 +1294,30 @@ extension GRPCStreamStateMachine {
   ) throws -> HPACKHeaders {
   ) throws -> HPACKHeaders {
     // Close the server.
     // Close the server.
     switch self.state {
     switch self.state {
-    case .clientOpenServerOpen(let state):
+    case .clientOpenServerOpen(var state):
+      self.state = ._modifying
+      state.headers.formTrailers(status: status, metadata: customMetadata)
       self.state = .clientOpenServerClosed(.init(previousState: state))
       self.state = .clientOpenServerClosed(.init(previousState: state))
-      return .trailers(status: status, metadata: customMetadata)
-    case .clientClosedServerOpen(let state):
+      return state.headers
+
+    case .clientClosedServerOpen(var state):
+      self.state = ._modifying
+      state.headers.formTrailers(status: status, metadata: customMetadata)
       self.state = .clientClosedServerClosed(.init(previousState: state))
       self.state = .clientClosedServerClosed(.init(previousState: state))
-      return .trailers(status: status, metadata: customMetadata)
-    case .clientOpenServerIdle(let state):
+      return state.headers
+
+    case .clientOpenServerIdle(var state):
+      self.state = ._modifying
+      state.headers.formTrailersOnly(status: status, metadata: customMetadata)
       self.state = .clientOpenServerClosed(.init(previousState: state))
       self.state = .clientOpenServerClosed(.init(previousState: state))
-      return .trailersOnly(status: status, metadata: customMetadata)
-    case .clientClosedServerIdle(let state):
+      return state.headers
+
+    case .clientClosedServerIdle(var state):
+      self.state = ._modifying
+      state.headers.formTrailersOnly(status: status, metadata: customMetadata)
       self.state = .clientClosedServerClosed(.init(previousState: state))
       self.state = .clientClosedServerClosed(.init(previousState: state))
-      return .trailersOnly(status: status, metadata: customMetadata)
+      return state.headers
+
     case .clientIdleServerIdle:
     case .clientIdleServerIdle:
       try self.invalidState(
       try self.invalidState(
         "Server can't send status if client is idle."
         "Server can't send status if client is idle."
@@ -1226,6 +1326,8 @@ extension GRPCStreamStateMachine {
       try self.invalidState(
       try self.invalidState(
         "Server can't send anything if closed."
         "Server can't send anything if closed."
       )
       )
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -1376,7 +1478,8 @@ extension GRPCStreamStateMachine {
         self.state = .clientClosedServerIdle(
         self.state = .clientClosedServerIdle(
           .init(
           .init(
             previousState: state,
             previousState: state,
-            compressionAlgorithm: outboundEncoding
+            compressionAlgorithm: outboundEncoding,
+            headers: headers
           )
           )
         )
         )
       } else {
       } else {
@@ -1396,7 +1499,8 @@ extension GRPCStreamStateMachine {
             outboundCompression: outboundEncoding,
             outboundCompression: outboundEncoding,
             framer: GRPCMessageFramer(),
             framer: GRPCMessageFramer(),
             decompressor: decompressor,
             decompressor: decompressor,
-            deframer: NIOSingleStepByteToMessageProcessor(deframer)
+            deframer: NIOSingleStepByteToMessageProcessor(deframer),
+            headers: headers
           )
           )
         )
         )
       }
       }
@@ -1409,6 +1513,9 @@ extension GRPCStreamStateMachine {
 
 
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
       try self.invalidState("Client can't have sent metadata if closed.")
       try self.invalidState("Client can't have sent metadata if closed.")
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -1423,6 +1530,7 @@ extension GRPCStreamStateMachine {
       try self.invalidState("Can't have received a message if client is idle.")
       try self.invalidState("Can't have received a message if client is idle.")
 
 
     case .clientOpenServerIdle(var state):
     case .clientOpenServerIdle(var state):
+      self.state = ._modifying
       // Deframer must be present on the server side, as we know the decompression
       // Deframer must be present on the server side, as we know the decompression
       // algorithm from the moment the client opens.
       // algorithm from the moment the client opens.
       try state.deframer!.process(buffer: buffer) { deframedMessage in
       try state.deframer!.process(buffer: buffer) { deframedMessage in
@@ -1438,6 +1546,7 @@ extension GRPCStreamStateMachine {
       action = .readInbound
       action = .readInbound
 
 
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       try state.deframer.process(buffer: buffer) { deframedMessage in
       try state.deframer.process(buffer: buffer) { deframedMessage in
         state.inboundMessageBuffer.append(deframedMessage)
         state.inboundMessageBuffer.append(deframedMessage)
       }
       }
@@ -1462,6 +1571,9 @@ extension GRPCStreamStateMachine {
 
 
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
       try self.invalidState("Client can't send a message if closed.")
       try self.invalidState("Client can't send a message if closed.")
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
 
 
     return action
     return action
@@ -1471,17 +1583,23 @@ extension GRPCStreamStateMachine {
     switch self.state {
     switch self.state {
     case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
     case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
       try self.invalidState("Server is not open yet.")
       try self.invalidState("Server is not open yet.")
+
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       let response = try state.framer.next(compressor: state.compressor)
       let response = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
       return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
       return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
         ?? .awaitMoreMessages
         ?? .awaitMoreMessages
+
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
+      self.state = ._modifying
       let response = try state.framer.next(compressor: state.compressor)
       let response = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerOpen(state)
       self.state = .clientClosedServerOpen(state)
       return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
       return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
         ?? .awaitMoreMessages
         ?? .awaitMoreMessages
+
     case .clientOpenServerClosed(var state):
     case .clientOpenServerClosed(var state):
+      self.state = ._modifying
       let response = try state.framer?.next(compressor: state.compressor)
       let response = try state.framer?.next(compressor: state.compressor)
       self.state = .clientOpenServerClosed(state)
       self.state = .clientOpenServerClosed(state)
       if let response {
       if let response {
@@ -1489,7 +1607,9 @@ extension GRPCStreamStateMachine {
       } else {
       } else {
         return .noMoreMessages
         return .noMoreMessages
       }
       }
+
     case .clientClosedServerClosed(var state):
     case .clientClosedServerClosed(var state):
+      self.state = ._modifying
       let response = try state.framer?.next(compressor: state.compressor)
       let response = try state.framer?.next(compressor: state.compressor)
       self.state = .clientClosedServerClosed(state)
       self.state = .clientClosedServerClosed(state)
       if let response {
       if let response {
@@ -1497,27 +1617,33 @@ extension GRPCStreamStateMachine {
       } else {
       } else {
         return .noMoreMessages
         return .noMoreMessages
       }
       }
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
   private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
   private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
     switch self.state {
     switch self.state {
     case .clientOpenServerIdle(var state):
     case .clientOpenServerIdle(var state):
+      self.state = ._modifying
       let request = state.inboundMessageBuffer.pop()
       let request = state.inboundMessageBuffer.pop()
       self.state = .clientOpenServerIdle(state)
       self.state = .clientOpenServerIdle(state)
       return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
       return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
 
 
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
+      self.state = ._modifying
       let request = state.inboundMessageBuffer.pop()
       let request = state.inboundMessageBuffer.pop()
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
       return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
       return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
 
 
     case .clientClosedServerIdle(var state):
     case .clientClosedServerIdle(var state):
+      self.state = ._modifying
       let request = state.inboundMessageBuffer.pop()
       let request = state.inboundMessageBuffer.pop()
       self.state = .clientClosedServerIdle(state)
       self.state = .clientClosedServerIdle(state)
       return request.map { .receiveMessage($0) } ?? .noMoreMessages
       return request.map { .receiveMessage($0) } ?? .noMoreMessages
 
 
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
+      self.state = ._modifying
       let request = state.inboundMessageBuffer.pop()
       let request = state.inboundMessageBuffer.pop()
       self.state = .clientClosedServerOpen(state)
       self.state = .clientClosedServerOpen(state)
       return request.map { .receiveMessage($0) } ?? .noMoreMessages
       return request.map { .receiveMessage($0) } ?? .noMoreMessages
@@ -1528,6 +1654,9 @@ extension GRPCStreamStateMachine {
 
 
     case .clientIdleServerIdle:
     case .clientIdleServerIdle:
       return .awaitMoreMessages
       return .awaitMoreMessages
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 
 
@@ -1553,6 +1682,9 @@ extension GRPCStreamStateMachine {
 
 
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
       return .doNothing
       return .doNothing
+
+    case ._modifying:
+      preconditionFailure()
     }
     }
   }
   }
 }
 }
@@ -1587,35 +1719,43 @@ internal enum GRPCHTTP2Keys: String {
 }
 }
 
 
 extension HPACKHeaders {
 extension HPACKHeaders {
-  internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
+  func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
     self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true })
     self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true })
       .map {
       .map {
         String($0)
         String($0)
       }
       }
   }
   }
 
 
-  internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
+  fileprivate mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
     self.add(name: key.rawValue, value: value)
     self.add(name: key.rawValue, value: value)
   }
   }
 
 
-  static func trailersOnly(code: Status.Code, message: String, metadata: Metadata = [:]) -> Self {
-    return .trailersOnly(status: Status(code: code, message: message), metadata: metadata)
+  fileprivate static func trailersOnly(code: Status.Code, message: String) -> Self {
+    var trailers = HPACKHeaders()
+    HPACKHeaders.formTrailers(
+      &trailers,
+      isTrailersOnly: true,
+      status: Status(code: code, message: message),
+      metadata: [:]
+    )
+    return trailers
   }
   }
 
 
-  static func trailersOnly(status: Status, metadata: Metadata = [:]) -> Self {
-    return .makeTrailers(isTrailersOnly: true, status: status, metadata: metadata)
+  fileprivate mutating func formTrailersOnly(status: Status, metadata: Metadata = [:]) {
+    Self.formTrailers(&self, isTrailersOnly: true, status: status, metadata: metadata)
   }
   }
 
 
-  static func trailers(status: Status, metadata: Metadata = [:]) -> Self {
-    return .makeTrailers(isTrailersOnly: false, status: status, metadata: metadata)
+  fileprivate mutating func formTrailers(status: Status, metadata: Metadata = [:]) {
+    Self.formTrailers(&self, isTrailersOnly: false, status: status, metadata: metadata)
   }
   }
 
 
-  private static func makeTrailers(
+  private static func formTrailers(
+    _ trailers: inout HPACKHeaders,
     isTrailersOnly: Bool,
     isTrailersOnly: Bool,
     status: Status,
     status: Status,
     metadata: Metadata
     metadata: Metadata
-  ) -> Self {
-    var trailers = HPACKHeaders()
+  ) {
+    trailers.removeAll(keepingCapacity: true)
 
 
     if isTrailersOnly {
     if isTrailersOnly {
       trailers.reserveCapacity(4 + metadata.count)
       trailers.reserveCapacity(4 + metadata.count)
@@ -1633,8 +1773,6 @@ extension HPACKHeaders {
     for (key, value) in metadata {
     for (key, value) in metadata {
       trailers.add(name: key, value: value.encoded())
       trailers.add(name: key, value: value.encoded())
     }
     }
-
-    return trailers
   }
   }
 }
 }