Przeglądaj źródła

[async-await] Base types for server implementation (#1249)

This commit implements some of the types required by the proposal for async/await support, added in #1231.

To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation").

It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`.

It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically:

* `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245.
* `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
Si Beaumont 4 lat temu
rodzic
commit
bd7f40a8fe

+ 26 - 0
Sources/GRPC/AsyncAwaitSupport/CancellationError+GRPCStatusTransformable.swift

@@ -0,0 +1,26 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if compiler(>=5.5)
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension CancellationError: GRPCStatusTransformable {
+  public func makeGRPCStatus() -> GRPCStatus {
+    return GRPCStatus(code: .unavailable, message: nil)
+  }
+}
+
+#endif

+ 55 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift

@@ -0,0 +1,55 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if compiler(>=5.5)
+
+/// This is currently a wrapper around AsyncThrowingStream because we want to be
+/// able to swap out the implementation for something else in the future.
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
+  @usableFromInline
+  internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
+
+  @usableFromInline
+  internal let _stream: _WrappedStream
+
+  @inlinable
+  internal init(_ stream: _WrappedStream) {
+    self._stream = stream
+  }
+
+  @inlinable
+  public func makeAsyncIterator() -> Iterator {
+    Self.AsyncIterator(self._stream)
+  }
+
+  public struct Iterator: AsyncIteratorProtocol {
+    @usableFromInline
+    internal var iterator: _WrappedStream.AsyncIterator
+
+    @usableFromInline
+    internal init(_ stream: _WrappedStream) {
+      self.iterator = stream.makeAsyncIterator()
+    }
+
+    @inlinable
+    public mutating func next() async throws -> Element? {
+      try await self.iterator.next()
+    }
+  }
+}
+
+#endif

+ 112 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift

@@ -0,0 +1,112 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if compiler(>=5.5)
+
+/// Writer for server-streaming RPC handlers to provide responses.
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public struct GRPCAsyncResponseStreamWriter<Response> {
+  @usableFromInline
+  internal typealias Element = (Response, Compression)
+
+  @usableFromInline
+  internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>
+
+  @usableFromInline
+  internal let asyncWriter: AsyncWriter<Delegate>
+
+  @inlinable
+  internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
+    self.asyncWriter = asyncWriter
+  }
+
+  @inlinable
+  public func send(
+    _ response: Response,
+    compression: Compression = .deferToCallDefault
+  ) async throws {
+    try await self.asyncWriter.write((response, compression))
+  }
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+@usableFromInline
+internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
+  @usableFromInline
+  internal typealias Element = (Response, Compression)
+
+  @usableFromInline
+  internal typealias End = GRPCStatus
+
+  @usableFromInline
+  internal let _context: GRPCAsyncServerCallContext
+
+  @usableFromInline
+  internal let _send: (Response, MessageMetadata) -> Void
+
+  @usableFromInline
+  internal let _finish: (GRPCStatus) -> Void
+
+  @usableFromInline
+  internal let _compressionEnabledOnServer: Bool
+
+  // Create a new AsyncResponseStreamWriterDelegate.
+  //
+  // - Important: the `send` and `finish` closures must be thread-safe.
+  @inlinable
+  internal init(
+    context: GRPCAsyncServerCallContext,
+    compressionIsEnabled: Bool,
+    send: @escaping (Response, MessageMetadata) -> Void,
+    finish: @escaping (GRPCStatus) -> Void
+  ) {
+    self._context = context
+    self._compressionEnabledOnServer = compressionIsEnabled
+    self._send = send
+    self._finish = finish
+  }
+
+  @inlinable
+  internal func _shouldCompress(_ compression: Compression) -> Bool {
+    guard self._compressionEnabledOnServer else {
+      return false
+    }
+    return compression.isEnabled(callDefault: self._context.compressionEnabled)
+  }
+
+  @inlinable
+  internal func _send(
+    _ response: Response,
+    compression: Compression = .deferToCallDefault
+  ) {
+    let compress = self._shouldCompress(compression)
+    self._send(response, .init(compress: compress, flush: true))
+  }
+
+  // MARK: - AsyncWriterDelegate conformance.
+
+  @inlinable
+  internal func write(_ element: (Response, Compression)) {
+    self._send(element.0, compression: element.1)
+  }
+
+  @inlinable
+  internal func writeEnd(_ end: GRPCStatus) {
+    self._finish(end)
+  }
+}
+
+#endif

+ 111 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift

@@ -0,0 +1,111 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+
+import Logging
+import NIOConcurrencyHelpers
+import NIOHPACK
+
+// We use a `class` here because we do not want copy-on-write semantics. The instance that the async
+// handler holds must not diverge from the instance the implementor of the RPC holds. They hold these
+// instances on different threads (EventLoop vs Task).
+//
+// We considered wrapping this in a `struct` and pass it `inout` to the RPC. This would communicate
+// explicitly that it stores mutable state. However, without copy-on-write semantics, this could
+// make for a surprising API.
+//
+// We also considered an `actor` but that felt clunky at the point of use since adopters would need
+// to `await` the retrieval of a logger or the updating of the trailers and each would requrie a
+// promise to glue the NIO and async-await paradigms in the handler.
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public final class GRPCAsyncServerCallContext {
+  private let lock = Lock()
+
+  /// Request headers for this request.
+  public let headers: HPACKHeaders
+
+  /// The logger used for this call.
+  public var logger: Logger {
+    get { self.lock.withLock {
+      self._logger
+    } }
+    set { self.lock.withLock {
+      self._logger = newValue
+    } }
+  }
+
+  @usableFromInline
+  internal var _logger: Logger
+
+  /// Whether compression should be enabled for responses, defaulting to `true`. Note that for
+  /// this value to take effect compression must have been enabled on the server and a compression
+  /// algorithm must have been negotiated with the client.
+  public var compressionEnabled: Bool {
+    get { self.lock.withLock {
+      self._compressionEnabled
+    } }
+    set { self.lock.withLock {
+      self._compressionEnabled = newValue
+    } }
+  }
+
+  private var _compressionEnabled: Bool = true
+
+  /// A `UserInfo` dictionary which is shared with the interceptor contexts for this RPC.
+  ///
+  /// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a
+  ///   reference wrapped `UserInfo`. The contexts passed to interceptors provide the same
+  ///   reference. As such this may be used as a mechanism to pass information between interceptors
+  ///   and service providers.
+  public var userInfo: UserInfo {
+    get { self.lock.withLock {
+      self.userInfoRef.value
+    } }
+    set { self.lock.withLock {
+      self.userInfoRef.value = newValue
+    } }
+  }
+
+  /// A reference to an underlying `UserInfo`. We share this with the interceptors.
+  @usableFromInline
+  internal let userInfoRef: Ref<UserInfo>
+
+  /// Metadata to return at the end of the RPC. If this is required it should be updated before
+  /// the `responsePromise` or `statusPromise` is fulfilled.
+  public var trailers: HPACKHeaders {
+    get { self.lock.withLock {
+      return self._trailers
+    } }
+    set { self.lock.withLock {
+      self._trailers = newValue
+    } }
+  }
+
+  private var _trailers: HPACKHeaders = [:]
+
+  @inlinable
+  internal init(
+    headers: HPACKHeaders,
+    logger: Logger,
+    userInfoRef: Ref<UserInfo>
+  ) {
+    self.headers = headers
+    self.userInfoRef = userInfoRef
+    self._logger = logger
+  }
+}
+
+#endif

+ 645 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

@@ -0,0 +1,645 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+
+import _NIOConcurrency
+import NIOCore
+import NIOHPACK
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+public struct GRPCAsyncServerHandler<
+  Serializer: MessageSerializer,
+  Deserializer: MessageDeserializer
+>: GRPCServerHandlerProtocol {
+  @usableFromInline
+  internal let _handler: AsyncServerHandler<Serializer, Deserializer>
+
+  public func receiveMetadata(_ metadata: HPACKHeaders) {
+    self._handler.receiveMetadata(metadata)
+  }
+
+  public func receiveMessage(_ bytes: ByteBuffer) {
+    self._handler.receiveMessage(bytes)
+  }
+
+  public func receiveEnd() {
+    self._handler.receiveEnd()
+  }
+
+  public func receiveError(_ error: Error) {
+    self._handler.receiveError(error)
+  }
+
+  public func finish() {
+    self._handler.finish()
+  }
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+extension GRPCAsyncServerHandler {
+  public typealias Request = Deserializer.Output
+  public typealias Response = Serializer.Input
+
+  @inlinable
+  public init(
+    context: CallHandlerContext,
+    requestDeserializer: Deserializer,
+    responseSerializer: Serializer,
+    interceptors: [ServerInterceptor<Request, Response>],
+    wrapping unary: @escaping @Sendable(Request, GRPCAsyncServerCallContext) async throws
+      -> Response
+  ) {
+    self._handler = .init(
+      context: context,
+      requestDeserializer: requestDeserializer,
+      responseSerializer: responseSerializer,
+      interceptors: interceptors,
+      userHandler: { requestStream, responseStreamWriter, context in
+        var iterator = requestStream.makeAsyncIterator()
+        guard let request = try await iterator.next(), try await iterator.next() == nil else {
+          throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
+        }
+        let response = try await unary(request, context)
+        try await responseStreamWriter.send(response)
+      }
+    )
+  }
+
+  @inlinable
+  public init(
+    context: CallHandlerContext,
+    requestDeserializer: Deserializer,
+    responseSerializer: Serializer,
+    interceptors: [ServerInterceptor<Request, Response>],
+    wrapping clientStreaming: @escaping @Sendable(
+      GRPCAsyncRequestStream<Request>,
+      GRPCAsyncServerCallContext
+    ) async throws -> Response
+  ) {
+    self._handler = .init(
+      context: context,
+      requestDeserializer: requestDeserializer,
+      responseSerializer: responseSerializer,
+      interceptors: interceptors,
+      userHandler: { requestStream, responseStreamWriter, context in
+        let response = try await clientStreaming(requestStream, context)
+        try await responseStreamWriter.send(response)
+      }
+    )
+  }
+
+  @inlinable
+  public init(
+    context: CallHandlerContext,
+    requestDeserializer: Deserializer,
+    responseSerializer: Serializer,
+    interceptors: [ServerInterceptor<Request, Response>],
+    wrapping serverStreaming: @escaping @Sendable(
+      Request,
+      GRPCAsyncResponseStreamWriter<Response>,
+      GRPCAsyncServerCallContext
+    ) async throws -> Void
+  ) {
+    self._handler = .init(
+      context: context,
+      requestDeserializer: requestDeserializer,
+      responseSerializer: responseSerializer,
+      interceptors: interceptors,
+      userHandler: { requestStream, responseStreamWriter, context in
+        var iterator = requestStream.makeAsyncIterator()
+        guard let request = try await iterator.next(), try await iterator.next() == nil else {
+          throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
+        }
+        try await serverStreaming(request, responseStreamWriter, context)
+      }
+    )
+  }
+
+  @inlinable
+  public init(
+    context: CallHandlerContext,
+    requestDeserializer: Deserializer,
+    responseSerializer: Serializer,
+    interceptors: [ServerInterceptor<Request, Response>],
+    wrapping bidirectional: @escaping @Sendable(
+      GRPCAsyncRequestStream<Request>,
+      GRPCAsyncResponseStreamWriter<Response>,
+      GRPCAsyncServerCallContext
+    ) async throws -> Void
+  ) {
+    self._handler = .init(
+      context: context,
+      requestDeserializer: requestDeserializer,
+      responseSerializer: responseSerializer,
+      interceptors: interceptors,
+      userHandler: bidirectional
+    )
+  }
+}
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+@usableFromInline
+internal final class AsyncServerHandler<
+  Serializer: MessageSerializer,
+  Deserializer: MessageDeserializer
+>: GRPCServerHandlerProtocol {
+  @usableFromInline
+  internal typealias Request = Deserializer.Output
+  @usableFromInline
+  internal typealias Response = Serializer.Input
+
+  /// A response serializer.
+  @usableFromInline
+  internal let serializer: Serializer
+
+  /// A request deserializer.
+  @usableFromInline
+  internal let deserializer: Deserializer
+
+  /// A pipeline of user provided interceptors.
+  @usableFromInline
+  internal var interceptors: ServerInterceptorPipeline<Request, Response>!
+
+  /// The context required in order create the function.
+  @usableFromInline
+  internal let context: CallHandlerContext
+
+  /// A reference to a `UserInfo`.
+  @usableFromInline
+  internal let userInfoRef: Ref<UserInfo>
+
+  /// The user provided function to execute.
+  @usableFromInline
+  internal let userHandler: (
+    GRPCAsyncRequestStream<Request>,
+    GRPCAsyncResponseStreamWriter<Response>,
+    GRPCAsyncServerCallContext
+  ) async throws -> Void
+
+  /// The state of the handler.
+  @usableFromInline
+  internal var state: State = .idle
+
+  /// The task used to run the async user handler.
+  ///
+  /// - TODO: I'd like it if this was part of the assoc data for the .active state but doing so may introduce a race condition.
+  @usableFromInline
+  internal var userHandlerTask: Task<Void, Never>? = nil
+
+  @usableFromInline
+  internal enum State {
+    /// No headers have been received.
+    case idle
+
+    /// Headers have been received, and an async `Task` has been created to execute the user
+    /// handler.
+    ///
+    /// The inputs to the user handler are held in the associated data of this enum value:
+    ///
+    /// - The `PassthroughMessageSource` is the source backing the request stream that is being
+    /// consumed by the user handler.
+    ///
+    /// - The `GRPCAsyncServerContext` is a reference to the context that was passed to the user
+    /// handler.
+    ///
+    /// - The `GRPCAsyncResponseStreamWriter` is the response stream writer that is being written to
+    /// by the user handler. Because this is pausable, it may contain responses after the user
+    /// handler has completed that have yet to be written. However we will remain in the `.active`
+    /// state until the response stream writer has completed.
+    ///
+    /// - The `EventLoopPromise` bridges the NIO and async-await worlds. It is the mechanism that we
+    /// use to run a callback when the user handler has completed. The promise is not passed to the
+    /// user handler directly. Instead it is fulfilled with the result of the async `Task` executing
+    /// the user handler using `completeWithTask(_:)`.
+    ///
+    /// - TODO: It shouldn't really be necessary to stash the `GRPCAsyncResponseStreamWriter` or the
+    /// `EventLoopPromise` in this enum value. Specifically they are never used anywhere when this
+    /// enum value is accessed. However, if we do not store them here then the tests periodically
+    /// segfault. This appears to be an bug in Swift and/or NIO since these should both have been
+    /// captured by `completeWithTask(_:)`.
+    case active(
+      PassthroughMessageSource<Request, Error>,
+      GRPCAsyncServerCallContext,
+      GRPCAsyncResponseStreamWriter<Response>,
+      EventLoopPromise<Void>
+    )
+
+    /// The handler has completed.
+    case completed
+  }
+
+  @inlinable
+  public init(
+    context: CallHandlerContext,
+    requestDeserializer: Deserializer,
+    responseSerializer: Serializer,
+    interceptors: [ServerInterceptor<Request, Response>],
+    userHandler: @escaping @Sendable(
+      GRPCAsyncRequestStream<Request>,
+      GRPCAsyncResponseStreamWriter<Response>,
+      GRPCAsyncServerCallContext
+    ) async throws -> Void
+  ) {
+    self.serializer = responseSerializer
+    self.deserializer = requestDeserializer
+    self.context = context
+    self.userHandler = userHandler
+
+    let userInfoRef = Ref(UserInfo())
+    self.userInfoRef = userInfoRef
+
+    self.interceptors = ServerInterceptorPipeline(
+      logger: context.logger,
+      eventLoop: context.eventLoop,
+      path: context.path,
+      callType: .bidirectionalStreaming,
+      remoteAddress: context.remoteAddress,
+      userInfoRef: userInfoRef,
+      interceptors: interceptors,
+      onRequestPart: self.receiveInterceptedPart(_:),
+      onResponsePart: self.sendInterceptedPart(_:promise:)
+    )
+  }
+
+  // MARK: - GRPCServerHandlerProtocol conformance
+
+  @inlinable
+  internal func receiveMetadata(_ headers: HPACKHeaders) {
+    self.interceptors.receive(.metadata(headers))
+  }
+
+  @inlinable
+  internal func receiveMessage(_ bytes: ByteBuffer) {
+    do {
+      let message = try self.deserializer.deserialize(byteBuffer: bytes)
+      self.interceptors.receive(.message(message))
+    } catch {
+      self.handleError(error)
+    }
+  }
+
+  @inlinable
+  internal func receiveEnd() {
+    self.interceptors.receive(.end)
+  }
+
+  @inlinable
+  internal func receiveError(_ error: Error) {
+    self.handleError(error)
+    self.finish()
+  }
+
+  @inlinable
+  internal func finish() {
+    switch self.state {
+    case .idle:
+      self.interceptors = nil
+      self.state = .completed
+
+    case .active:
+      self.userHandlerTask?.cancel()
+
+    case .completed:
+      self.interceptors = nil
+    }
+  }
+
+  // MARK: - Interceptors to User Function
+
+  @inlinable
+  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
+    switch part {
+    case let .metadata(headers):
+      self.receiveInterceptedMetadata(headers)
+    case let .message(message):
+      self.receiveInterceptedMessage(message)
+    case .end:
+      self.receiveInterceptedEnd()
+    }
+  }
+
+  @inlinable
+  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
+    switch self.state {
+    case .idle:
+      // Make a context to invoke the user handler with.
+      let context = GRPCAsyncServerCallContext(
+        headers: headers,
+        logger: self.context.logger,
+        userInfoRef: self.userInfoRef
+      )
+
+      // Create a source for our request stream.
+      let requestStreamSource = PassthroughMessageSource<Request, Error>()
+
+      // Create a promise to hang a callback off when the user handler completes.
+      let userHandlerPromise: EventLoopPromise<Void> = self.context.eventLoop.makePromise()
+
+      // Create a request stream from our stream source to pass to the user handler.
+      let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
+
+      // TODO: In future use `AsyncWriter.init(maxPendingElements:maxWritesBeforeYield:delegate:)`?
+      let responseStreamWriter =
+        GRPCAsyncResponseStreamWriter(
+          wrapping: AsyncWriter(delegate: AsyncResponseStreamWriterDelegate(
+            context: context,
+            compressionIsEnabled: self.context.encoding.isEnabled,
+            send: self.interceptResponse(_:metadata:),
+            finish: self.responseStreamDrained(_:)
+          ))
+        )
+
+      // Set the state to active and bundle in all the associated data.
+      self.state = .active(requestStreamSource, context, responseStreamWriter, userHandlerPromise)
+
+      // Register callback for the completion of the user handler.
+      userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:))
+
+      // Send response headers back via the interceptors.
+      // TODO: In future we may want to defer this until the first response is available from the user handler which will allow the user to set the response headers via the context.
+      self.interceptors.send(.metadata([:]), promise: nil)
+
+      // Spin up a task to call the async user handler.
+      self.userHandlerTask = userHandlerPromise.completeWithTask {
+        return try await withTaskCancellationHandler {
+          do {
+            // When the user handler completes we invalidate the request stream source.
+            defer { requestStreamSource.finish() }
+            // Call the user handler.
+            try await self.userHandler(requestStream, responseStreamWriter, context)
+          } catch let status as GRPCStatus where status.isOk {
+            // The user handler throwing `GRPCStatus.ok` is considered to be invalid.
+            await responseStreamWriter.asyncWriter.cancel()
+            throw GRPCStatus(
+              code: .unknown,
+              message: "Handler threw GRPCStatus error with code .ok"
+            )
+          } catch {
+            await responseStreamWriter.asyncWriter.cancel()
+            throw error
+          }
+          // Wait for the response stream writer to finish writing its responses.
+          try await responseStreamWriter.asyncWriter.finish(.ok)
+        } onCancel: {
+          /// The task being cancelled from outside is the signal to this task that an error has
+          /// occured and we should abort the user handler.
+          ///
+          /// Adopters are encouraged to cooperatively check for cancellation in their handlers but
+          /// we cannot rely on this.
+          ///
+          /// We additionally signal the handler that an error has occured by terminating the source
+          /// backing the request stream that the user handler is consuming.
+          ///
+          /// - NOTE: This handler has different semantics from the extant non-async-await handlers
+          /// where the `statusPromise` was explicitly failed with `GRPCStatus.unavailable` from
+          /// _outside_ the user handler. Here we terminate the request stream with a
+          /// `CancellationError` which manifests _inside_ the user handler when it tries to access
+          /// the next request in the stream. We have no control over the implementation of the user
+          /// handler. It may choose to handle this error or not. In the event that the handler
+          /// either rethrows or does not handle the error, this will be converted to a
+          /// `GRPCStatus.unknown` by `handleError(_:)`. Yielding a `CancellationError` _inside_
+          /// the user handler feels like the clearest semantics of what we want--"the RPC has an
+          /// error, cancel whatever you're doing." If we want to preserve the API of the
+          /// non-async-await handlers in this error flow we could add conformance to
+          /// `GRPCStatusTransformable` to `CancellationError`, but we still cannot control _how_
+          /// the user handler will handle the `CancellationError` which could even be swallowed.
+          ///
+          /// - NOTE: Currently we _have_ added `GRPCStatusTransformable` conformance to
+          /// `CancellationError` to convert it into `GRPCStatus.unavailable` and expect to
+          /// document that user handlers should always rethrow `CacellationError` if handled, after
+          /// optional cleanup.
+          requestStreamSource.finish(throwing: CancellationError())
+          /// Cancel the writer here to drop any pending responses.
+          responseStreamWriter.asyncWriter.cancelAsynchronously()
+        }
+      }
+
+    case .active:
+      self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
+
+    case .completed:
+      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
+      // to an error or otherwise) and an interceptor doing some async work later emitting headers.
+      // Dropping them is fine.
+      ()
+    }
+  }
+
+  @inlinable
+  internal func receiveInterceptedMessage(_ request: Request) {
+    switch self.state {
+    case .idle:
+      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
+    case let .active(requestStreamSource, _, _, _):
+      switch requestStreamSource.yield(request) {
+      case .accepted(queueDepth: _):
+        // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`.
+        break
+      case .dropped:
+        /// If we are in the `.active` state then we have yet to encounter an error. Therefore
+        /// if the request stream source has already terminated then it must have been the result of
+        /// receiving `.end`. Therefore this `.message` must have been sent by the client after it
+        /// sent `.end`, which is a protocol violation.
+        self.handleError(GRPCError.ProtocolViolation("Message received after end of stream"))
+      }
+    case .completed:
+      // We received a message but we're already done: this may happen if we terminate the RPC
+      // due to a channel error, for example.
+      ()
+    }
+  }
+
+  @inlinable
+  internal func receiveInterceptedEnd() {
+    switch self.state {
+    case .idle:
+      self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
+    case let .active(requestStreamSource, _, _, _):
+      switch requestStreamSource.finish() {
+      case .accepted(queueDepth: _):
+        break
+      case .dropped:
+        /// If we are in the `.active` state then we have yet to encounter an error. Therefore
+        /// if the request stream source has already terminated then it must have been the result of
+        /// receiving `.end`. Therefore this `.end` must have been sent by the client after it
+        /// already sent `.end`, which is a protocol violation.
+        self.handleError(GRPCError.ProtocolViolation("Message duplicate end of stream"))
+      }
+    case .completed:
+      // We received a message but we're already done: this may happen if we terminate the RPC
+      // due to a channel error, for example.
+      ()
+    }
+  }
+
+  // MARK: - User Function To Interceptors
+
+  @inlinable
+  internal func _interceptResponse(_ response: Response, metadata: MessageMetadata) {
+    self.context.eventLoop.assertInEventLoop()
+    switch self.state {
+    case .idle:
+      // The user handler cannot send responses before it has been invoked.
+      preconditionFailure()
+
+    case .active:
+      self.interceptors.send(.message(response, metadata), promise: nil)
+
+    case .completed:
+      /// If we are in the completed state then the async writer delegate must have terminated.
+      preconditionFailure()
+    }
+  }
+
+  @inlinable
+  internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
+    if self.context.eventLoop.inEventLoop {
+      self._interceptResponse(response, metadata: metadata)
+    } else {
+      self.context.eventLoop.execute {
+        self._interceptResponse(response, metadata: metadata)
+      }
+    }
+  }
+
+  @inlinable
+  internal func userHandlerCompleted(_ result: Result<Void, Error>) {
+    switch self.state {
+    case .idle:
+      // The user handler cannot complete before it is invoked.
+      preconditionFailure()
+
+    case .active:
+      switch result {
+      case .success:
+        /// The user handler has completed successfully.
+        /// We don't take any action here; the state transition and termination of the message
+        /// stream happen when the response stream has drained, in the response stream writer
+        /// delegate callback, `responseStreamDrained(_:)`.
+        break
+
+      case let .failure(error):
+        self.handleError(error, thrownFromHandler: true)
+      }
+
+    case .completed:
+      ()
+    }
+  }
+
+  @inlinable
+  internal func _responseStreamDrained(_ status: GRPCStatus) {
+    self.context.eventLoop.assertInEventLoop()
+    switch self.state {
+    case .idle:
+      preconditionFailure()
+
+    case let .active(_, context, _, _):
+      // Now we have drained the response stream writer from the user handler we can send end.
+      self.state = .completed
+      self.interceptors.send(.end(status, context.trailers), promise: nil)
+
+    case .completed:
+      ()
+    }
+  }
+
+  @inlinable
+  internal func responseStreamDrained(_ status: GRPCStatus) {
+    if self.context.eventLoop.inEventLoop {
+      self._responseStreamDrained(status)
+    } else {
+      self.context.eventLoop.execute {
+        self._responseStreamDrained(status)
+      }
+    }
+  }
+
+  @inlinable
+  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
+    switch self.state {
+    case .idle:
+      assert(!isHandlerError)
+      self.state = .completed
+      let (status, trailers) = ServerErrorProcessor.processLibraryError(
+        error,
+        delegate: self.context.errorDelegate
+      )
+      self.interceptors.send(.end(status, trailers), promise: nil)
+
+    case let .active(_, context, _, _):
+      self.state = .completed
+
+      // If we have an async task, then cancel it, which will terminate the request stream from
+      // which it is reading and give the user handler an opportunity to cleanup.
+      self.userHandlerTask?.cancel()
+
+      let status: GRPCStatus
+      let trailers: HPACKHeaders
+
+      if isHandlerError {
+        (status, trailers) = ServerErrorProcessor.processObserverError(
+          error,
+          headers: context.headers,
+          trailers: context.trailers,
+          delegate: self.context.errorDelegate
+        )
+      } else {
+        (status, trailers) = ServerErrorProcessor.processLibraryError(
+          error,
+          delegate: self.context.errorDelegate
+        )
+      }
+
+      // TODO: This doesn't go via the user handler task.
+      self.interceptors.send(.end(status, trailers), promise: nil)
+
+    case .completed:
+      ()
+    }
+  }
+
+  @inlinable
+  internal func sendInterceptedPart(
+    _ part: GRPCServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?
+  ) {
+    switch part {
+    case let .metadata(headers):
+      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
+
+    case let .message(message, metadata):
+      do {
+        let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
+        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
+      } catch {
+        // Serialization failed: fail the promise and send end.
+        promise?.fail(error)
+        let (status, trailers) = ServerErrorProcessor.processLibraryError(
+          error,
+          delegate: self.context.errorDelegate
+        )
+        // Loop back via the interceptors.
+        self.interceptors.send(.end(status, trailers), promise: nil)
+      }
+
+    case let .end(status, trailers):
+      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
+    }
+  }
+}
+
+#endif

