Browse Source

Avoid copying server interceptor context (#1090)

Motivation:

The server interceptor pipeline holds an array of interceptor contexts.
Each context is a struct and also holds a reference to the pipeline.
Each time the context asks the pipeline for the next inbound/outbound
context we incur a copy of the context which is surprisingly costly due
to retain and release traffic for the pipeline ref as well as the cost
of initializing the unspecialized contexts.

Modifications:

- interceptor contexts now ask the pipeline to perform an operation for
  them rather than asking for a context on which to perform the
  operation, this avoids copying contexts
- as the pipeline is now responsible for performing the operations a
  further optimisation is opened up: head and tail interceptors and
  contexts are no longer required, the same action can be done directly
  by the pipeline

Result:

- An 11.2% reduction in instructions in the
  embedded_server_unary_10k_small_requests benchmark
George Barnett 5 years ago
parent
commit
6d0487603d

+ 12 - 0
Sources/GRPC/Interceptor/MessageParts.swift

@@ -75,3 +75,15 @@ public struct MessageMetadata: Equatable {
     self.flush = flush
   }
 }
+
+extension GRPCServerResponsePart {
+  @inlinable
+  internal var isEnd: Bool {
+    switch self {
+    case .end:
+      return true
+    case .metadata, .message:
+      return false
+    }
+  }
+}

+ 6 - 37
Sources/GRPC/Interceptor/ServerInterceptorContext.swift

