Browse Source

Remove a bunch of (unspecialized) generics (#1073)

Motivation:

The state machine code in the BaseCallHandler works returns an 'Action'
to perform as a result of prodding the state machine. These actions are
generic over the request and response type of the RPC as the often
include instructions like 'forward this request to the interceptors' and
so on. When these generics are unspecialized we burn a bunch of
instructions in the runtime, and in many cases they can be represented
without generics entirely.

Modifications:

- Shuffle some of the state machine code around in BaseCallHandler such
  that neither State or Action are generic

Result:

- Instructions executed in the unary server benchmark decreases by ~20%
George Barnett 5 years ago
parent
commit
a250f3f0ab
1 changed files with 173 additions and 207 deletions
  1. 173 207
      Sources/GRPC/CallHandlers/_BaseCallHandler.swift

+ 173 - 207
Sources/GRPC/CallHandlers/_BaseCallHandler.swift

@@ -51,6 +51,9 @@ public class _BaseCallHandler<
   /// A response serializer.
   private let responseSerializer: ResponseSerializer
 
+  /// The `ChannelHandlerContext`.
+  private var context: ChannelHandlerContext?
+
   /// The event loop this call is being handled on.
   internal var eventLoop: EventLoop {
     return self.callHandlerContext.eventLoop
@@ -97,11 +100,13 @@ public class _BaseCallHandler<
   // MARK: - ChannelHandler
 
   public func handlerAdded(context: ChannelHandlerContext) {
-    self.act(on: self.state.handlerAdded(context: context))
+    self.state.handlerAdded()
+    self.context = context
   }
 
   public func handlerRemoved(context: ChannelHandlerContext) {
     self.pipeline = nil
+    self.context = nil
   }
 
   public func channelInactive(context: ChannelHandlerContext) {
@@ -110,7 +115,9 @@ public class _BaseCallHandler<
   }
 
   public func errorCaught(context: ChannelHandlerContext, error: Error) {
-    self.act(on: self.state.errorCaught(error))
+    if self.state.errorCaught() {
+      self.observeLibraryError(error)
+    }
   }
 
   public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@@ -118,17 +125,26 @@ public class _BaseCallHandler<
 
     switch part {
     case let .metadata(headers):
-      self.act(on: self.state.channelRead(.metadata(headers)))
+      if self.state.channelReadMetadata() {
+        self.receiveRequestPartInInterceptors(.metadata(headers))
+      }
+
     case let .message(buffer):
-      do {
-        let request = try self.requestDeserializer.deserialize(byteBuffer: buffer)
-        self.act(on: self.state.channelRead(.message(request)))
-      } catch {
-        self.errorCaught(context: context, error: error)
+      if self.state.channelReadMessage() {
+        do {
+          let request = try self.requestDeserializer.deserialize(byteBuffer: buffer)
+          self.receiveRequestPartInInterceptors(.message(request))
+        } catch {
+          self.errorCaught(context: context, error: error)
+        }
       }
+
     case .end:
-      self.act(on: self.state.channelRead(.end))
+      if self.state.channelReadEnd() {
+        self.receiveRequestPartInInterceptors(.end)
+      }
     }
+
     // We're the last handler. We don't have anything to forward.
   }
 
@@ -158,7 +174,22 @@ public class _BaseCallHandler<
     _ part: GRPCServerResponsePart<ResponsePayload>,
     promise: EventLoopPromise<Void>?
   ) {
-    self.act(on: self.state.sendResponsePartFromObserver(part, promise: promise))
+    let forward: Bool
+
+    switch part {
+    case .metadata:
+      forward = self.state.sendResponsePartFromObserver(.metadata)
+    case .message:
+      forward = self.state.sendResponsePartFromObserver(.message)
+    case .end:
+      forward = self.state.sendResponsePartFromObserver(.end)
+    }
+
+    if forward {
+      self.sendResponsePartToInterceptors(part, promise: promise)
+    } else {
+      promise?.fail(GRPCError.AlreadyComplete())
+    }
   }
 
   /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client.
@@ -236,7 +267,20 @@ extension _BaseCallHandler {
   /// Receive a request part from the interceptors pipeline to forward to the event observer.
   /// - Parameter part: The request part to forward.
   private func receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
-    self.act(on: self.state.receiveRequestPartFromInterceptors(part))
+    let forward: Bool
+
+    switch part {
+    case .metadata:
+      forward = self.state.receiveRequestPartFromInterceptors(.metadata)
+    case .message:
+      forward = self.state.receiveRequestPartFromInterceptors(.message)
+    case .end:
+      forward = self.state.receiveRequestPartFromInterceptors(.end)
+    }
+
+    if forward {
+      self.receiveRequestPartInObserver(part)
+    }
   }
 
   /// Send a response part via the `Channel`. Called once the response part has traversed the
@@ -248,26 +292,45 @@ extension _BaseCallHandler {
     _ part: GRPCServerResponsePart<ResponsePayload>,
     promise: EventLoopPromise<Void>?
   ) {
-    self.act(on: self.state.sendResponsePartFromInterceptors(part, promise: promise))
+    let forward: Bool
+
+    switch part {
+    case .metadata:
+      forward = self.state.sendResponsePartFromInterceptors(.metadata)
+    case .message:
+      forward = self.state.sendResponsePartFromInterceptors(.message)
+    case .end:
+      forward = self.state.sendResponsePartFromInterceptors(.end)
+    }
+
+    if forward, let context = self.context {
+      self.writeResponsePartToChannel(context: context, part: part, promise: promise)
+    } else {
+      promise?.fail(GRPCError.AlreadyComplete())
+    }
   }
 }
 
 // MARK: - State
 
