Browse Source

Avoid CoWs in grpc-web-to-http2 (#1227)

Motivation:

The gRPC Web to HTTP/2 codec attempted to avoid CoWs when mutating its
state.

Modifications:

- Avoid CoWs in the GRPCWebToHTTP2ServerCodec

Result:

Fewer CoWs.
George Barnett 4 years ago
parent
commit
e9ce37830a
1 changed files with 170 additions and 137 deletions
  1. 170 137
      Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift

+ 170 - 137
Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift

@@ -96,28 +96,16 @@ extension GRPCWebToHTTP2ServerCodec {
       self.scheme = scheme
     }
 
-    private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
-      var state: State = ._modifying
-      swap(&self.state, &state)
-      defer {
-        swap(&self.state, &state)
-      }
-      return body(&state)
-    }
-
     /// Process the inbound `HTTPServerRequestPart`.
     internal mutating func processInbound(
       serverRequestPart: HTTPServerRequestPart,
       allocator: ByteBufferAllocator
     ) -> Action {
-      let scheme = self.scheme
-      return self.withStateAvoidingCoWs { state in
-        state.processInbound(
-          serverRequestPart: serverRequestPart,
-          scheme: scheme,
-          allocator: allocator
-        )
-      }
+      return self.state.processInbound(
+        serverRequestPart: serverRequestPart,
+        scheme: self.scheme,
+        allocator: allocator
+      )
     }
 
     /// Process the outbound `HTTP2Frame.FramePayload`.
@@ -126,9 +114,11 @@ extension GRPCWebToHTTP2ServerCodec {
       promise: EventLoopPromise<Void>?,
       allocator: ByteBufferAllocator
     ) -> Action {
-      return self.withStateAvoidingCoWs { state in
-        state.processOutbound(framePayload: framePayload, promise: promise, allocator: allocator)
-      }
+      return self.state.processOutbound(
+        framePayload: framePayload,
+        promise: promise,
+        allocator: allocator
+      )
     }
 
     /// An action to take as a result of interaction with the state machine.