@@ -19,7 +19,7 @@ import NIO
 public struct ServerInterceptorContext<Request, Response> {
   /// The interceptor this context is for.
   @usableFromInline
-  internal let interceptor: AnyServerInterceptor<Request, Response>
+  internal let interceptor: ServerInterceptor<Request, Response>
 
   /// The pipeline this context is associated with.
   @usableFromInline
@@ -29,18 +29,6 @@ public struct ServerInterceptorContext<Request, Response> {
   @usableFromInline
   internal let _index: Int
 
-  // The next context in the inbound direction, if one exists.
-  @inlinable
-  internal var _nextInbound: ServerInterceptorContext<Request, Response>? {
-    return self._pipeline.nextInboundContext(forIndex: self._index)
-  }
-
-  // The next context in the outbound direction, if one exists.
-  @inlinable
-  internal var _nextOutbound: ServerInterceptorContext<Request, Response>? {
-    return self._pipeline.nextOutboundContext(forIndex: self._index)
-  }
-
   /// The `EventLoop` this interceptor pipeline is being executed on.
   public var eventLoop: EventLoop {
     return self._pipeline.eventLoop
@@ -87,7 +75,7 @@ public struct ServerInterceptorContext<Request, Response> {
   /// interceptor pipeline.
   @inlinable
   internal init(
-    for interceptor: AnyServerInterceptor<Request, Response>,
+    for interceptor: ServerInterceptor<Request, Response>,
     atIndex index: Int,
     in pipeline: ServerInterceptorPipeline<Request, Response>
   ) {
@@ -100,8 +88,9 @@ public struct ServerInterceptorContext<Request, Response> {
   ///
   /// - Parameter part: The request part to forward.
   /// - Important: This *must* to be called from the `eventLoop`.
+  @inlinable
   public func receive(_ part: GRPCServerRequestPart<Request>) {
-    self._nextInbound?.invokeReceive(part)
+    self._pipeline.invokeReceive(part, fromContextAtIndex: self._index)
   }
 
   /// Forwards the response part to the next outbound interceptor in the pipeline, if there is one.
@@ -110,31 +99,11 @@ public struct ServerInterceptorContext<Request, Response> {
   ///   - part: The response part to forward.
   ///   - promise: The promise the complete when the part has been written.
   /// - Important: This *must* to be called from the `eventLoop`.
-  public func send(
-    _ part: GRPCServerResponsePart<Response>,
-    promise: EventLoopPromise<Void>?
-  ) {
-    if let outbound = self._nextOutbound {
-      outbound.invokeSend(part, promise: promise)
-    } else {
-      promise?.fail(GRPCError.AlreadyComplete())
-    }
-  }
-}
-
-extension ServerInterceptorContext {
   @inlinable
-  internal func invokeReceive(_ part: GRPCServerRequestPart<Request>) {
-    self.eventLoop.assertInEventLoop()
-    self.interceptor.receive(part, context: self)
-  }
-
-  @inlinable
-  internal func invokeSend(
+  public func send(
     _ part: GRPCServerResponsePart<Response>,
     promise: EventLoopPromise<Void>?
   ) {
-    self.eventLoop.assertInEventLoop()
-    self.interceptor.send(part, promise: promise, context: self)
+    self._pipeline.invokeSend(part, promise: promise, fromContextAtIndex: self._index)
   }
 }

+ 171 - 79
Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift

@@ -42,56 +42,53 @@ internal final class ServerInterceptorPipeline<Request, Response> {
   @usableFromInline
   internal let userInfoRef: Ref<UserInfo>
 
-  /// The contexts associated with the interceptors stored in this pipeline. Contexts will be
-  /// removed once the RPC has completed. Contexts are ordered from inbound to outbound, that is,
-  /// the head is first and the tail is last.
+  /// Called when a response part has traversed the interceptor pipeline.
   @usableFromInline
-  internal var _contexts: InterceptorContextList<ServerInterceptorContext<Request, Response>>?
+  internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
 
-  /// Returns the next context in the outbound direction for the context at the given index, if one
-  /// exists.
-  /// - Parameter index: The index of the `ServerInterceptorContext` which is requesting the next
-  ///   outbound context.
-  /// - Returns: The `ServerInterceptorContext` or `nil` if one does not exist.
-  @inlinable
-  internal func nextOutboundContext(
-    forIndex index: Int
-  ) -> ServerInterceptorContext<Request, Response>? {
-    return self._context(atIndex: index - 1)
-  }
+  /// Called when a request part has traversed the interceptor pipeline.
+  @usableFromInline
+  internal let _onRequestPart: (GRPCServerRequestPart<Request>) -> Void
 
-  /// Returns the next context in the inbound direction for the context at the given index, if one
-  /// exists.
-  /// - Parameter index: The index of the `ServerInterceptorContext` which is requesting the next
-  ///   inbound context.
-  /// - Returns: The `ServerInterceptorContext` or `nil` if one does not exist.
-  @inlinable
-  internal func nextInboundContext(
-    forIndex index: Int
-  ) -> ServerInterceptorContext<Request, Response>? {
-    return self._context(atIndex: index + 1)
-  }
+  /// The index before the first user interceptor context index. (always -1).
+  @usableFromInline
+  internal let _headIndex: Int
+
+  /// The index after the last user interceptor context index (i.e. 'userContext.endIndex').
+  @usableFromInline
+  internal let _tailIndex: Int
 
-  /// Returns the context for the given index, if one exists.
-  /// - Parameter index: The index of the `ServerInterceptorContext` to return.
-  /// - Returns: The `ServerInterceptorContext` or `nil` if one does not exist for the given index.
+  /// Contexts for user provided interceptors.
+  @usableFromInline
+  internal var _userContexts: [ServerInterceptorContext<Request, Response>]
+
+  /// Whether the interceptor pipeline is still open. It becomes closed after an 'end' response
+  /// part has traversed the pipeline.
+  @usableFromInline
+  internal var _isOpen = true
+
+  /// The index of the next context on the inbound side of the context at the given index.
   @inlinable
-  internal func _context(atIndex index: Int) -> ServerInterceptorContext<Request, Response>? {
-    return self._contexts?[checked: index]
+  internal func _nextInboundIndex(after index: Int) -> Int {
+    // Unchecked arithmetic is okay here: our greatest inbound index is '_tailIndex' but we will
+    // never ask for the inbound index after the tail.
+    assert(self._indexIsValid(index))
+    return index &+ 1
   }
 
-  /// The context closest to the `NIO.Channel`, i.e. where inbound events originate. This will be
-  /// `nil` once the RPC has completed.
+  /// The index of the next context on the outbound side of the context at the given index.
   @inlinable
-  internal var head: ServerInterceptorContext<Request, Response>? {
-    return self._contexts?.first
+  internal func _nextOutboundIndex(after index: Int) -> Int {
+    // Unchecked arithmetic is okay here: our lowest outbound index is '_headIndex' but we will
+    // never ask for the outbound index after the head.
+    assert(self._indexIsValid(index))
+    return index &- 1
   }
 
-  /// The context closest to the application, i.e. where outbound events originate. This will be
-  /// `nil` once the RPC has completed.
+  /// Returns true of the index is in the range `_headIndex ... _tailIndex`.
   @inlinable
-  internal var tail: ServerInterceptorContext<Request, Response>? {
-    return self._contexts?.last
+  internal func _indexIsValid(_ index: Int) -> Bool {
+    return self._headIndex <= index && index <= self._tailIndex
   }
 
   @inlinable
@@ -113,13 +110,22 @@ internal final class ServerInterceptorPipeline<Request, Response> {
     self.remoteAddress = remoteAddress
     self.userInfoRef = userInfoRef
 
-    // We need space for the head and tail as well as any user provided interceptors.
-    self._contexts = InterceptorContextList(
-      for: self,
-      interceptors: interceptors,
-      onRequestPart: onRequestPart,
-      onResponsePart: onResponsePart
-    )
+    self._onResponsePart = onResponsePart
+    self._onRequestPart = onRequestPart
+
+    // Head comes before user interceptors.
+    self._headIndex = -1
+    // Tail comes just after.
+    self._tailIndex = interceptors.endIndex
+
+    // Make some contexts.
+    self._userContexts = []
+    self._userContexts.reserveCapacity(interceptors.count)
+
+    for index in 0 ..< interceptors.count {
+      let context = ServerInterceptorContext(for: interceptors[index], atIndex: index, in: self)
+      self._userContexts.append(context)
+    }
   }
 
   /// Emit a request part message into the interceptor pipeline.
@@ -128,8 +134,55 @@ internal final class ServerInterceptorPipeline<Request, Response> {
   /// - Important: This *must* to be called from the `eventLoop`.
   @inlinable
   internal func receive(_ part: GRPCServerRequestPart<Request>) {
+    self._invokeReceive(part, onContextAtIndex: self._headIndex)
+  }
+
+  /// Invoke receive on the appropriate context when called from the context at the given index.
+  @inlinable
+  internal func invokeReceive(
+    _ part: GRPCServerRequestPart<Request>,
+    fromContextAtIndex index: Int
+  ) {
+    self._invokeReceive(part, onContextAtIndex: self._nextInboundIndex(after: index))
+  }
+
+  /// Invoke receive on the context at the given index, if doing so is safe.
+  @inlinable
+  internal func _invokeReceive(
+    _ part: GRPCServerRequestPart<Request>,
+    onContextAtIndex index: Int
+  ) {
     self.eventLoop.assertInEventLoop()
-    self.head?.invokeReceive(part)
+    assert(self._indexIsValid(index))
+    guard self._isOpen else {
+      return
+    }
+
+    // We've checked the index.
+    self._invokeReceive(part, onContextAtUncheckedIndex: index)
+  }
+
+  /// Invoke receive on the context at the given index, assuming that the index is valid and the
+  /// pipeline is still open.
+  @inlinable
+  internal func _invokeReceive(
+    _ part: GRPCServerRequestPart<Request>,
+    onContextAtUncheckedIndex index: Int
+  ) {
+    switch index {
+    case self._headIndex:
+      // The next inbound index must exist, either for the tail or a user interceptor.
+      self._invokeReceive(
+        part,
+        onContextAtUncheckedIndex: self._nextInboundIndex(after: self._headIndex)
+      )
+
+    case self._tailIndex:
+      self._onRequestPart(part)
+
+    default:
+      self._userContexts[index].invokeReceive(part)
+    }
   }
 
   /// Write a response message into the interceptor pipeline.
@@ -140,50 +193,89 @@ internal final class ServerInterceptorPipeline<Request, Response> {
   /// - Important: This *must* to be called from the `eventLoop`.
   @inlinable
   internal func send(_ part: GRPCServerResponsePart<Response>, promise: EventLoopPromise<Void>?) {
-    self.eventLoop.assertInEventLoop()
+    self._invokeSend(part, promise: promise, onContextAtIndex: self._tailIndex)
+  }
 
-    if let tail = self.tail {
-      tail.invokeSend(part, promise: promise)
-    } else {
-      promise?.fail(GRPCError.AlreadyComplete())
-    }
+  /// Invoke send on the appropriate context when called from the context at the given index.
+  @inlinable
+  internal func invokeSend(
+    _ part: GRPCServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?,
+    fromContextAtIndex index: Int
+  ) {
+    self._invokeSend(
+      part,
+      promise: promise,
+      onContextAtIndex: self._nextOutboundIndex(after: index)
+    )
   }
 
+  /// Invoke send on the context at the given index, if doing so is safe. Fails the `promise` if it
+  /// is not safe to do so.
   @inlinable
-  internal func close() {
+  internal func _invokeSend(
+    _ part: GRPCServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?,
+    onContextAtIndex index: Int
+  ) {
     self.eventLoop.assertInEventLoop()
-    self._contexts = nil
+    assert(self._indexIsValid(index))
+    guard self._isOpen else {
+      promise?.fail(GRPCError.AlreadyComplete())
+      return
+    }
+
+    self._invokeSend(uncheckedIndex: index, part, promise: promise)
   }
-}
 
-extension InterceptorContextList {
+  /// Invoke send on the context at the given index, assuming that the index is valid and the
+  /// pipeline is still open.
   @inlinable
-  init<Request, Response>(
-    for pipeline: ServerInterceptorPipeline<Request, Response>,
-    interceptors: [ServerInterceptor<Request, Response>],
-    onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
-    onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
-  ) where Element == ServerInterceptorContext<Request, Response> {
-    let middle = interceptors.enumerated().map { index, interceptor in
-      ServerInterceptorContext(
-        for: .userProvided(interceptor),
-        atIndex: index,
-        in: pipeline
+  internal func _invokeSend(
+    uncheckedIndex index: Int,
+    _ part: GRPCServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?
+  ) {
+    switch index {
+    case self._headIndex:
+      if part.isEnd {
+        self.close()
+      }
+      self._onResponsePart(part, promise)
+
+    case self._tailIndex:
+      // The next outbound index must exist: it will be the head or a user interceptor.
+      self._invokeSend(
+        uncheckedIndex: self._nextOutboundIndex(after: self._tailIndex),
+        part,
+        promise: promise
       )
+
+    default:
+      self._userContexts[index].invokeSend(part, promise: promise)
     }
+  }
 
-    let first = ServerInterceptorContext<Request, Response>(
-      for: .head(for: pipeline, onResponsePart),
-      atIndex: middle.startIndex - 1,
-      in: pipeline
-    )
+  @inlinable
+  internal func close() {
+    // We're no longer open.
+    self._isOpen = false
+    // Each context hold a ref to the pipeline; break the retain cycle.
+    self._userContexts.removeAll()
+  }
+}
 
-    let last = ServerInterceptorContext<Request, Response>(
-      for: .tail(onRequestPart),
-      atIndex: middle.endIndex,
-      in: pipeline
-    )
+extension ServerInterceptorContext {
+  @inlinable
+  internal func invokeReceive(_ part: GRPCServerRequestPart<Request>) {
+    self.interceptor.receive(part, context: self)
+  }
 
-    self.init(first: first, middle: middle, last: last)
+  @inlinable
+  internal func invokeSend(
+    _ part: GRPCServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?
+  ) {
+    self.interceptor.send(part, promise: promise, context: self)
   }
 }

+ 0 - 157
Sources/GRPC/Interceptor/ServerInterceptors.swift

@@ -69,160 +69,3 @@ open class ServerInterceptor<Request, Response> {
     context.send(part, promise: promise)
   }
 }
-
-// MARK: Head/Tail
-
-/// An interceptor which offloads requests to the service provider and forwards any response parts
-/// to the rest of the pipeline.
-@usableFromInline
-internal struct TailServerInterceptor<Request, Response> {
-  /// Called when a request part has been received.
-  @usableFromInline
-  internal let _onRequestPart: (GRPCServerRequestPart<Request>) -> Void
-
-  @inlinable
-  init(
-    _ onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void
-  ) {
-    self._onRequestPart = onRequestPart
-  }
-
-  @inlinable
-  internal func receive(
-    _ part: GRPCServerRequestPart<Request>,
-    context: ServerInterceptorContext<Request, Response>
-  ) {
-    self._onRequestPart(part)
-  }
-
-  @inlinable
-  internal func send(
-    _ part: GRPCServerResponsePart<Response>,
-    promise: EventLoopPromise<Void>?,
-    context: ServerInterceptorContext<Request, Response>
-  ) {
-    context.send(part, promise: promise)
-  }
-}
-
-@usableFromInline
-internal struct HeadServerInterceptor<Request, Response> {
-  /// The pipeline this interceptor belongs to.
-  @usableFromInline
-  internal let _pipeline: ServerInterceptorPipeline<Request, Response>
-
-  /// Called when a response part has been received.
-  @usableFromInline
-  internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
-
-  @inlinable
-  internal init(
-    for pipeline: ServerInterceptorPipeline<Request, Response>,
-    _ onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
-  ) {
-    self._pipeline = pipeline
-    self._onResponsePart = onResponsePart
-  }
-
-  @inlinable
-  internal func receive(
-    _ part: GRPCServerRequestPart<Request>,
-    context: ServerInterceptorContext<Request, Response>
-  ) {
-    context.receive(part)
-  }
-
-  @inlinable
-  internal func send(
-    _ part: GRPCServerResponsePart<Response>,
-    promise: EventLoopPromise<Void>?,
-    context: ServerInterceptorContext<Request, Response>
-  ) {
-    // Close the pipeline on end.
-    switch part {
-    case .metadata, .message:
-      ()
-    case .end:
-      self._pipeline.close()
-    }
-    self._onResponsePart(part, promise)
-  }
-}
-
-// MARK: - Any Interceptor
-
-/// A wrapping interceptor which delegates to the implementation of an underlying interceptor.
-@usableFromInline
-internal struct AnyServerInterceptor<Request, Response> {
-  @usableFromInline
-  internal enum Implementation {
-    case head(HeadServerInterceptor<Request, Response>)
-    case tail(TailServerInterceptor<Request, Response>)
-    case base(ServerInterceptor<Request, Response>)
-  }
-
-  /// The underlying interceptor implementation.
-  @usableFromInline
-  internal let _implementation: Implementation
-
-  @inlinable
-  internal static func head(
-    for pipeline: ServerInterceptorPipeline<Request, Response>,
-    _ onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
-  ) -> AnyServerInterceptor<Request, Response> {
-    return .init(.head(.init(for: pipeline, onResponsePart)))
-  }
-
-  @inlinable
-  internal static func tail(
-    _ onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void
-  ) -> AnyServerInterceptor<Request, Response> {
-    return .init(.tail(.init(onRequestPart)))
-  }
-
-  /// A user provided interceptor.
-  /// - Parameter interceptor: The interceptor to wrap.
-  /// - Returns: An `AnyServerInterceptor` which wraps `interceptor`.
-  @inlinable
-  internal static func userProvided(
-    _ interceptor: ServerInterceptor<Request, Response>
-  ) -> AnyServerInterceptor<Request, Response> {
-    return .init(.base(interceptor))
-  }
-
-  @inlinable
-  internal init(_ implementation: Implementation) {
-    self._implementation = implementation
-  }
-
-  @inlinable
-  internal func receive(
-    _ part: GRPCServerRequestPart<Request>,
-    context: ServerInterceptorContext<Request, Response>
-  ) {
-    switch self._implementation {
-    case let .head(interceptor):
-      interceptor.receive(part, context: context)
-    case let .tail(interceptor):
-      interceptor.receive(part, context: context)
-    case let .base(interceptor):
-      interceptor.receive(part, context: context)
-    }
-  }
-
-  @inlinable
-  internal func send(
-    _ part: GRPCServerResponsePart<Response>,
-    promise: EventLoopPromise<Void>?,
-    context: ServerInterceptorContext<Request, Response>
-  ) {
-    switch self._implementation {
-    case let .head(interceptor):
-      interceptor.send(part, promise: promise, context: context)
-    case let .tail(interceptor):
-      interceptor.send(part, promise: promise, context: context)
-    case let .base(interceptor):
-      interceptor.send(part, promise: promise, context: context)
-    }
-  }
-}