Просмотр исходного кода

Avoid copying client interceptor contexts (#1092)

Motivation:

The client interceptor pipeline holds an array of interceptor contexts.
Each context is a struct and also holds a reference to the pipeline.
Each time the context asks the pipeline for the next inbound/outbound
context we incur a copy of the context which is surprisingly costly due
to retain and release traffic for the pipeline ref as well as the cost
of initializing the unspecialized contexts.

Modifications:

- interceptor contexts now ask the pipeline to perform an operation for
  them rather than asking for a context on which to perform the
  operation, this avoids copying contexts
- as the pipeline is now responsible for performing the operations a
  further optimisation is opened up: head and tail interceptors and
  contexts are no longer required, the same action can be done directly
  by the pipeline

Result:

A 4.8% reduction in instructions in the unary_10k_small_requests
benchmark.
George Barnett 4 лет назад
Родитель
Сommit
befa21af1f

+ 2 - 0
Sources/GRPC/ClientCalls/CallDetails.swift

@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+@usableFromInline
 internal struct CallDetails {
   /// The type of the RPC, e.g. unary.
   internal var type: GRPCCallType
@@ -28,5 +29,6 @@ internal struct CallDetails {
   internal var scheme: String
 
   /// Call options provided by the user.
+  @usableFromInline
   internal var options: CallOptions
 }

+ 22 - 62
Sources/GRPC/Interceptor/ClientInterceptorContext.swift

@@ -19,77 +19,72 @@ import NIO
 public struct ClientInterceptorContext<Request, Response> {
   /// The interceptor this context is for.
   @usableFromInline
-  internal let interceptor: AnyClientInterceptor<Request, Response>
+  internal let interceptor: ClientInterceptor<Request, Response>
 
   /// The pipeline this context is associated with.
-  private let pipeline: ClientInterceptorPipeline<Request, Response>
+  @usableFromInline
+  internal let _pipeline: ClientInterceptorPipeline<Request, Response>
 
   /// The index of this context's interceptor within the pipeline.
-  private let index: Int
-
-  // The next context in the inbound direction, if one exists.
-  private var nextInbound: ClientInterceptorContext<Request, Response>? {
-    return self.pipeline.nextInboundContext(forIndex: self.index)
-  }
-
-  // The next context in the outbound direction, if one exists.
-  private var nextOutbound: ClientInterceptorContext<Request, Response>? {
-    return self.pipeline.nextOutboundContext(forIndex: self.index)
-  }
+  @usableFromInline
+  internal let _index: Int
 
   /// The `EventLoop` this interceptor pipeline is being executed on.
   public var eventLoop: EventLoop {
-    return self.pipeline.eventLoop
+    return self._pipeline.eventLoop
   }
 
   /// A logger.
   public var logger: Logger {
-    return self.pipeline.logger
+    return self._pipeline.logger
   }
 
   /// The type of the RPC, e.g. "unary".
   public var type: GRPCCallType {
-    return self.pipeline.details.type
+    return self._pipeline.details.type
   }
 
   /// The path of the RPC in the format "/Service/Method", e.g. "/echo.Echo/Get".
   public var path: String {
-    return self.pipeline.details.path
+    return self._pipeline.details.path
   }
 
   /// The options used to invoke the call.
   public var options: CallOptions {
-    return self.pipeline.details.options
+    return self._pipeline.details.options
   }
 
   /// Construct a `ClientInterceptorContext` for the interceptor at the given index within in
   /// interceptor pipeline.
+  @inlinable
   internal init(
-    for interceptor: AnyClientInterceptor<Request, Response>,
+    for interceptor: ClientInterceptor<Request, Response>,
     atIndex index: Int,
     in pipeline: ClientInterceptorPipeline<Request, Response>
   ) {
     self.interceptor = interceptor
-    self.pipeline = pipeline
-    self.index = index
+    self._pipeline = pipeline
+    self._index = index
   }
 
   /// Forwards the response part to the next inbound interceptor in the pipeline, if there is one.
   ///
   /// - Parameter part: The response part to forward.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   public func receive(_ part: GRPCClientResponsePart<Response>) {
     self.eventLoop.assertInEventLoop()
-    self.nextInbound?.invokeReceive(part)
+    self._pipeline.invokeReceive(part, fromContextAtIndex: self._index)
   }
 
   /// Forwards the error to the next inbound interceptor in the pipeline, if there is one.
   ///
   /// - Parameter error: The error to forward.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   public func errorCaught(_ error: Error) {
     self.eventLoop.assertInEventLoop()
-    self.nextInbound?.invokeErrorCaught(error)
+    self._pipeline.invokeErrorCaught(error, fromContextAtIndex: self._index)
   }
 
   /// Forwards the request part to the next outbound interceptor in the pipeline, if there is one.
@@ -98,57 +93,22 @@ public struct ClientInterceptorContext<Request, Response> {
   ///   - part: The request part to forward.
   ///   - promise: The promise the complete when the part has been written.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   public func send(
     _ part: GRPCClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?
   ) {
     self.eventLoop.assertInEventLoop()
-
-    if let outbound = self.nextOutbound {
-      outbound.invokeSend(part, promise: promise)
-    } else {
-      promise?.fail(GRPCStatus(code: .unavailable, message: "The RPC has already completed"))
-    }
+    self._pipeline.invokeSend(part, promise: promise, fromContextAtIndex: self._index)
   }
 
   /// Forwards a request to cancel the RPC to the next outbound interceptor in the pipeline.
   ///
   /// - Parameter promise: The promise to complete with the outcome of the cancellation request.
   /// - Important: This *must* to be called from the `eventLoop`.
-  public func cancel(promise: EventLoopPromise<Void>?) {
-    self.eventLoop.assertInEventLoop()
-
-    if let outbound = self.nextOutbound {
-      outbound.invokeCancel(promise: promise)
-    } else {
-      // The RPC has already been completed. Should cancellation fail?
-      promise?.succeed(())
-    }
-  }
-}
-
-extension ClientInterceptorContext {
-  internal func invokeReceive(_ part: GRPCClientResponsePart<Response>) {
-    self.eventLoop.assertInEventLoop()
-    self.interceptor.receive(part, context: self)
-  }
-
   @inlinable
-  internal func invokeSend(
-    _ part: GRPCClientRequestPart<Request>,
-    promise: EventLoopPromise<Void>?
-  ) {
-    self.eventLoop.assertInEventLoop()
-    self.interceptor.send(part, promise: promise, context: self)
-  }
-
-  internal func invokeCancel(promise: EventLoopPromise<Void>?) {
-    self.eventLoop.assertInEventLoop()
-    self.interceptor.cancel(promise: promise, context: self)
-  }
-
-  internal func invokeErrorCaught(_ error: Error) {
+  public func cancel(promise: EventLoopPromise<Void>?) {
     self.eventLoop.assertInEventLoop()
-    self.interceptor.errorCaught(error, context: self)
+    self._pipeline.invokeCancel(promise: promise, fromContextAtIndex: self._index)
   }
 }

+ 322 - 126
Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

@@ -69,59 +69,69 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   internal let eventLoop: EventLoop
 
   /// The details of the call.
+  @usableFromInline
   internal let details: CallDetails
 
   /// A task for closing the RPC in case of a timeout.
-  private var scheduledClose: Scheduled<Void>?
-
-  /// The contexts associated with the interceptors stored in this pipeline. Context will be removed
-  /// once the RPC has completed. Contexts are ordered from outbound to inbound, that is, the tail
-  /// is first and the head is last.
-  private var contexts: InterceptorContextList<ClientInterceptorContext<Request, Response>>?
-
-  /// Returns the next context in the outbound direction for the context at the given index, if one
-  /// exists.
-  /// - Parameter index: The index of the `ClientInterceptorContext` which is requesting the next
-  ///   outbound context.
-  /// - Returns: The `ClientInterceptorContext` or `nil` if one does not exist.
-  internal func nextOutboundContext(
-    forIndex index: Int
-  ) -> ClientInterceptorContext<Request, Response>? {
-    return self.context(atIndex: index + 1)
-  }
+  @usableFromInline
+  internal var _scheduledClose: Scheduled<Void>?
 
-  /// Returns the next context in the inbound direction for the context at the given index, if one
-  /// exists.
-  /// - Parameter index: The index of the `ClientInterceptorContext` which is requesting the next
-  ///   inbound context.
-  /// - Returns: The `ClientInterceptorContext` or `nil` if one does not exist.
-  internal func nextInboundContext(
-    forIndex index: Int
-  ) -> ClientInterceptorContext<Request, Response>? {
-    return self.context(atIndex: index - 1)
-  }
+  @usableFromInline
+  internal let _errorDelegate: ClientErrorDelegate?
 
-  /// Returns the context for the given index, if one exists.
-  /// - Parameter index: The index of the `ClientInterceptorContext` to return.
-  /// - Returns: The `ClientInterceptorContext` or `nil` if one does not exist for the given index.
-  private func context(atIndex index: Int) -> ClientInterceptorContext<Request, Response>? {
-    return self.contexts?[checked: index]
-  }
+  @usableFromInline
+  internal let _onError: (Error) -> Void
 
-  /// The context closest to the `NIO.Channel`, i.e. where inbound events originate. This will be
-  /// `nil` once the RPC has completed.
   @usableFromInline
-  internal var _head: ClientInterceptorContext<Request, Response>? {
-    return self.contexts?.last
-  }
+  internal let _onCancel: (EventLoopPromise<Void>?) -> Void
+
+  @usableFromInline
+  internal let _onRequestPart: (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
 
-  /// The context closest to the application, i.e. where outbound events originate. This will be
-  /// `nil` once the RPC has completed.
   @usableFromInline
-  internal var _tail: ClientInterceptorContext<Request, Response>? {
-    return self.contexts?.first
+  internal let _onResponsePart: (GRPCClientResponsePart<Response>) -> Void
+
+  /// The index after the last user interceptor context index. (i.e. `_userContexts.endIndex`).
+  @usableFromInline
+  internal let _headIndex: Int
+
+  /// The index before the first user interceptor context index (always -1).
+  @usableFromInline
+  internal let _tailIndex: Int
+
+  @usableFromInline
+  internal var _userContexts: [ClientInterceptorContext<Request, Response>]
+
+  /// Whether the interceptor pipeline is still open. It becomes closed after an 'end' response
+  /// part has traversed the pipeline.
+  @usableFromInline
+  internal var _isOpen = true
+
+  /// The index of the next context on the inbound side of the context at the given index.
+  @inlinable
+  internal func _nextInboundIndex(after index: Int) -> Int {
+    // Unchecked arithmetic is okay here: our smallest inbound index is '_tailIndex' but we will
+    // never ask for the inbound index after the tail.
+    assert(self._indexIsValid(index))
+    return index &- 1
+  }
+
+  /// The index of the next context on the outbound side of the context at the given index.
+  @inlinable
+  internal func _nextOutboundIndex(after index: Int) -> Int {
+    // Unchecked arithmetic is okay here: our greatest outbound index is '_headIndex' but we will
+    // never ask for the outbound index after the head.
+    assert(self._indexIsValid(index))
+    return index &+ 1
+  }
+
+  /// Returns true of the index is in the range `_tailIndex ... _headIndex`.
+  @inlinable
+  internal func _indexIsValid(_ index: Int) -> Bool {
+    return index >= self._tailIndex && index <= self._headIndex
   }
 
+  @inlinable
   internal init(
     eventLoop: EventLoop,
     details: CallDetails,
@@ -134,17 +144,28 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ) {
     self.eventLoop = eventLoop
     self.details = details
-    self.contexts = InterceptorContextList(
-      for: self,
-      interceptors: interceptors,
-      errorDelegate: errorDelegate,
-      onError: onError,
-      onCancel: onCancel,
-      onRequestPart: onRequestPart,
-      onResponsePart: onResponsePart
-    )
 
-    self.setupDeadline()
+    self._errorDelegate = errorDelegate
+    self._onError = onError
+    self._onCancel = onCancel
+    self._onRequestPart = onRequestPart
+    self._onResponsePart = onResponsePart
+
+    // The tail is before the interceptors.
+    self._tailIndex = -1
+    // The head is after the interceptors.
+    self._headIndex = interceptors.endIndex
+
+    // Make some contexts.
+    self._userContexts = []
+    self._userContexts.reserveCapacity(interceptors.count)
+
+    for index in 0 ..< interceptors.count {
+      let context = ClientInterceptorContext(for: interceptors[index], atIndex: index, in: self)
+      self._userContexts.append(context)
+    }
+
+    self._setupDeadline()
   }
 
   /// Emit a response part message into the interceptor pipeline.
@@ -153,9 +174,55 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ///
   /// - Parameter part: The part to emit into the pipeline.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   internal func receive(_ part: GRPCClientResponsePart<Response>) {
+    self.invokeReceive(part, fromContextAtIndex: self._headIndex)
+  }
+
+  /// Invoke receive on the appropriate context when called from the context at the given index.
+  @inlinable
+  internal func invokeReceive(
+    _ part: GRPCClientResponsePart<Response>,
+    fromContextAtIndex index: Int
+  ) {
+    self._invokeReceive(part, onContextAtIndex: self._nextInboundIndex(after: index))
+  }
+
+  /// Invoke receive on the context at the given index, if doing so is safe.
+  @inlinable
+  internal func _invokeReceive(
+    _ part: GRPCClientResponsePart<Response>,
+    onContextAtIndex index: Int
+  ) {
     self.eventLoop.assertInEventLoop()
-    self._head?.invokeReceive(part)
+    assert(self._indexIsValid(index))
+    guard self._isOpen else {
+      return
+    }
+
+    self._invokeReceive(part, onContextAtUncheckedIndex: index)
+  }
+
+  /// Invoke receive on the context at the given index, assuming that the index is valid and the
+  /// pipeline is still open.
+  @inlinable
+  internal func _invokeReceive(
+    _ part: GRPCClientResponsePart<Response>,
+    onContextAtUncheckedIndex index: Int
+  ) {
+    switch index {
+    case self._headIndex:
+      self._invokeReceive(part, onContextAtUncheckedIndex: self._nextInboundIndex(after: index))
+
+    case self._tailIndex:
+      if part.isEnd {
+        self.close()
+      }
+      self._onResponsePart(part)
+
+    default:
+      self._userContexts[index].invokeReceive(part)
+    }
   }
 
   /// Emit an error into the interceptor pipeline.
@@ -164,9 +231,70 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ///
   /// - Parameter error: The error to emit.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   internal func errorCaught(_ error: Error) {
+    self.invokeErrorCaught(error, fromContextAtIndex: self._headIndex)
+  }
+
+  /// Invoke `errorCaught` on the appropriate context when called from the context at the given
+  /// index.
+  @inlinable
+  internal func invokeErrorCaught(_ error: Error, fromContextAtIndex index: Int) {
+    self._invokeErrorCaught(error, onContextAtIndex: self._nextInboundIndex(after: index))
+  }
+
+  /// Invoke `errorCaught` on the context at the given index if that index exists and the pipeline
+  /// is still open.
+  @inlinable
+  internal func _invokeErrorCaught(_ error: Error, onContextAtIndex index: Int) {
     self.eventLoop.assertInEventLoop()
-    self._head?.invokeErrorCaught(error)
+    assert(self._indexIsValid(index))
+    guard self._isOpen else {
+      return
+    }
+    self._invokeErrorCaught(error, onContextAtUncheckedIndex: index)
+  }
+
+  /// Invoke `errorCaught` on the context at the given index assuming the index exists and the
+  /// pipeline is still open.
+  @inlinable
+  internal func _invokeErrorCaught(_ error: Error, onContextAtUncheckedIndex index: Int) {
+    switch index {
+    case self._headIndex:
+      self._invokeErrorCaught(error, onContextAtIndex: self._nextInboundIndex(after: index))
+
+    case self._tailIndex:
+      self._errorCaught(error)
+
+    default:
+      self._userContexts[index].invokeErrorCaught(error)
+    }
+  }
+
+  /// Handles a caught error which has traversed the interceptor pipeline.
+  @usableFromInline
+  internal func _errorCaught(_ error: Error) {
+    // We're about to complete, close the pipeline.
+    self.close()
+
+    var unwrappedError: Error
+
+    // Unwrap the error, if possible.
+    if let errorContext = error as? GRPCError.WithContext {
+      unwrappedError = errorContext.error
+      self._errorDelegate?.didCatchError(
+        errorContext.error,
+        logger: self.logger,
+        file: errorContext.file,
+        line: errorContext.line
+      )
+    } else {
+      unwrappedError = error
+      self._errorDelegate?.didCatchErrorWithoutContext(error, logger: self.logger)
+    }
+
+    // Emit the unwrapped error.
+    self._onError(unwrappedError)
   }
 
   /// Writes a request message into the interceptor pipeline.
@@ -179,12 +307,60 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   /// - Important: This *must* to be called from the `eventLoop`.
   @inlinable
   internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
-    self.eventLoop.assertInEventLoop()
+    self.invokeSend(part, promise: promise, fromContextAtIndex: self._tailIndex)
+  }
 