@@ -174,6 +164,23 @@ extension GRPCWebToHTTP2ServerCodec {
 
       /// Not a real state.
       case _modifying
+
+      private var isModifying: Bool {
+        switch self {
+        case ._modifying:
+          return true
+        case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed:
+          return false
+        }
+      }
+
+      private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
+        self = ._modifying
+        defer {
+          assert(!self.isModifying)
+        }
+        return body(&self)
+      }
     }
 
     fileprivate struct InboundState {
@@ -258,35 +265,37 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
   ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
     switch self {
     case .idle:
-      let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
-
-      // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
-      // allocate a second headers block to use the normalization provided by NIO HTTP/2.
-      //
-      // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
-      // extra copy.
-      var headers = HPACKHeaders()
-      headers.reserveCapacity(normalized.count + 4)
-      headers.add(name: ":path", value: head.uri)
-      headers.add(name: ":method", value: head.method.rawValue)
-      headers.add(name: ":scheme", value: scheme)
-      if let host = head.headers.first(name: "host") {
-        headers.add(name: ":authority", value: host)
-      }
-      headers.add(contentsOf: normalized)
+      return self.withStateAvoidingCoWs { state in
+        let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
+
+        // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
+        // allocate a second headers block to use the normalization provided by NIO HTTP/2.
+        //
+        // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
+        // extra copy.
+        var headers = HPACKHeaders()
+        headers.reserveCapacity(normalized.count + 4)
+        headers.add(name: ":path", value: head.uri)
+        headers.add(name: ":method", value: head.method.rawValue)
+        headers.add(name: ":scheme", value: scheme)
+        if let host = head.headers.first(name: "host") {
+          headers.add(name: ":authority", value: host)
+        }
+        headers.add(contentsOf: normalized)
 
-      // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
-      // that will be done at the HTTP/2 level.
-      let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
-      let isWebText = contentType == .some(.webTextProtobuf)
+        // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
+        // that will be done at the HTTP/2 level.
+        let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
+        let isWebText = contentType == .some(.webTextProtobuf)
 
-      let closeConnection = head.headers[canonicalForm: "connection"].contains("close")
+        let closeConnection = head.headers[canonicalForm: "connection"].contains("close")
 
-      self = .fullyOpen(
-        .init(isTextEncoded: isWebText, allocator: allocator),
-        .init(isTextEncoded: isWebText, closeConnection: closeConnection)
-      )
-      return .fireChannelRead(.headers(.init(headers: headers)))
+        state = .fullyOpen(
+          .init(isTextEncoded: isWebText, allocator: allocator),
+          .init(isTextEncoded: isWebText, closeConnection: closeConnection)
+        )
+        return .fireChannelRead(.headers(.init(headers: headers)))
+      }
 
     case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
       preconditionFailure("Invalid state: already received request head")
@@ -304,15 +313,19 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
       preconditionFailure("Invalid state: haven't received request head")
 
     case .fullyOpen(var inbound, let outbound):
-      let action = inbound.processInboundData(buffer: &buffer)
-      self = .fullyOpen(inbound, outbound)
-      return action
+      return self.withStateAvoidingCoWs { state in
+        let action = inbound.processInboundData(buffer: &buffer)
+        state = .fullyOpen(inbound, outbound)
+        return action
+      }
 
     case var .clientOpenServerClosed(inbound):
       // The server is already done, but it's not our place to drop the request.
-      let action = inbound.processInboundData(buffer: &buffer)
-      self = .clientOpenServerClosed(inbound)
-      return action
+      return self.withStateAvoidingCoWs { state in
+        let action = inbound.processInboundData(buffer: &buffer)
+        state = .clientOpenServerClosed(inbound)
+        return action
+      }
 
     case .clientClosedServerOpen:
       preconditionFailure("End of request stream already received")
@@ -330,24 +343,28 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
       preconditionFailure("Invalid state: haven't received request head")
 
     case let .fullyOpen(_, outbound):
-      // We're done with inbound state.
-      self = .clientClosedServerOpen(outbound)
+      return self.withStateAvoidingCoWs { state in
+        // We're done with inbound state.
+        state = .clientClosedServerOpen(outbound)
 
-      // Send an empty DATA frame with the end stream flag set.
-      let empty = allocator.buffer(capacity: 0)
-      return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
+        // Send an empty DATA frame with the end stream flag set.
+        let empty = allocator.buffer(capacity: 0)
+        return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
+      }
 
     case .clientClosedServerOpen:
       preconditionFailure("End of request stream already received")
 
     case .clientOpenServerClosed:
-      // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
-      // it's necessary to communicate to the other peers that the response is done.
-      self = .idle
+      return self.withStateAvoidingCoWs { state in
+        // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
+        // it's necessary to communicate to the other peers that the response is done.
+        state = .idle
 
-      // Send an empty DATA frame with the end stream flag set.
-      let empty = allocator.buffer(capacity: 0)
-      return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
+        // Send an empty DATA frame with the end stream flag set.
+        let empty = allocator.buffer(capacity: 0)
+        return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
+      }
 
     case ._modifying:
       preconditionFailure("Left in modifying state")
@@ -368,39 +385,43 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
       preconditionFailure("Invalid state: haven't received request head")
 
     case .fullyOpen(let inbound, var outbound):
-      // Double check these are trailers.
-      assert(outbound.responseHeadersSent)
+      return self.withStateAvoidingCoWs { state in
+        // Double check these are trailers.
+        assert(outbound.responseHeadersSent)
 
-      // We haven't seen the end of the request stream yet.
-      self = .clientOpenServerClosed(inbound)
+        // We haven't seen the end of the request stream yet.
+        state = .clientOpenServerClosed(inbound)
 
-      // Avoid CoW-ing the buffers.
-      let responseBuffers = outbound.responseBuffer
-      outbound.responseBuffer = nil
+        // Avoid CoW-ing the buffers.
+        let responseBuffers = outbound.responseBuffer
+        outbound.responseBuffer = nil
 
-      return self.processTrailers(
-        responseBuffers: responseBuffers,
-        trailers: trailers,
-        promise: promise,
-        allocator: allocator,
-        closeChannel: outbound.closeConnection
-      )
+        return Self.processTrailers(
+          responseBuffers: responseBuffers,
+          trailers: trailers,
+          promise: promise,
+          allocator: allocator,
+          closeChannel: outbound.closeConnection
+        )
+      }
 
     case var .clientClosedServerOpen(state):
-      // Client is closed and now so is the server.
-      self = .idle
+      return self.withStateAvoidingCoWs { nextState in
+        // Client is closed and now so is the server.
+        nextState = .idle
 
-      // Avoid CoW-ing the buffers.
-      let responseBuffers = state.responseBuffer
-      state.responseBuffer = nil
+        // Avoid CoW-ing the buffers.
+        let responseBuffers = state.responseBuffer
+        state.responseBuffer = nil
 
-      return self.processTrailers(
-        responseBuffers: responseBuffers,
-        trailers: trailers,
-        promise: promise,
-        allocator: allocator,
-        closeChannel: state.closeConnection
-      )
+        return Self.processTrailers(
+          responseBuffers: responseBuffers,
+          trailers: trailers,
+          promise: promise,
+          allocator: allocator,
+          closeChannel: state.closeConnection
+        )
+      }
 
     case .clientOpenServerClosed:
       preconditionFailure("Already seen end of response stream")
@@ -410,7 +431,7 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
     }
   }
 