+ 1 - 0
Sources/GRPC/AsyncAwaitSupport/PassthroughMessageSource.swift

@@ -83,6 +83,7 @@ internal final class PassthroughMessageSource<Element, Failure: Error> {
   }
 
   @inlinable
+  @discardableResult
   internal func finish(throwing error: Failure? = nil) -> YieldResult {
     let continuationResult: _ContinuationResult = error.map { .failure($0) } ?? .success(nil)
     return self._yield(continuationResult, isTerminator: true)

+ 1 - 1
Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift

@@ -339,7 +339,7 @@ public final class BidirectionalStreamingServerHandler<
 
       self.interceptors.send(.end(status, trailers), promise: nil)
       // We're already in the 'completed' state so failing the promise will be a no-op in the
-      // callback to 'userFunctionStatusResolved' (but we also need to avoid leaking the promise.)
+      // callback to 'userHandlerCompleted' (but we also need to avoid leaking the promise.)
       context.statusPromise.fail(error)
 
     case .completed:

+ 341 - 0
Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift

@@ -0,0 +1,341 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.5)
+
+@testable import GRPC
+import NIOCore
+import XCTest
+
+// MARK: - Tests
+
+@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
+class AsyncServerHandlerTests: ServerHandlerTestCaseBase {
+  private func makeHandler(
+    encoding: ServerMessageEncoding = .disabled,
+    observer: @escaping @Sendable(
+      GRPCAsyncRequestStream<String>,
+      GRPCAsyncResponseStreamWriter<String>,
+      GRPCAsyncServerCallContext
+    ) async throws -> Void
+  ) -> AsyncServerHandler<StringSerializer, StringDeserializer> {
+    return AsyncServerHandler(
+      context: self.makeCallHandlerContext(encoding: encoding),
+      requestDeserializer: StringDeserializer(),
+      responseSerializer: StringSerializer(),
+      interceptors: [],
+      userHandler: observer
+    )
+  }
+
+  @Sendable private func echo(
+    requests: GRPCAsyncRequestStream<String>,
+    responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    for try await message in requests {
+      try await responseStreamWriter.send(message)
+    }
+  }
+
+  @Sendable private func neverReceivesMessage(
+    requests: GRPCAsyncRequestStream<String>,
+    responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    for try await message in requests {
+      XCTFail("Unexpected message: '\(message)'")
+    }
+  }
+
+  @Sendable private func neverCalled(
+    requests: GRPCAsyncRequestStream<String>,
+    responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
+    context: GRPCAsyncServerCallContext
+  ) async throws {
+    XCTFail("This observer should never be called")
+  }
+
+  func testHappyPath() { XCTAsyncTest {
+    let handler = self.makeHandler(
+      observer: self.echo(requests:responseStreamWriter:context:)
+    )
+
+    handler.receiveMetadata([:])
+    await assertThat(self.recorder.metadata, .is([:]))
+
+    handler.receiveMessage(ByteBuffer(string: "1"))
+    handler.receiveMessage(ByteBuffer(string: "2"))
+    handler.receiveMessage(ByteBuffer(string: "3"))
+    handler.receiveEnd()
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    handler.finish()
+
+    await assertThat(
+      self.recorder.messages,
+      .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
+    )
+    await assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
+    await assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
+    await assertThat(self.recorder.trailers, .is([:]))
+  } }
+
+  func testHappyPathWithCompressionEnabled() { XCTAsyncTest {
+    let handler = self.makeHandler(
+      encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
+      observer: self.echo(requests:responseStreamWriter:context:)
+    )
+
+    handler.receiveMetadata([:])
+    handler.receiveMessage(ByteBuffer(string: "1"))
+    handler.receiveMessage(ByteBuffer(string: "2"))
+    handler.receiveMessage(ByteBuffer(string: "3"))
+    handler.receiveEnd()
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(
+      self.recorder.messages,
+      .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
+    )
+    await assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([true, true, true]))
+  } }
+
+  func testHappyPathWithCompressionEnabledButDisabledByCaller() { XCTAsyncTest {
+    let handler = self.makeHandler(
+      encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
+    ) { requests, responseStreamWriter, context in
+      context.compressionEnabled = false
+      return try await self.echo(
+        requests: requests,
+        responseStreamWriter: responseStreamWriter,
+        context: context
+      )
+    }
+
+    handler.receiveMetadata([:])
+    handler.receiveMessage(ByteBuffer(string: "1"))
+    handler.receiveMessage(ByteBuffer(string: "2"))
+    handler.receiveMessage(ByteBuffer(string: "3"))
+    handler.receiveEnd()
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(
+      self.recorder.messages,
+      .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
+    )
+    await assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
+  } }
+
+  func testTaskOnlyCreatedAfterHeaders() { XCTAsyncTest {
+    let handler = self.makeHandler(observer: self.echo(requests:responseStreamWriter:context:))
+
+    await assertThat(handler.userHandlerTask, .is(.nil()))
+
+    handler.receiveMetadata([:])
+
+    await assertThat(handler.userHandlerTask, .is(.notNil()))
+  } }
+
+  func testThrowingDeserializer() { XCTAsyncTest {
+    let handler = AsyncServerHandler(
+      context: self.makeCallHandlerContext(),
+      requestDeserializer: ThrowingStringDeserializer(),
+      responseSerializer: StringSerializer(),
+      interceptors: [],
+      userHandler: self.neverReceivesMessage(requests:responseStreamWriter:context:)
+    )
+
+    handler.receiveMetadata([:])
+
+    // Wait for the async user function to have processed the metadata.
+    try self.recorder.recordedMetadataPromise.futureResult.wait()
+
+    await assertThat(self.recorder.metadata, .is([:]))
+
+    let buffer = ByteBuffer(string: "hello")
+    handler.receiveMessage(buffer)
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(self.recorder.messages, .isEmpty())
+    await assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
+  } }
+
+  func testThrowingSerializer() { XCTAsyncTest {
+    let handler = AsyncServerHandler(
+      context: self.makeCallHandlerContext(),
+      requestDeserializer: StringDeserializer(),
+      responseSerializer: ThrowingStringSerializer(),
+      interceptors: [],
+      userHandler: self.echo(requests:responseStreamWriter:context:)
+    )
+
+    handler.receiveMetadata([:])
+    await assertThat(self.recorder.metadata, .is([:]))
+
+    let buffer = ByteBuffer(string: "hello")
+    handler.receiveMessage(buffer)
+    handler.receiveEnd()
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(self.recorder.messages, .isEmpty())
+    await assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
+  } }
+
+  func testReceiveMessageBeforeHeaders() { XCTAsyncTest {
+    let handler = self
+      .makeHandler(observer: self.neverCalled(requests:responseStreamWriter:context:))
+
+    handler.receiveMessage(ByteBuffer(string: "foo"))
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(self.recorder.metadata, .is(.nil()))
+    await assertThat(self.recorder.messages, .isEmpty())
+    await assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
+  } }
+
+  // TODO: Running this 1000 times shows up a segfault in NIO event loop group.
+  func testReceiveMultipleHeaders() { XCTAsyncTest {
+    let handler = self
+      .makeHandler(observer: self.neverReceivesMessage(requests:responseStreamWriter:context:))
+
+    handler.receiveMetadata([:])
+
+    // Wait for the async user function to have processed the metadata.
+    try self.recorder.recordedMetadataPromise.futureResult.wait()
+
+    await assertThat(self.recorder.metadata, .is([:]))
+
+    handler.receiveMetadata([:])
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(self.recorder.messages, .isEmpty())
+    await assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
+  } }
+
+  func testFinishBeforeStarting() { XCTAsyncTest {
+    let handler = self
+      .makeHandler(observer: self.neverCalled(requests:responseStreamWriter:context:))
+
+    handler.finish()
+    await assertThat(self.recorder.metadata, .is(.nil()))
+    await assertThat(self.recorder.messages, .isEmpty())
+    await assertThat(self.recorder.status, .is(.nil()))
+    await assertThat(self.recorder.trailers, .is(.nil()))
+  } }
+
+  func testFinishAfterHeaders() { XCTAsyncTest {
+    let handler = self.makeHandler(observer: self.echo(requests:responseStreamWriter:context:))
+    handler.receiveMetadata([:])
+
+    // Wait for the async user function to have processed the metadata.
+    try self.recorder.recordedMetadataPromise.futureResult.wait()
+
+    await assertThat(self.recorder.metadata, .is([:]))
+
+    handler.finish()
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(self.recorder.messages, .isEmpty())
+    await assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
+    await assertThat(self.recorder.trailers, .is([:]))
+  } }
+
+  func testFinishAfterMessage() { XCTAsyncTest {
+    let handler = self.makeHandler(observer: self.echo(requests:responseStreamWriter:context:))
+
+    handler.receiveMetadata([:])
+    handler.receiveMessage(ByteBuffer(string: "hello"))
+
+    // Wait for the async user function to have processed the message.
+    try self.recorder.recordedMessagePromise.futureResult.wait()
+
+    handler.finish()
+
+    // Wait for tasks to finish.
+    await handler.userHandlerTask?.value
+
+    await assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "hello")))
+    await assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
+    await assertThat(self.recorder.trailers, .is([:]))
+  } }
+
+  func testHandlerThrowsGRPCStatusOKResultsInUnknownStatus() { XCTAsyncTest {
+    // Create a user function that immediately throws GRPCStatus.ok.
+    let handler = self.makeHandler { _, _, _ in
+      throw GRPCStatus.ok
+    }
+
+    // Send some metadata to trigger the creation of the async task with the user function.
+    handler.receiveMetadata([:])
+
+    // Wait for user handler to finish (it's gonna throw immediately).
+    await assertThat(await handler.userHandlerTask?.value, .notNil())
+
+    // Check the status is `.unknown`.
+    await assertThat(self.recorder.status, .notNil(.hasCode(.unknown)))
+  } }
+
+  // TODO: We should be consistent about where we put the tasks... maybe even use a task group to simplify cancellation (unless they both go in the enum state which might be better).
+
+  func testResponseStreamDrain() { XCTAsyncTest {
+    // Set up echo handler.
+    let handler = self.makeHandler(
+      observer: self.echo(requests:responseStreamWriter:context:)
+    )
+
+    // Send some metadata to trigger the creation of the async task with the user function.
+    handler.receiveMetadata([:])
+
+    // Send two requests and end, pausing the writer in the middle.
+    switch handler.state {
+    case let .active(_, _, responseStreamWriter, promise):
+      handler.receiveMessage(ByteBuffer(string: "diaz"))
+      await responseStreamWriter.asyncWriter.toggleWritability()
+      handler.receiveMessage(ByteBuffer(string: "santiago"))
+      handler.receiveEnd()
+      await responseStreamWriter.asyncWriter.toggleWritability()
+      await handler.userHandlerTask?.value
+      _ = try await promise.futureResult.get()
+    default:
+      XCTFail("Unexpected handler state: \(handler.state)")
+    }
+
+    handler.finish()
+
+    await assertThat(self.recorder.messages, .is([
+      ByteBuffer(string: "diaz"),
+      ByteBuffer(string: "santiago"),
+    ]))
+    await assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
+  } }
+}
+#endif