-    if let tail = self._tail {
-      tail.invokeSend(part, promise: promise)
-    } else {
+  /// Invoke send on the appropriate context when called from the context at the given index.
+  @inlinable
+  internal func invokeSend(
+    _ part: GRPCClientRequestPart<Request>,
+    promise: EventLoopPromise<Void>?,
+    fromContextAtIndex index: Int
+  ) {
+    self._invokeSend(
+      part,
+      promise: promise,
+      onContextAtIndex: self._nextOutboundIndex(after: index)
+    )
+  }
+
+  /// Invoke send on the context at the given index, if it exists and the pipeline is still open.
+  @inlinable
+  internal func _invokeSend(
+    _ part: GRPCClientRequestPart<Request>,
+    promise: EventLoopPromise<Void>?,
+    onContextAtIndex index: Int
+  ) {
+    self.eventLoop.assertInEventLoop()
+    assert(self._indexIsValid(index))
+    guard self._isOpen else {
       promise?.fail(GRPCError.AlreadyComplete())
+      return
+    }
+    self._invokeSend(part, promise: promise, onContextAtUncheckedIndex: index)
+  }
+
+  /// Invoke send on the context at the given index assuming the index exists and the pipeline is
+  /// still open.
+  @inlinable
+  internal func _invokeSend(
+    _ part: GRPCClientRequestPart<Request>,
+    promise: EventLoopPromise<Void>?,
+    onContextAtUncheckedIndex index: Int
+  ) {
+    switch index {
+    case self._headIndex:
+      self._onRequestPart(part, promise)
+
+    case self._tailIndex:
+      self._invokeSend(
+        part,
+        promise: promise,
+        onContextAtUncheckedIndex: self._nextOutboundIndex(after: index)
+      )
+
+    default:
+      self._userContexts[index].invokeSend(part, promise: promise)
     }
   }
 
@@ -194,13 +370,52 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ///
   /// - Parameter promise: A promise to complete when the cancellation request has been handled.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   internal func cancel(promise: EventLoopPromise<Void>?) {
-    self.eventLoop.assertInEventLoop()
+    self.invokeCancel(promise: promise, fromContextAtIndex: self._tailIndex)
+  }
 