-  private func processTrailers(
+  private static func processTrailers(
     responseBuffers: CircularBuffer<ByteBuffer>?,
     trailers: HPACKHeaders,
     promise: EventLoopPromise<Void>?,
@@ -447,40 +468,44 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
       preconditionFailure("Invalid state: haven't received request head")
 
     case let .fullyOpen(inbound, outbound):
-      // We still haven't seen the end of the request stream.
-      self = .clientOpenServerClosed(inbound)
+      return self.withStateAvoidingCoWs { state in
+        // We still haven't seen the end of the request stream.
+        state = .clientOpenServerClosed(inbound)
 
-      let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
-        hpackHeaders: trailers,
-        closeConnection: outbound.closeConnection
-      )
+        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
+          hpackHeaders: trailers,
+          closeConnection: outbound.closeConnection
+        )
 
-      return .write(
-        .init(
-          part: .head(head),
-          additionalPart: .end(nil),
-          promise: promise,
-          closeChannel: outbound.closeConnection
+        return .write(
+          .init(
+            part: .head(head),
+            additionalPart: .end(nil),
+            promise: promise,
+            closeChannel: outbound.closeConnection
+          )
         )
-      )
+      }
 
     case let .clientClosedServerOpen(outbound):
-      // We're done, back to idle.
-      self = .idle
+      return self.withStateAvoidingCoWs { state in
+        // We're done, back to idle.
+        state = .idle
 
-      let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
-        hpackHeaders: trailers,
-        closeConnection: outbound.closeConnection
-      )
+        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
+          hpackHeaders: trailers,
+          closeConnection: outbound.closeConnection
+        )
 
-      return .write(
-        .init(
-          part: .head(head),
-          additionalPart: .end(nil),
-          promise: promise,
-          closeChannel: outbound.closeConnection
+        return .write(
+          .init(
+            part: .head(head),
+            additionalPart: .end(nil),
+            promise: promise,
+            closeChannel: outbound.closeConnection
+          )
         )
-      )
+      }
 
     case .clientOpenServerClosed:
       preconditionFailure("Already seen end of response stream")
@@ -499,24 +524,28 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
       preconditionFailure("Invalid state: haven't received request head")
 
     case .fullyOpen(let inbound, var outbound):
-      outbound.responseHeadersSent = true
-      self = .fullyOpen(inbound, outbound)
+      return self.withStateAvoidingCoWs { state in
+        outbound.responseHeadersSent = true
+        state = .fullyOpen(inbound, outbound)
 
-      let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
-        hpackHeaders: headers,
-        closeConnection: outbound.closeConnection
-      )
-      return .write(.init(part: .head(head), promise: promise, closeChannel: false))
+        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
+          hpackHeaders: headers,
+          closeConnection: outbound.closeConnection
+        )
+        return .write(.init(part: .head(head), promise: promise, closeChannel: false))
+      }
 
     case var .clientClosedServerOpen(outbound):
-      outbound.responseHeadersSent = true
-      self = .clientClosedServerOpen(outbound)
+      return self.withStateAvoidingCoWs { state in
+        outbound.responseHeadersSent = true
+        state = .clientClosedServerOpen(outbound)
 
-      let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
-        hpackHeaders: headers,
-        closeConnection: outbound.closeConnection
-      )
-      return .write(.init(part: .head(head), promise: promise, closeChannel: false))
+        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
+          hpackHeaders: headers,
+          closeConnection: outbound.closeConnection
+        )
+        return .write(.init(part: .head(head), promise: promise, closeChannel: false))
+      }
 
     case .clientOpenServerClosed:
       preconditionFailure("Already seen end of response stream")
@@ -558,7 +587,7 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
     }
   }
 
-  private func processResponseData(
+  private static func processResponseData(
     _ payload: HTTP2Frame.FramePayload.Data,
     promise: EventLoopPromise<Void>?,
     state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
@@ -590,14 +619,18 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
       preconditionFailure("Invalid state: haven't received request head")
 
     case .fullyOpen(let inbound, var outbound):
-      let action = self.processResponseData(payload, promise: promise, state: &outbound)
-      self = .fullyOpen(inbound, outbound)
-      return action
+      return self.withStateAvoidingCoWs { state in
+        let action = Self.processResponseData(payload, promise: promise, state: &outbound)
+        state = .fullyOpen(inbound, outbound)
+        return action
+      }
 
     case var .clientClosedServerOpen(outbound):
-      let action = self.processResponseData(payload, promise: promise, state: &outbound)
-      self = .clientClosedServerOpen(outbound)
-      return action
+      return self.withStateAvoidingCoWs { state in
+        let action = Self.processResponseData(payload, promise: promise, state: &outbound)
+        state = .clientClosedServerOpen(outbound)
+        return action
+      }
 
     case .clientOpenServerClosed:
       return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))