+ 6 - 1
Tests/GRPCTests/ServerInterceptorTests.swift

@@ -37,7 +37,12 @@ extension GRPCServerHandlerProtocol {
 
 class ServerInterceptorTests: GRPCTestCase {
   private let eventLoop = EmbeddedEventLoop()
-  private let recorder = ResponseRecorder()
+  private var recorder: ResponseRecorder!
+
+  override func setUp() {
+    super.setUp()
+    self.recorder = ResponseRecorder(eventLoop: self.eventLoop)
+  }
 
   private func makeRecordingInterceptor()
     -> RecordingServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {

+ 33 - 26
Tests/GRPCTests/UnaryServerHandlerTests.swift

@@ -32,6 +32,7 @@ final class ResponseRecorder: GRPCServerResponseWriter {
     XCTAssertNil(self.metadata)
     self.metadata = metadata
     promise?.succeed(())
+    self.recordedMetadataPromise.succeed(())
   }
 
   func sendMessage(
@@ -42,6 +43,7 @@ final class ResponseRecorder: GRPCServerResponseWriter {
     self.messages.append(bytes)
     self.messageMetadata.append(metadata)
     promise?.succeed(())
+    self.recordedMessagePromise.succeed(())
   }
 
   func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
@@ -50,16 +52,37 @@ final class ResponseRecorder: GRPCServerResponseWriter {
     self.status = status
     self.trailers = trailers
     promise?.succeed(())
+    self.recordedEndPromise.succeed(())
+  }
+
+  var recordedMetadataPromise: EventLoopPromise<Void>
+  var recordedMessagePromise: EventLoopPromise<Void>
+  var recordedEndPromise: EventLoopPromise<Void>
+
+  init(eventLoop: EventLoop) {
+    self.recordedMetadataPromise = eventLoop.makePromise()
+    self.recordedMessagePromise = eventLoop.makePromise()
+    self.recordedEndPromise = eventLoop.makePromise()
   }
-}
 
-protocol ServerHandlerTestCase: GRPCTestCase {
-  var eventLoop: EmbeddedEventLoop { get }
-  var allocator: ByteBufferAllocator { get }
-  var recorder: ResponseRecorder { get }
+  deinit {
+    struct RecordedDidNotIntercept: Error {}
+    self.recordedMetadataPromise.fail(RecordedDidNotIntercept())
+    self.recordedMessagePromise.fail(RecordedDidNotIntercept())
+    self.recordedEndPromise.fail(RecordedDidNotIntercept())
+  }
 }
 
-extension ServerHandlerTestCase {
+class ServerHandlerTestCaseBase: GRPCTestCase {
+  let eventLoop = EmbeddedEventLoop()
+  let allocator = ByteBufferAllocator()
+  var recorder: ResponseRecorder!
+
+  override func setUp() {
+    super.setUp()
+    self.recorder = ResponseRecorder(eventLoop: self.eventLoop)
+  }
+
   func makeCallHandlerContext(encoding: ServerMessageEncoding = .disabled) -> CallHandlerContext {
     return CallHandlerContext(
       errorDelegate: nil,
@@ -77,11 +100,7 @@ extension ServerHandlerTestCase {
 
 // MARK: - Unary
 
-class UnaryServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
-  let eventLoop = EmbeddedEventLoop()
-  let allocator = ByteBufferAllocator()
-  let recorder = ResponseRecorder()
-
+class UnaryServerHandlerTests: ServerHandlerTestCaseBase {
   private func makeHandler(
     encoding: ServerMessageEncoding = .disabled,
     function: @escaping (String, StatusOnlyCallContext) -> EventLoopFuture<String>
@@ -292,11 +311,7 @@ class UnaryServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
 
 // MARK: - Client Streaming
 
-class ClientStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
-  let eventLoop = EmbeddedEventLoop()
-  let allocator = ByteBufferAllocator()
-  let recorder = ResponseRecorder()
-
+class ClientStreamingServerHandlerTests: ServerHandlerTestCaseBase {
   private func makeHandler(
     encoding: ServerMessageEncoding = .disabled,
     observerFactory: @escaping (UnaryResponseCallContext<String>)
@@ -549,11 +564,7 @@ class ClientStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
   }
 }
 
-class ServerStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
-  let eventLoop = EmbeddedEventLoop()
-  let allocator = ByteBufferAllocator()
-  let recorder = ResponseRecorder()
-
+class ServerStreamingServerHandlerTests: ServerHandlerTestCaseBase {
   private func makeHandler(
     encoding: ServerMessageEncoding = .disabled,
     userFunction: @escaping (String, StreamingResponseCallContext<String>)
@@ -771,11 +782,7 @@ class ServerStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
 
 // MARK: - Bidirectional Streaming
 
-class BidirectionalStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
-  let eventLoop = EmbeddedEventLoop()
-  let allocator = ByteBufferAllocator()
-  let recorder = ResponseRecorder()
-
+class BidirectionalStreamingServerHandlerTests: ServerHandlerTestCaseBase {
   private func makeHandler(
     encoding: ServerMessageEncoding = .disabled,
     observerFactory: @escaping (StreamingResponseCallContext<String>)