-extension _BaseCallHandler {
-  fileprivate enum State {
-    /// Idle. We're waiting to be added to a pipeline.
-    case idle
+private enum State {
+  /// Idle. We're waiting to be added to a pipeline.
+  case idle
 
-    /// We're in a pipeline and receiving from the client.
-    case active(ActiveState)
+  /// We're in a pipeline and receiving from the client.
+  case active(ActiveState)
 
-    /// We're done. This state is terminal, all actions are ignored.
-    case closed
-  }
+  /// We're done. This state is terminal, all actions are ignored.
+  case closed
+}
+
+private enum RPCStreamPart {
+  case metadata
+  case message
+  case end
 }
 
-extension _BaseCallHandler.State {
+extension State {
   /// The state of the request and response streams.
   ///
   /// We track the stream state twice: between the 'Channel' and interceptor pipeline, and between
@@ -280,237 +343,190 @@ extension _BaseCallHandler.State {
     case requestClosedResponseOpen
     case requestClosedResponseClosed
 
-    enum Filter {
-      case allow
-      case drop
-    }
-
-    mutating func receiveHeaders() -> Filter {
+    mutating func receiveHeaders() -> Bool {
       switch self {
       case .requestIdleResponseIdle:
         self = .requestOpenResponseIdle
-        return .allow
+        return true
 
       case .requestOpenResponseIdle,
            .requestOpenResponseOpen,
            .requestClosedResponseIdle,
            .requestClosedResponseOpen,
            .requestClosedResponseClosed:
-        return .drop
+        return false
       }
     }
 
-    func receiveMessage() -> Filter {
+    func receiveMessage() -> Bool {
       switch self {
       case .requestOpenResponseIdle,
            .requestOpenResponseOpen:
-        return .allow
+        return true
 
       case .requestIdleResponseIdle,
            .requestClosedResponseIdle,
            .requestClosedResponseOpen,
            .requestClosedResponseClosed:
-        return .drop
+        return false
       }
     }
 
-    mutating func receiveEnd() -> Filter {
+    mutating func receiveEnd() -> Bool {
       switch self {
       case .requestOpenResponseIdle:
         self = .requestClosedResponseIdle
-        return .allow
+        return true
 
       case .requestOpenResponseOpen:
         self = .requestClosedResponseOpen
-        return .allow
+        return true
 
       case .requestIdleResponseIdle,
            .requestClosedResponseIdle,
            .requestClosedResponseOpen,
            .requestClosedResponseClosed:
-        return .drop
+        return false
       }
     }
 
-    mutating func sendHeaders() -> Filter {
+    mutating func sendHeaders() -> Bool {
       switch self {
       case .requestOpenResponseIdle:
         self = .requestOpenResponseOpen
-        return .allow
+        return true
 
       case .requestClosedResponseIdle:
         self = .requestClosedResponseOpen
-        return .allow
+        return true
 
       case .requestIdleResponseIdle,
            .requestOpenResponseOpen,
            .requestClosedResponseOpen,
            .requestClosedResponseClosed:
-        return .drop
+        return false
       }
     }
 
-    func sendMessage() -> Filter {
+    func sendMessage() -> Bool {
       switch self {
       case .requestOpenResponseOpen,
            .requestClosedResponseOpen:
-        return .allow
+        return true
 
       case .requestIdleResponseIdle,
            .requestOpenResponseIdle,
            .requestClosedResponseIdle,
            .requestClosedResponseClosed:
-        return .drop
+        return false
       }
     }
 
-    mutating func sendEnd() -> Filter {
+    mutating func sendEnd() -> Bool {
       switch self {
       case .requestIdleResponseIdle:
-        return .drop
+        return false
 
       case .requestOpenResponseIdle,
            .requestOpenResponseOpen,
            .requestClosedResponseIdle,
            .requestClosedResponseOpen:
         self = .requestClosedResponseClosed
-        return .allow
+        return true
 
       case .requestClosedResponseClosed:
-        return .drop
+        return false
       }
     }
   }
 
   fileprivate struct ActiveState {
-    var context: ChannelHandlerContext
-
     /// The stream state between the 'Channel' and interceptor pipeline.
     var channelStreamState: StreamState
 
     /// The stream state between the interceptor pipeline and event observer.
     var observerStreamState: StreamState
 
-    init(context: ChannelHandlerContext) {
-      self.context = context
+    init() {
       self.channelStreamState = .requestIdleResponseIdle
       self.observerStreamState = .requestIdleResponseIdle
     }
   }
 }
 
-extension _BaseCallHandler.State {
-  fileprivate enum Action {
-    /// Do nothing.
-    case none
-
-    /// Receive the request part in the interceptor pipeline.
-    case receiveRequestPartInInterceptors(GRPCServerRequestPart<_BaseCallHandler.RequestPayload>)
-
-    /// Receive the request part in the observer.
-    case receiveRequestPartInObserver(GRPCServerRequestPart<_BaseCallHandler.RequestPayload>)
-
-    /// Receive an error in the observer.
-    case receiveLibraryErrorInObserver(Error)
-
-    /// Send a response part to the interceptor pipeline.
-    case sendResponsePartToInterceptors(
-      GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
-      EventLoopPromise<Void>?
-    )
-
-    /// Write the response part to the `Channel`.
-    case writeResponsePartToChannel(
-      ChannelHandlerContext,
-      GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
-      promise: EventLoopPromise<Void>?
-    )
-
-    /// Complete the promise with the result.
-    case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
-
-    /// Perform multiple actions.
-    indirect case multiple([Action])
-  }
-}
-
-extension _BaseCallHandler.State {
+extension State {
   /// The handler was added to the `ChannelPipeline`: this is the only way to move from the `.idle`
   /// state. We only expect this to be called once.
-  internal mutating func handlerAdded(context: ChannelHandlerContext) -> Action {
+  internal mutating func handlerAdded() {
     switch self {
     case .idle:
       // This is the only way we can become active.
-      self = .active(.init(context: context))
-      return .none
-
+      self = .active(.init())
     case .active:
       preconditionFailure("Invalid state: already active")
-
     case .closed:
-      return .none
+      ()
     }
   }
 
   /// Received an error from the `Channel`.
-  internal mutating func errorCaught(_ error: Error) -> Action {
+  /// - Returns: True if the error should be forwarded to the error observer, or false if it should
+  ///   be dropped.
+  internal func errorCaught() -> Bool {
     switch self {
     case .active:
-      return .receiveLibraryErrorInObserver(error)
-
+      return true
     case .idle, .closed:
-      return .none
+      return false
     }
   }
 
-  /// Receive a request part from the `Channel`. If we're active we just forward these through the
-  /// pipeline. We validate at the other end.
-  internal mutating func channelRead(
-    _ requestPart: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
-  ) -> Action {
+  /// Receive a metadata part from the `Channel`.
+  /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
+  internal mutating func channelReadMetadata() -> Bool {
     switch self {
     case .idle:
       preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
-
     case var .active(state):
-      // Avoid CoW-ing state.
-      self = .idle
-
-      let filter: StreamState.Filter
-      let part: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
-
-      switch requestPart {
-      case let .metadata(headers):
-        filter = state.channelStreamState.receiveHeaders()
-        part = .metadata(headers)
-      case let .message(message):
-        filter = state.channelStreamState.receiveMessage()
-        part = .message(message)
-      case .end:
-        filter = state.channelStreamState.receiveEnd()
-        part = .end
-      }
-
-      // Restore state.
+      let allow = state.channelStreamState.receiveHeaders()
       self = .active(state)
+      return allow
+    case .closed:
+      return false
+    }
+  }
 
-      switch filter {
-      case .allow:
-        return .receiveRequestPartInInterceptors(part)
-      case .drop:
-        return .none
-      }
+  /// Receive a message part from the `Channel`.
+  /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
+  internal func channelReadMessage() -> Bool {
+    switch self {
+    case .idle:
+      preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
+    case let .active(state):
+      return state.channelStreamState.receiveMessage()
+    case .closed:
+      return false
+    }
+  }
 
+  /// Receive an end-stream part from the `Channel`.
+  /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
+  internal mutating func channelReadEnd() -> Bool {
+    switch self {
+    case .idle:
+      preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
+    case var .active(state):
+      let allow = state.channelStreamState.receiveEnd()
+      self = .active(state)
+      return allow
     case .closed:
-      return .none
+      return false
     }
   }
 
   /// Send a response part from the observer to the interceptors.
-  internal mutating func sendResponsePartFromObserver(
-    _ part: GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
-    promise: EventLoopPromise<Void>?
-  ) -> Action {
+  /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
+  internal mutating func sendResponsePartFromObserver(_ part: RPCStreamPart) -> Bool {
     switch self {
     case .idle:
       preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
@@ -519,37 +535,29 @@ extension _BaseCallHandler.State {
       // Avoid CoW-ing 'state'.
       self = .idle
 
-      let filter: StreamState.Filter
+      let allow: Bool
 
       switch part {
       case .metadata:
-        filter = state.observerStreamState.sendHeaders()
+        allow = state.observerStreamState.sendHeaders()
       case .message:
-        filter = state.observerStreamState.sendMessage()
+        allow = state.observerStreamState.sendMessage()
       case .end:
-        filter = state.observerStreamState.sendEnd()
+        allow = state.observerStreamState.sendEnd()
       }
 
       // Restore the state.
       self = .active(state)
-
-      switch filter {
-      case .allow:
-        return .sendResponsePartToInterceptors(part, promise)
-      case .drop:
-        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
-      }
+      return allow
 
     case .closed:
-      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
+      return false
     }
   }
 
   /// Send a response part from the interceptors to the `Channel`.
-  internal mutating func sendResponsePartFromInterceptors(
-    _ part: GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
-    promise: EventLoopPromise<Void>?
-  ) -> Action {
+  /// - Returns: True if the part should be forwarded to the `Channel`, false otherwise.
+  internal mutating func sendResponsePartFromInterceptors(_ part: RPCStreamPart) -> Bool {
     switch self {
     case .idle:
       preconditionFailure("Invalid state: can't send response on idle call")
@@ -558,38 +566,32 @@ extension _BaseCallHandler.State {
       // Avoid CoW-ing 'state'.
       self = .idle
 
-      let filter: StreamState.Filter
+      let allow: Bool
 
       switch part {
       case .metadata:
-        filter = state.channelStreamState.sendHeaders()
+        allow = state.channelStreamState.sendHeaders()
         self = .active(state)
       case .message:
-        filter = state.channelStreamState.sendMessage()
+        allow = state.channelStreamState.sendMessage()
         self = .active(state)
       case .end:
-        filter = state.channelStreamState.sendEnd()
+        allow = state.channelStreamState.sendEnd()
         // We're sending end, we're no longer active.
         self = .closed
       }
 
-      switch filter {
-      case .allow:
-        return .writeResponsePartToChannel(state.context, part, promise: promise)
-      case .drop:
-        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
-      }
+      return allow
 
     case .closed:
-      // We're already closed, fail any promise.
-      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
+      // We're already closed.
+      return false
     }
   }
 
   /// A request part has traversed the interceptor pipeline, now send it to the observer.
-  internal mutating func receiveRequestPartFromInterceptors(
-    _ part: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
-  ) -> Action {
+  /// - Returns: True if the part should be forwarded to the observer, false otherwise.
+  internal mutating func receiveRequestPartFromInterceptors(_ part: RPCStreamPart) -> Bool {
     switch self {
     case .idle:
       preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
@@ -598,31 +600,25 @@ extension _BaseCallHandler.State {
       // Avoid CoW-ing `state`.
       self = .idle
 
-      let filter: StreamState.Filter
+      let allow: Bool
 
       // Does the active state allow us to send this?
       switch part {
       case .metadata:
-        filter = state.observerStreamState.receiveHeaders()
+        allow = state.observerStreamState.receiveHeaders()
       case .message:
-        filter = state.observerStreamState.receiveMessage()
+        allow = state.observerStreamState.receiveMessage()
       case .end:
-        filter = state.observerStreamState.receiveEnd()
+        allow = state.observerStreamState.receiveEnd()
       }
 
       // Put `state` back.
       self = .active(state)
-
-      switch filter {
-      case .allow:
-        return .receiveRequestPartInObserver(part)
-      case .drop:
-        return .none
-      }
+      return allow
 
     case .closed:
       // We're closed, just ignore this.
-      return .none
+      return false
     }
   }
 }
@@ -630,36 +626,6 @@ extension _BaseCallHandler.State {
 // MARK: State Actions
 
 extension _BaseCallHandler {
-  private func act(on action: State.Action) {
-    switch action {
-    case .none:
-      ()
-
-    case let .receiveRequestPartInInterceptors(part):
-      self.receiveRequestPartInInterceptors(part)
-
-    case let .receiveRequestPartInObserver(part):
-      self.receiveRequestPartInObserver(part)
-
-    case let .receiveLibraryErrorInObserver(error):
-      self.observeLibraryError(error)
-
-    case let .sendResponsePartToInterceptors(part, promise):
-      self.sendResponsePartToInterceptors(part, promise: promise)
-
-    case let .writeResponsePartToChannel(context, part, promise):
-      self.writeResponsePartToChannel(context: context, part: part, promise: promise)
-
-    case let .completePromise(promise, result):
-      promise?.completeWith(result)
-
-    case let .multiple(actions):
-      for action in actions {
-        self.act(on: action)
-      }
-    }
-  }
-
   /// Receives a request part in the interceptor pipeline.
   private func receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
     self.pipeline?.receive(part)