-    if let tail = self._tail {
-      tail.invokeCancel(promise: promise)
-    } else {
+  /// Invoke `cancel` on the appropriate context when called from the context at the given index.
+  @inlinable
+  internal func invokeCancel(promise: EventLoopPromise<Void>?, fromContextAtIndex index: Int) {
+    self._invokeCancel(promise: promise, onContextAtIndex: self._nextOutboundIndex(after: index))
+  }
+
+  /// Invoke `cancel` on the context at the given index if the index is valid and the pipeline is
+  /// still open.
+  @inlinable
+  internal func _invokeCancel(
+    promise: EventLoopPromise<Void>?,
+    onContextAtIndex index: Int
+  ) {
+    self.eventLoop.assertInEventLoop()
+    assert(self._indexIsValid(index))
+    guard self._isOpen else {
       promise?.fail(GRPCError.AlreadyComplete())
+      return
+    }
+    self._invokeCancel(promise: promise, onContextAtUncheckedIndex: index)
+  }
+
+  /// Invoke `cancel` on the context at the given index assuming the index is valid and the
+  /// pipeline is still open.
+  @inlinable
+  internal func _invokeCancel(
+    promise: EventLoopPromise<Void>?,
+    onContextAtUncheckedIndex index: Int
+  ) {
+    switch index {
+    case self._headIndex:
+      self._onCancel(promise)
+
+    case self._tailIndex:
+      self._invokeCancel(
+        promise: promise,
+        onContextAtUncheckedIndex: self._nextOutboundIndex(after: index)
+      )
+
+    default:
+      self._userContexts[index].invokeCancel(promise: promise)
     }
   }
 }
@@ -211,90 +426,71 @@ extension ClientInterceptorPipeline {
   /// Closes the pipeline. This should be called once, by the tail interceptor, to indicate that
   /// the RPC has completed.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   internal func close() {
     self.eventLoop.assertInEventLoop()
-
-    // Grab the head, we'll use it to cancel the transport. This is most likely already closed,
-    // but there's nothing to stop an interceptor from emitting its own error and leaving the
-    // transport open.
-    let head = self._head
-    self.contexts = nil
+    self._isOpen = false
 
     // Cancel the timeout.
-    self.scheduledClose?.cancel()
-    self.scheduledClose = nil
+    self._scheduledClose?.cancel()
+    self._scheduledClose = nil
 
     // Cancel the transport.
-    head?.invokeCancel(promise: nil)
+    self._onCancel(nil)
   }
 
   /// Sets up a deadline for the pipeline.
-  private func setupDeadline() {
-    if self.eventLoop.inEventLoop {
-      self._setupDeadline()
-    } else {
-      self.eventLoop.execute {
-        self._setupDeadline()
-      }
-    }
-  }
+  @inlinable
+  internal func _setupDeadline() {
+    func setup() {
+      self.eventLoop.assertInEventLoop()
 
-  /// Sets up a deadline for the pipeline.
-  /// - Important: This *must* to be called from the `eventLoop`.
-  private func _setupDeadline() {
-    self.eventLoop.assertInEventLoop()
+      let timeLimit = self.details.options.timeLimit
+      let deadline = timeLimit.makeDeadline()
 
-    let timeLimit = self.details.options.timeLimit
-    let deadline = timeLimit.makeDeadline()
+      // There's no point scheduling this.
+      if deadline == .distantFuture {
+        return
+      }
 
-    // There's no point scheduling this.
-    if deadline == .distantFuture {
-      return
+      self._scheduledClose = self.eventLoop.scheduleTask(deadline: deadline) {
+        // When the error hits the tail we'll call 'close()', this will cancel the transport if
+        // necessary.
+        self.errorCaught(GRPCError.RPCTimedOut(timeLimit))
+      }
     }
 
-    self.scheduledClose = self.eventLoop.scheduleTask(deadline: deadline) {
-      // When the error hits the tail we'll call 'close()', this will cancel the transport if
-      // necessary.
-      self.errorCaught(GRPCError.RPCTimedOut(timeLimit))
+    if self.eventLoop.inEventLoop {
+      setup()
+    } else {
+      self.eventLoop.execute {
+        setup()
+      }
     }
   }
 }
 
-private extension InterceptorContextList {
-  init<Request, Response>(
-    for pipeline: ClientInterceptorPipeline<Request, Response>,
-    interceptors: [ClientInterceptor<Request, Response>],
-    errorDelegate: ClientErrorDelegate?,
-    onError: @escaping (Error) -> Void,
-    onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
-    onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void,
-    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
-  ) where Element == ClientInterceptorContext<Request, Response> {
-    let middle = interceptors.enumerated().map { index, interceptor in
-      ClientInterceptorContext(
-        for: .userProvided(interceptor),
-        atIndex: index,
-        in: pipeline
-      )
-    }
+extension ClientInterceptorContext {
+  @inlinable
+  internal func invokeReceive(_ part: GRPCClientResponsePart<Response>) {
+    self.interceptor.receive(part, context: self)
+  }
 
-    let first = ClientInterceptorContext<Request, Response>(
-      for: .tail(
-        for: pipeline,
-        errorDelegate: errorDelegate,
-        onError: onError,
-        onResponsePart: onResponsePart
-      ),
-      atIndex: middle.startIndex - 1,
-      in: pipeline
-    )
+  @inlinable
+  internal func invokeSend(
+    _ part: GRPCClientRequestPart<Request>,
+    promise: EventLoopPromise<Void>?
+  ) {
+    self.interceptor.send(part, promise: promise, context: self)
+  }
 
-    let last = ClientInterceptorContext<Request, Response>(
-      for: .head(onCancel: onCancel, onRequestPart: onRequestPart),
-      atIndex: middle.endIndex,
-      in: pipeline
-    )
+  @inlinable
+  internal func invokeCancel(promise: EventLoopPromise<Void>?) {
+    self.interceptor.cancel(promise: promise, context: self)
+  }
 
-    self.init(first: first, middle: middle, last: last)
+  @inlinable
+  internal func invokeErrorCaught(_ error: Error) {
+    self.interceptor.errorCaught(error, context: self)
   }
 }

+ 0 - 264
Sources/GRPC/Interceptor/ClientInterceptors.swift

@@ -97,267 +97,3 @@ open class ClientInterceptor<Request, Response> {
     context.cancel(promise: promise)
   }
 }
-
-// MARK: - Head/Tail
-
-/// An interceptor which offloads requests to the transport and forwards any response parts to the
-/// rest of the pipeline.
-@usableFromInline
-internal struct HeadClientInterceptor<Request, Response>: ClientInterceptorProtocol {
-  /// Called when a cancellation has been requested.
-  private let onCancel: (EventLoopPromise<Void>?) -> Void
-
-  /// Called when a request part has been written.
-  @usableFromInline
-  internal let _onRequestPart: (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
-
-  init(
-    onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
-    onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
-  ) {
-    self.onCancel = onCancel
-    self._onRequestPart = onRequestPart
-  }
-
-  @inlinable
-  internal func send(
-    _ part: GRPCClientRequestPart<Request>,
-    promise: EventLoopPromise<Void>?,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    self._onRequestPart(part, promise)
-  }
-
-  internal func cancel(
-    promise: EventLoopPromise<Void>?,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    self.onCancel(promise)
-  }
-
-  internal func receive(
-    _ part: GRPCClientResponsePart<Response>,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    context.receive(part)
-  }
-
-  internal func errorCaught(
-    _ error: Error,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    context.errorCaught(error)
-  }
-}
-
-/// An interceptor which offloads responses to a provided callback and forwards any requests parts
-/// and cancellation requests to rest of the pipeline.
-@usableFromInline
-internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProtocol {
-  /// The pipeline this interceptor belongs to.
-  private let pipeline: ClientInterceptorPipeline<Request, Response>
-
-  /// A user-provided error delegate.
-  private let errorDelegate: ClientErrorDelegate?
-
-  /// A callback invoked when an error is received.
-  private let onErrorCaught: (Error) -> Void
-
-  /// A response part handler; typically this will complete some promises, for streaming responses
-  /// it will also invoke a user-supplied handler. This closure may also be provided by the user.
-  /// We need to be careful about re-entrancy.
-  private let onResponsePart: (GRPCClientResponsePart<Response>) -> Void
-
-  internal init(
-    for pipeline: ClientInterceptorPipeline<Request, Response>,
-    errorDelegate: ClientErrorDelegate?,
-    _ onErrorCaught: @escaping (Error) -> Void,
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
-  ) {
-    self.pipeline = pipeline
-    self.errorDelegate = errorDelegate
-    self.onErrorCaught = onErrorCaught
-    self.onResponsePart = onResponsePart
-  }
-
-  internal func receive(
-    _ part: GRPCClientResponsePart<Response>,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    switch part {
-    case .metadata, .message:
-      ()
-    case .end:
-      // We're about to complete, close the pipeline before calling out via `onResponsePart`.
-      self.pipeline.close()
-    }
-
-    self.onResponsePart(part)
-  }
-
-  internal func errorCaught(
-    _ error: Error,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    // We're about to complete, close the pipeline before calling out via the error delegate
-    // or `onResponsePart`.
-    self.pipeline.close()
-
-    var unwrappedError: Error
-
-    // Unwrap the error, if possible.
-    if let errorContext = error as? GRPCError.WithContext {
-      unwrappedError = errorContext.error
-      self.errorDelegate?.didCatchError(
-        errorContext.error,
-        logger: context.logger,
-        file: errorContext.file,
-        line: errorContext.line
-      )
-    } else {
-      unwrappedError = error
-      self.errorDelegate?.didCatchErrorWithoutContext(
-        error,
-        logger: context.logger
-      )
-    }
-
-    // Emit the unwrapped error.
-    self.onErrorCaught(unwrappedError)
-  }
-
-  @inlinable
-  internal func send(
-    _ part: GRPCClientRequestPart<Request>,
-    promise: EventLoopPromise<Void>?,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    context.send(part, promise: promise)
-  }
-
-  internal func cancel(
-    promise: EventLoopPromise<Void>?,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    context.cancel(promise: promise)
-  }
-}
-
-// MARK: - Any Interceptor
-
-/// A wrapping interceptor which delegates to the implementation of an underlying interceptor.
-@usableFromInline
-internal struct AnyClientInterceptor<Request, Response>: ClientInterceptorProtocol {
-  @usableFromInline
-  internal enum Implementation {
-    case head(HeadClientInterceptor<Request, Response>)
-    case tail(TailClientInterceptor<Request, Response>)
-    case base(ClientInterceptor<Request, Response>)
-  }
-
-  /// The underlying interceptor implementation.
-  @usableFromInline
-  internal let _implementation: Implementation
-
-  /// Makes a head interceptor.
-  /// - Returns: An `AnyClientInterceptor` which wraps a `HeadClientInterceptor`.
-  internal static func head(
-    onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
-    onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
-  ) -> AnyClientInterceptor<Request, Response> {
-    return .init(.head(.init(onCancel: onCancel, onRequestPart: onRequestPart)))
-  }
-
-  /// Makes a tail interceptor.
-  /// - Parameters:
-  ///   - pipeline: The pipeline the tail interceptor belongs to.
-  ///   - errorDelegate: An error delegate.
-  ///   - onError: A callback invoked when an error is received.
-  ///   - onResponsePart: A handler called for each response part received from the pipeline.
-  /// - Returns: An `AnyClientInterceptor` which wraps a `TailClientInterceptor`.
-  internal static func tail(
-    for pipeline: ClientInterceptorPipeline<Request, Response>,
-    errorDelegate: ClientErrorDelegate?,
-    onError: @escaping (Error) -> Void,
-    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
-  ) -> AnyClientInterceptor<Request, Response> {
-    let tail = TailClientInterceptor(
-      for: pipeline,
-      errorDelegate: errorDelegate,
-      onError,
-      onResponsePart
-    )
-    return .init(.tail(tail))
-  }
-
-  /// A user provided interceptor.
-  /// - Parameter interceptor: The interceptor to wrap.
-  /// - Returns: An `AnyClientInterceptor` which wraps `interceptor`.
-  internal static func userProvided(
-    _ interceptor: ClientInterceptor<Request, Response>
-  ) -> AnyClientInterceptor<Request, Response> {
-    return .init(.base(interceptor))
-  }
-
-  private init(_ implementation: Implementation) {
-    self._implementation = implementation
-  }
-
-  internal func receive(
-    _ part: GRPCClientResponsePart<Response>,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    switch self._implementation {
-    case let .head(handler):
-      handler.receive(part, context: context)
-    case let .tail(handler):
-      handler.receive(part, context: context)
-    case let .base(handler):
-      handler.receive(part, context: context)
-    }
-  }
-
-  internal func errorCaught(
-    _ error: Error,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    switch self._implementation {
-    case let .head(handler):
-      handler.errorCaught(error, context: context)
-    case let .tail(handler):
-      handler.errorCaught(error, context: context)
-    case let .base(handler):
-      handler.errorCaught(error, context: context)
-    }
-  }
-
-  @inlinable
-  internal func send(
-    _ part: GRPCClientRequestPart<Request>,
-    promise: EventLoopPromise<Void>?,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    switch self._implementation {
-    case let .head(handler):
-      handler.send(part, promise: promise, context: context)
-    case let .tail(handler):
-      handler.send(part, promise: promise, context: context)
-    case let .base(handler):
-      handler.send(part, promise: promise, context: context)
-    }
-  }
-
-  internal func cancel(
-    promise: EventLoopPromise<Void>?,
-    context: ClientInterceptorContext<Request, Response>
-  ) {
-    switch self._implementation {
-    case let .head(handler):
-      handler.cancel(promise: promise, context: context)
-    case let .tail(handler):
-      handler.cancel(promise: promise, context: context)
-    case let .base(handler):
-      handler.cancel(promise: promise, context: context)
-    }
-  }
-}

+ 12 - 0
Sources/GRPC/Interceptor/MessageParts.swift

@@ -76,6 +76,18 @@ public struct MessageMetadata: Equatable {
   }
 }
 
+extension GRPCClientResponsePart {
+  @inlinable
+  internal var isEnd: Bool {
+    switch self {
+    case .end:
+      return true
+    case .metadata, .message:
+      return false
+    }
+  }
+}
+
 extension GRPCServerResponsePart {
   @inlinable
   internal var isEnd: Bool {

+ 1 - 0
Sources/GRPC/TimeLimit.swift

@@ -83,6 +83,7 @@ public struct TimeLimit: Equatable, CustomStringConvertible {
 
 extension TimeLimit {
   /// Make a non-distant-future deadline from the give time limit.
+  @usableFromInline
   internal func makeDeadline() -> NIODeadline {
     switch self.wrapped {
     case .none: