Browse Source

Remove the 'error' case from the 'GRPCClientResponsePart' (#1035)

Motivation:

When implementing a bunch of interceptors I found it a little irksome
having to deal with errors along with response parts. It feels more
natural to deal with them separately.

Modifications:

- Remove 'error' from 'GRPCClientResponsePart'
- Add an 'errorCaught' client interceptor function

Result:

Errors are handled separately to response parts.
George Barnett 5 years ago
parent
commit
e70547dc1c

+ 4 - 1
Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift

@@ -78,7 +78,10 @@ public struct BidirectionalStreamingCall<
   }
 
   internal func invoke() {
-    self.call.invokeStreamingRequests(self.responseParts.handle(_:))
+    self.call.invokeStreamingRequests(
+      onError: self.responseParts.handleError(_:),
+      onResponsePart: self.responseParts.handle(_:)
+    )
   }
 
   // MARK: - Requests

+ 29 - 16
Sources/GRPC/ClientCalls/Call.swift

@@ -109,17 +109,22 @@ public class Call<Request, Response> {
   ///
   /// This must be called prior to `send(_:promise:)` or `cancel(promise:)`.
   ///
-  /// - Parameter onResponsePart: A callback which is invoked on every response part.
+  /// - Parameters:
+  ///   - onError: A callback invoked when an error is received.
+  ///   - onResponsePart: A callback which is invoked on every response part.
   /// - Important: This function should only be called once. Subsequent calls will be ignored.
   @inlinable
-  public func invoke(_ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void) {
+  public func invoke(
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+  ) {
     self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
 
     if self.eventLoop.inEventLoop {
-      self._invoke(onResponsePart)
+      self._invoke(onError: onError, onResponsePart: onResponsePart)
     } else {
       self.eventLoop.execute {
-        self._invoke(onResponsePart)
+        self._invoke(onError: onError, onResponsePart: onResponsePart)
       }
     }
   }
@@ -255,7 +260,8 @@ extension Call {
   /// - Important: This *must* to be called from the `eventLoop`.
   @usableFromInline
   internal func _invoke(
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     self.eventLoop.assertInEventLoop()
 
@@ -266,7 +272,8 @@ extension Call {
         for: self.type,
         withOptions: self.options,
         interceptedBy: self.interceptors,
-        onResponsePart
+        onError: onError,
+        onResponsePart: onResponsePart
       )
       self._state = .invoked(transport)
 
@@ -339,17 +346,19 @@ extension Call {
   /// request stream.
   /// - Parameters:
   ///   - request: The request to send.
+  ///   - onError: A callback invoked when an error is received.
   ///   - onResponsePart: A callback invoked for each response part received.
   @inlinable
   internal func invokeUnaryRequest(
     _ request: Request,
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     if self.eventLoop.inEventLoop {
-      self._invokeUnaryRequest(request: request, onResponsePart)
+      self._invokeUnaryRequest(request: request, onError: onError, onResponsePart: onResponsePart)
     } else {
       self.eventLoop.execute {
-        self._invokeUnaryRequest(request: request, onResponsePart)
+        self._invokeUnaryRequest(request: request, onError: onError, onResponsePart: onResponsePart)
       }
     }
   }
@@ -357,16 +366,18 @@ extension Call {
   /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
   /// additional messages and end the stream by calling `send(_:promise:)`.
   /// - Parameters:
+  ///   - onError: A callback invoked when an error is received.
   ///   - onResponsePart: A callback invoked for each response part received.
   @inlinable
   internal func invokeStreamingRequests(
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     if self.eventLoop.inEventLoop {
-      self._invokeStreamingRequests(onResponsePart)
+      self._invokeStreamingRequests(onError: onError, onResponsePart: onResponsePart)
     } else {
       self.eventLoop.execute {
-        self._invokeStreamingRequests(onResponsePart)
+        self._invokeStreamingRequests(onError: onError, onResponsePart: onResponsePart)
       }
     }
   }
@@ -375,12 +386,13 @@ extension Call {
   @usableFromInline
   internal func _invokeUnaryRequest(
     request: Request,
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     self.eventLoop.assertInEventLoop()
     assert(self.type == .unary || self.type == .serverStreaming)
 
-    self._invoke(onResponsePart)
+    self._invoke(onError: onError, onResponsePart: onResponsePart)
     self._send(.metadata(self.options.customMetadata), promise: nil)
     self._send(
       .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
@@ -392,12 +404,13 @@ extension Call {
   /// On-`EventLoop` implementation of `invokeStreamingRequests(_:)`.
   @usableFromInline
   internal func _invokeStreamingRequests(
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     self.eventLoop.assertInEventLoop()
     assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
 
-    self._invoke(onResponsePart)
+    self._invoke(onError: onError, onResponsePart: onResponsePart)
     self._send(.metadata(self.options.customMetadata), promise: nil)
   }
 }

+ 4 - 1
Sources/GRPC/ClientCalls/ClientStreamingCall.swift

@@ -78,7 +78,10 @@ public struct ClientStreamingCall<RequestPayload, ResponsePayload>: StreamingReq
   }
 
   internal func invoke() {
-    self.call.invokeStreamingRequests(self.responseParts.handle(_:))
+    self.call.invokeStreamingRequests(
+      onError: self.responseParts.handleError(_:),
+      onResponsePart: self.responseParts.handle(_:)
+    )
   }
 
   // MARK: - Request

+ 17 - 15
Sources/GRPC/ClientCalls/ResponseContainers.swift

@@ -79,16 +79,17 @@ internal class UnaryResponseParts<Response> {
 
       self.trailingMetadataPromise.succeed(trailers)
       self.statusPromise.succeed(status)
-
-    case let .error(error):
-      let withoutContext = error.removingContext()
-      let status = withoutContext.makeGRPCStatus()
-      self.initialMetadataPromise.fail(withoutContext)
-      self.responsePromise.fail(withoutContext)
-      self.trailingMetadataPromise.fail(withoutContext)
-      self.statusPromise.succeed(status)
     }
   }
+
+  internal func handleError(_ error: Error) {
+    let withoutContext = error.removingContext()
+    let status = withoutContext.makeGRPCStatus()
+    self.initialMetadataPromise.fail(withoutContext)
+    self.responsePromise.fail(withoutContext)
+    self.trailingMetadataPromise.fail(withoutContext)
+    self.statusPromise.succeed(status)
+  }
 }
 
 /// A bucket of promises for a streaming-response RPC.
@@ -144,15 +145,16 @@ internal class StreamingResponseParts<Response> {
       self.initialMetadataPromise.fail(status)
       self.trailingMetadataPromise.succeed(trailers)
       self.statusPromise.succeed(status)
-
-    case let .error(error):
-      let withoutContext = error.removingContext()
-      let status = withoutContext.makeGRPCStatus()
-      self.initialMetadataPromise.fail(withoutContext)
-      self.trailingMetadataPromise.fail(withoutContext)
-      self.statusPromise.succeed(status)
     }
   }
+
+  internal func handleError(_ error: Error) {
+    let withoutContext = error.removingContext()
+    let status = withoutContext.makeGRPCStatus()
+    self.initialMetadataPromise.fail(withoutContext)
+    self.trailingMetadataPromise.fail(withoutContext)
+    self.statusPromise.succeed(status)
+  }
 }
 
 extension EventLoop {

+ 5 - 1
Sources/GRPC/ClientCalls/ServerStreamingCall.swift

@@ -73,6 +73,10 @@ public struct ServerStreamingCall<RequestPayload, ResponsePayload>: ClientCall {
   }
 
   internal func invoke(_ request: RequestPayload) {
-    self.call.invokeUnaryRequest(request, self.responseParts.handle(_:))
+    self.call.invokeUnaryRequest(
+      request,
+      onError: self.responseParts.handleError(_:),
+      onResponsePart: self.responseParts.handle(_:)
+    )
   }
 }

+ 5 - 1
Sources/GRPC/ClientCalls/UnaryCall.swift

@@ -77,6 +77,10 @@ public struct UnaryCall<RequestPayload, ResponsePayload>: UnaryResponseClientCal
   }
 
   internal func invoke(_ request: RequestPayload) {
-    self.call.invokeUnaryRequest(request, self.responseParts.handle(_:))
+    self.call.invokeUnaryRequest(
+      request,
+      onError: self.responseParts.handleError(_:),
+      onResponsePart: self.responseParts.handle(_:)
+    )
   }
 }

+ 14 - 0
Sources/GRPC/Interceptor/ClientInterceptorContext.swift

@@ -83,6 +83,15 @@ public struct ClientInterceptorContext<Request, Response> {
     self.nextInbound?.invokeReceive(part)
   }
 
+  /// Forwards the error to the next inbound interceptor in the pipeline, if there is one.
+  ///
+  /// - Parameter error: The error to forward.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  public func errorCaught(_ error: Error) {
+    self.eventLoop.assertInEventLoop()
+    self.nextInbound?.invokeErrorCaught(error)
+  }
+
   /// Forwards the request part to the next outbound interceptor in the pipeline, if there is one.
   ///
   /// - Parameters:
@@ -137,4 +146,9 @@ extension ClientInterceptorContext {
     self.eventLoop.assertInEventLoop()
     self.interceptor.cancel(promise: promise, context: self)
   }
+
+  internal func invokeErrorCaught(_ error: Error) {
+    self.eventLoop.assertInEventLoop()
+    self.interceptor.errorCaught(error, context: self)
+  }
 }

+ 19 - 2
Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

@@ -127,6 +127,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
     details: CallDetails,
     interceptors: [ClientInterceptor<Request, Response>],
     errorDelegate: ClientErrorDelegate?,
+    onError: @escaping (Error) -> Void,
     onCancel: @escaping (EventLoopPromise<Void>?) -> Void,
     onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void,
     onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
@@ -141,7 +142,12 @@ internal final class ClientInterceptorPipeline<Request, Response> {
     // Start with the tail.
     contexts.append(
       ClientInterceptorContext(
-        for: .tail(for: self, errorDelegate: errorDelegate, onResponsePart),
+        for: .tail(
+          for: self,
+          errorDelegate: errorDelegate,
+          onError: onError,
+          onResponsePart: onResponsePart
+        ),
         atIndex: contexts.count,
         in: self
       )
@@ -182,6 +188,17 @@ internal final class ClientInterceptorPipeline<Request, Response> {
     self._head?.invokeReceive(part)
   }
 
+  /// Emit an error into the interceptor pipeline.
+  ///
+  /// This should be called by the transport layer when receiving an error.
+  ///
+  /// - Parameter error: The error to emit.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  internal func errorCaught(_ error: Error) {
+    self.eventLoop.assertInEventLoop()
+    self._head?.invokeErrorCaught(error)
+  }
+
   /// Writes a request message into the interceptor pipeline.
   ///
   /// This should be called by the call object to send requests parts to the transport.
@@ -268,7 +285,7 @@ extension ClientInterceptorPipeline {
     self.scheduledClose = self.eventLoop.scheduleTask(deadline: deadline) {
       // When the error hits the tail we'll call 'close()', this will cancel the transport if
       // necessary.
-      self.receive(.error(GRPCError.RPCTimedOut(timeLimit)))
+      self.errorCaught(GRPCError.RPCTimedOut(timeLimit))
     }
   }
 }

+ 6 - 0
Sources/GRPC/Interceptor/ClientInterceptorProtocol.swift

@@ -25,6 +25,12 @@ internal protocol ClientInterceptorProtocol {
     context: ClientInterceptorContext<Request, Response>
   )
 
+  /// Called when the interceptor has received an error to handle.
+  func errorCaught(
+    _ error: Error,
+    context: ClientInterceptorContext<Request, Response>
+  )
+
   /// Called when the interceptor has received a request part to handle.
   func send(
     _ part: GRPCClientRequestPart<Request>,

+ 92 - 43
Sources/GRPC/Interceptor/ClientInterceptors.swift

@@ -17,28 +17,29 @@ import NIO
 
 /// A base class for client interceptors.
 ///
-/// Interceptors allow request and response and response parts to be observed, mutated or dropped
-/// as necessary. The default behaviour for this base class is to forward any events to the next
-/// interceptor.
+/// Interceptors allow request and response parts to be observed, mutated or dropped as necessary.
+/// The default behaviour for this base class is to forward any events to the next interceptor.
 ///
-/// Interceptors may observe three different types of event:
+/// Interceptors may observe a number of different events:
 /// - receiving response parts with `receive(_:context:)`,
+/// - receiving errors with `errorCaught(_:context:)`,
 /// - sending request parts with `send(_:promise:context:)`, and
 /// - RPC cancellation with `cancel(context:)`.
 ///
 /// These events flow through a pipeline of interceptors for each RPC. Request parts sent from the
-/// call object (such as `UnaryCall` and `BidirectionalStreamingCall`) will traverse the pipeline
-/// from its tail via `send(_:context:)` eventually reaching the head of the pipeline where it will
-/// be sent sent to the server.
+/// call object (e.g. `UnaryCall`, `BidirectionalStreamingCall`) will traverse the pipeline in the
+/// outbound direction from its tail via `send(_:context:)` eventually reaching the head of the
+/// pipeline where it will be sent sent to the server.
 ///
-/// Response parts, or errors, received from the transport fill be fired back through the
-/// interceptor pipeline via `receive(_:context:)`. Note that the `end` and `error` response parts
-/// are terminal: the pipeline will be torn down once these parts reach the the tail of the
-/// pipeline.
+/// Response parts, or errors, received from the transport fill be fired in the inbound direction
+/// back through the interceptor pipeline via `receive(_:context:)` and `errorCaught(_:context:)`,
+/// respectively. Note that the `end` response part and any error received are terminal: the
+/// pipeline will be torn down once these parts reach the the tail and are a signal that the
+/// interceptor should free up any resources it may be using.
 ///
 /// Each of the interceptor functions is provided with a `context` which exposes analogous functions
-/// (`receive(_:)`, `send(_:promise:)`, and `cancel(promise:)`) which may be called to forward
-/// events to the next interceptor.
+/// (`receive(_:)`, `errorCaught(_:)`, `send(_:promise:)`, and `cancel(promise:)`) which may be
+/// called to forward events to the next interceptor in the appropriate direction.
 ///
 /// ### Thread Safety
 ///
@@ -61,6 +62,17 @@ open class ClientInterceptor<Request, Response> {
     context.receive(part)
   }
 
+  /// Called when the interceptor has received an error.
+  /// - Parameters:
+  ///   - error: The error.
+  ///   - context: An interceptor context which may be used to forward the error.
+  open func errorCaught(
+    _ error: Error,
+    context: ClientInterceptorContext<Request, Response>
+  ) {
+    context.errorCaught(error)
+  }
+
   /// Called when the interceptor has received a request part to handle.
   /// - Parameters:
   ///   - part: The request part which should be sent to the server.
@@ -129,6 +141,13 @@ internal struct HeadClientInterceptor<Request, Response>: ClientInterceptorProto
   ) {
     context.receive(part)
   }
+
+  internal func errorCaught(
+    _ error: Error,
+    context: ClientInterceptorContext<Request, Response>
+  ) {
+    context.errorCaught(error)
+  }
 }
 
 /// An interceptor which offloads responses to a provided callback and forwards any requests parts
@@ -141,6 +160,9 @@ internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProto
   /// A user-provided error delegate.
   private let errorDelegate: ClientErrorDelegate?
 
+  /// A callback invoked when an error is received.
+  private let onErrorCaught: (Error) -> Void
+
   /// A response part handler; typically this will complete some promises, for streaming responses
   /// it will also invoke a user-supplied handler. This closure may also be provided by the user.
   /// We need to be careful about re-entrancy.
@@ -149,10 +171,12 @@ internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProto
   internal init(
     for pipeline: ClientInterceptorPipeline<Request, Response>,
     errorDelegate: ClientErrorDelegate?,
+    _ onErrorCaught: @escaping (Error) -> Void,
     _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     self.pipeline = pipeline
     self.errorDelegate = errorDelegate
+    self.onErrorCaught = onErrorCaught
     self.onResponsePart = onResponsePart
   }
 
@@ -162,40 +186,44 @@ internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProto
   ) {
     switch part {
     case .metadata, .message:
-      self.onResponsePart(part)
-
+      ()
     case .end:
       // We're about to complete, close the pipeline before calling out via `onResponsePart`.
       self.pipeline.close()
-      self.onResponsePart(part)
+    }
 
-    case let .error(error):
-      // We're about to complete, close the pipeline before calling out via the error delegate
-      // or `onResponsePart`.
-      self.pipeline.close()
+    self.onResponsePart(part)
+  }
 
-      var unwrappedError: Error
-
-      // Unwrap the error, if possible.
-      if let errorContext = error as? GRPCError.WithContext {
-        unwrappedError = errorContext.error
-        self.errorDelegate?.didCatchError(
-          errorContext.error,
-          logger: context.logger,
-          file: errorContext.file,
-          line: errorContext.line
-        )
-      } else {
-        unwrappedError = error
-        self.errorDelegate?.didCatchErrorWithoutContext(
-          error,
-          logger: context.logger
-        )
-      }
-
-      // Emit the unwrapped error.
-      self.onResponsePart(.error(unwrappedError))
+  internal func errorCaught(
+    _ error: Error,
+    context: ClientInterceptorContext<Request, Response>
+  ) {
+    // We're about to complete, close the pipeline before calling out via the error delegate
+    // or `onResponsePart`.
+    self.pipeline.close()
+
+    var unwrappedError: Error
+
+    // Unwrap the error, if possible.
+    if let errorContext = error as? GRPCError.WithContext {
+      unwrappedError = errorContext.error
+      self.errorDelegate?.didCatchError(
+        errorContext.error,
+        logger: context.logger,
+        file: errorContext.file,
+        line: errorContext.line
+      )
+    } else {
+      unwrappedError = error
+      self.errorDelegate?.didCatchErrorWithoutContext(
+        error,
+        logger: context.logger
+      )
     }
+
+    // Emit the unwrapped error.
+    self.onErrorCaught(unwrappedError)
   }
 
   @inlinable
@@ -244,14 +272,21 @@ internal struct AnyClientInterceptor<Request, Response>: ClientInterceptorProtoc
   /// - Parameters:
   ///   - pipeline: The pipeline the tail interceptor belongs to.
   ///   - errorDelegate: An error delegate.
+  ///   - onError: A callback invoked when an error is received.
   ///   - onResponsePart: A handler called for each response part received from the pipeline.
   /// - Returns: An `AnyClientInterceptor` which wraps a `TailClientInterceptor`.
   internal static func tail(
     for pipeline: ClientInterceptorPipeline<Request, Response>,
     errorDelegate: ClientErrorDelegate?,
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) -> AnyClientInterceptor<Request, Response> {
-    let tail = TailClientInterceptor(for: pipeline, errorDelegate: errorDelegate, onResponsePart)
+    let tail = TailClientInterceptor(
+      for: pipeline,
+      errorDelegate: errorDelegate,
+      onError,
+      onResponsePart
+    )
     return .init(.tail(tail))
   }
 
@@ -282,6 +317,20 @@ internal struct AnyClientInterceptor<Request, Response>: ClientInterceptorProtoc
     }
   }
 
+  internal func errorCaught(
+    _ error: Error,
+    context: ClientInterceptorContext<Request, Response>
+  ) {
+    switch self._implementation {
+    case let .head(handler):
+      handler.errorCaught(error, context: context)
+    case let .tail(handler):
+      handler.errorCaught(error, context: context)
+    case let .base(handler):
+      handler.errorCaught(error, context: context)
+    }
+  }
+
   @inlinable
   internal func send(
     _ part: GRPCClientRequestPart<Request>,

+ 4 - 2
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -100,7 +100,8 @@ internal final class ClientTransport<Request, Response> {
     eventLoop: EventLoop,
     interceptors: [ClientInterceptor<Request, Response>],
     errorDelegate: ClientErrorDelegate?,
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) {
     self.eventLoop = eventLoop
     self.callDetails = details
@@ -109,6 +110,7 @@ internal final class ClientTransport<Request, Response> {
       details: details,
       interceptors: interceptors,
       errorDelegate: errorDelegate,
+      onError: onError,
       onCancel: self.cancelFromPipeline(promise:),
       onRequestPart: self.sendFromPipeline(_:promise:),
       onResponsePart: onResponsePart
@@ -799,7 +801,7 @@ extension ClientTransport {
   /// Forward the error to the interceptor pipeline.
   /// - Parameter error: The error to forward.
   private func forwardErrorToInterceptors(_ error: Error) {
-    self._pipeline?.receive(.error(error))
+    self._pipeline?.errorCaught(error)
   }
 }
 

+ 13 - 5
Sources/GRPC/Interceptor/ClientTransportFactory.swift

@@ -100,6 +100,7 @@ internal struct ClientTransportFactory<Request, Response> {
   ///   - type: The type of RPC, e.g. `.unary`.
   ///   - options: Options for the RPC.
   ///   - interceptors: Interceptors to use for the RPC.
+  ///   - onError: A callback invoked when an error is received.
   ///   - onResponsePart: A closure called for each response part received.
   /// - Returns: A configured transport.
   internal func makeConfiguredTransport<Request, Response>(
@@ -107,7 +108,8 @@ internal struct ClientTransportFactory<Request, Response> {
     for type: GRPCCallType,
     withOptions options: CallOptions,
     interceptedBy interceptors: [ClientInterceptor<Request, Response>],
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) -> ClientTransport<Request, Response> {
     switch self.factory {
     case let .http2(factory):
@@ -116,7 +118,8 @@ internal struct ClientTransportFactory<Request, Response> {
         for: type,
         withOptions: options,
         interceptedBy: interceptors,
-        onResponsePart
+        onError: onError,
+        onResponsePart: onResponsePart
       )
       factory.configure(transport)
       return transport
@@ -126,6 +129,7 @@ internal struct ClientTransportFactory<Request, Response> {
         for: type,
         withOptions: options,
         interceptedBy: interceptors,
+        onError: onError,
         onResponsePart
       )
       factory.configure(transport)
@@ -170,14 +174,16 @@ private struct HTTP2ClientTransportFactory<Request, Response> {
     for type: GRPCCallType,
     withOptions options: CallOptions,
     interceptedBy interceptors: [ClientInterceptor<Request, Response>],
-    _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
+    onError: @escaping (Error) -> Void,
+    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) -> ClientTransport<Request, Response> {
     return ClientTransport(
       details: self.makeCallDetails(type: type, path: path, options: options),
       eventLoop: self.multiplexer.eventLoop,
       interceptors: interceptors,
       errorDelegate: self.errorDelegate,
-      onResponsePart
+      onError: onError,
+      onResponsePart: onResponsePart
     )
   }
 
@@ -240,6 +246,7 @@ private struct FakeClientTransportFactory<Request, Response> {
     for type: GRPCCallType,
     withOptions options: CallOptions,
     interceptedBy interceptors: [ClientInterceptor<Request, Response>],
+    onError: @escaping (Error) -> Void,
     _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) -> ClientTransport<Request, Response> {
     return ClientTransport(
@@ -253,7 +260,8 @@ private struct FakeClientTransportFactory<Request, Response> {
       eventLoop: self.eventLoop,
       interceptors: interceptors,
       errorDelegate: nil,
-      onResponsePart
+      onError: onError,
+      onResponsePart: onResponsePart
     )
   }
 

+ 0 - 3
Sources/GRPC/Interceptor/MessageParts.swift

@@ -35,9 +35,6 @@ public enum GRPCClientResponsePart<Response> {
 
   /// The end of response stream sent by the server.
   case end(GRPCStatus, HPACKHeaders)
-
-  /// Error.
-  case error(Error)
 }
 
 public enum GRPCServerRequestPart<Request> {

+ 20 - 8
Tests/GRPCTests/ClientCallTests.swift

@@ -87,8 +87,6 @@ class ClientCallTests: GRPCTestCase {
         ()
       case let .end(status, _):
         promise.succeed(status)
-      case let .error(error):
-        promise.fail(error)
       }
     }
   }
@@ -99,7 +97,10 @@ class ClientCallTests: GRPCTestCase {
     let get = self.get()
 
     let statusPromise = self.makeStatusPromise()
-    get.invoke(self.makeResponsePartHandler(completing: statusPromise))
+    get.invoke(
+      onError: statusPromise.fail(_:),
+      onResponsePart: self.makeResponsePartHandler(completing: statusPromise)
+    )
 
     let f1 = get.send(.metadata(get.options.customMetadata))
     let f2 = get.send(.message(.with { $0.text = "get" }, .init(compress: false, flush: false)))
@@ -120,7 +121,8 @@ class ClientCallTests: GRPCTestCase {
     let promise = self.makeStatusPromise()
     get.invokeUnaryRequest(
       .with { $0.text = "get" },
-      self.makeResponsePartHandler(completing: promise)
+      onError: promise.fail(_:),
+      onResponsePart: self.makeResponsePartHandler(completing: promise)
     )
 
     assertThat(try promise.futureResult.wait(), .hasCode(.ok))
@@ -130,7 +132,10 @@ class ClientCallTests: GRPCTestCase {
     let collect = self.collect()
 
     let promise = self.makeStatusPromise()
-    collect.invokeStreamingRequests(self.makeResponsePartHandler(completing: promise))
+    collect.invokeStreamingRequests(
+      onError: promise.fail(_:),
+      onResponsePart: self.makeResponsePartHandler(completing: promise)
+    )
     collect.send(
       .message(.with { $0.text = "collect" }, .init(compress: false, flush: false)),
       promise: nil
@@ -146,7 +151,8 @@ class ClientCallTests: GRPCTestCase {
     let promise = self.makeStatusPromise()
     expand.invokeUnaryRequest(
       .with { $0.text = "expand" },
-      self.makeResponsePartHandler(completing: promise)
+      onError: promise.fail(_:),
+      onResponsePart: self.makeResponsePartHandler(completing: promise)
     )
 
     assertThat(try promise.futureResult.wait(), .hasCode(.ok))
@@ -156,7 +162,10 @@ class ClientCallTests: GRPCTestCase {
     let update = self.update()
 
     let promise = self.makeStatusPromise()
-    update.invokeStreamingRequests(self.makeResponsePartHandler(completing: promise))
+    update.invokeStreamingRequests(
+      onError: promise.fail(_:),
+      onResponsePart: self.makeResponsePartHandler(completing: promise)
+    )
     update.send(
       .message(.with { $0.text = "update" }, .init(compress: false, flush: false)),
       promise: nil
@@ -179,7 +188,10 @@ class ClientCallTests: GRPCTestCase {
   func testCancelMidRPC() throws {
     let get = self.get()
     let promise = self.makeStatusPromise()
-    get.invoke(self.makeResponsePartHandler(completing: promise))
+    get.invoke(
+      onError: promise.fail(_:),
+      onResponsePart: self.makeResponsePartHandler(completing: promise)
+    )
 
     // Cancellation should succeed.
     assertThat(try get.cancel().wait(), .doesNotThrow())

+ 15 - 21
Tests/GRPCTests/ClientInterceptorPipelineTests.swift

@@ -33,6 +33,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     details: CallDetails? = nil,
     interceptors: [ClientInterceptor<Request, Response>] = [],
     errorDelegate: ClientErrorDelegate? = nil,
+    onError: @escaping (Error) -> Void = { _ in },
     onCancel: @escaping (EventLoopPromise<Void>?) -> Void = { _ in },
     onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void,
     onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
@@ -42,6 +43,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
       details: details ?? self.makeCallDetails(),
       interceptors: interceptors,
       errorDelegate: errorDelegate,
+      onError: onError,
       onCancel: onCancel,
       onRequestPart: onRequestPart,
       onResponsePart: onResponsePart
@@ -104,14 +106,12 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
       onRequestPart: { _, promise in
         XCTAssertNil(promise)
       },
-      onResponsePart: {
-        XCTAssertNotNil($0.error)
-      }
+      onResponsePart: { _ in }
     )
 
     // Fire an error; this should close the pipeline.
     struct DummyError: Error {}
-    pipeline.receive(.error(DummyError()))
+    pipeline.errorCaught(DummyError())
 
     // We're closed, writes should fail.
     let writePromise = pipeline.eventLoop.makePromise(of: Void.self)
@@ -147,6 +147,11 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
       responses: String.self,
       details: self.makeCallDetails(timeLimit: .deadline(deadline)),
       interceptors: [FailOnCancel()],
+      onError: { error in
+        assertThat(error, .is(.instanceOf(GRPCError.RPCTimedOut.self)))
+        assertThat(timedOut, .is(false))
+        timedOut = true
+      },
       onCancel: { promise in
         assertThat(cancelled, .is(false))
         cancelled = true
@@ -156,10 +161,8 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
       onRequestPart: { _, _ in
         XCTFail("Unexpected request part")
       },
-      onResponsePart: { part in
-        assertThat(part.error, .is(.instanceOf(GRPCError.RPCTimedOut.self)))
-        assertThat(timedOut, .is(false))
-        timedOut = true
+      onResponsePart: { _ in
+        XCTFail("Unexpected response part")
       }
     )
 
@@ -270,7 +273,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
         onRequestPart: { _, _ in },
         onResponsePart: { _ in }
       )
-      pipeline.receive(.error(error))
+      pipeline.errorCaught(error)
     }
 
     let invalidState = GRPCError.InvalidState("invalid state")
@@ -365,7 +368,7 @@ extension GRPCClientResponsePart {
     switch self {
     case let .metadata(headers):
       return headers
-    case .message, .end, .error:
+    case .message, .end:
       return nil
     }
   }
@@ -374,7 +377,7 @@ extension GRPCClientResponsePart {
     switch self {
     case let .message(response):
       return response
-    case .metadata, .end, .error:
+    case .metadata, .end:
       return nil
     }
   }
@@ -383,16 +386,7 @@ extension GRPCClientResponsePart {
     switch self {
     case let .end(status, trailers):
       return (status, trailers)
-    case .metadata, .message, .error:
-      return nil
-    }
-  }
-
-  var error: Error? {
-    switch self {
-    case let .error(error):
-      return error
-    case .metadata, .message, .end:
+    case .metadata, .message:
       return nil
     }
   }

+ 20 - 17
Tests/GRPCTests/ClientTransportTests.swift

@@ -45,6 +45,7 @@ class ClientTransportTests: GRPCTestCase {
   private func setUpTransport(
     details: CallDetails? = nil,
     interceptors: [ClientInterceptor<String, String>] = [],
+    onError: @escaping (Error) -> Void = { _ in },
     onResponsePart: @escaping (GRPCClientResponsePart<String>) -> Void = { _ in }
   ) {
     self.transport = .init(
@@ -52,7 +53,8 @@ class ClientTransportTests: GRPCTestCase {
       eventLoop: self.eventLoop,
       interceptors: interceptors,
       errorDelegate: nil,
-      onResponsePart
+      onError: onError,
+      onResponsePart: onResponsePart
     )
   }
 
@@ -133,9 +135,9 @@ extension ClientTransportTests {
 
   func testCancelWhenIdle() throws {
     // Set up the transport, configure it and connect.
-    self.setUpTransport { part in
-      assertThat(part.error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
-    }
+    self.setUpTransport(onError: { error in
+      assertThat(error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
+    })
 
     // Cancellation should succeed.
     let promise = self.eventLoop.makePromise(of: Void.self)
@@ -145,9 +147,9 @@ extension ClientTransportTests {
 
   func testCancelWhenAwaitingTransport() throws {
     // Set up the transport, configure it and connect.
-    self.setUpTransport { part in
-      assertThat(part.error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
-    }
+    self.setUpTransport(onError: { error in
+      assertThat(error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
+    })
 
     // Start configuring the transport.
     let transportActivatedPromise = self.eventLoop.makePromise(of: Void.self)
@@ -179,9 +181,12 @@ extension ClientTransportTests {
   func testCancelWhenActivating() throws {
     // Set up the transport, configure it and connect.
     // We use bidirectional streaming here so that we also flush after writing the metadata.
-    self.setUpTransport(details: self.makeDetails(type: .bidirectionalStreaming)) { part in
-      assertThat(part.error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
-    }
+    self.setUpTransport(
+      details: self.makeDetails(type: .bidirectionalStreaming),
+      onError: { error in
+        assertThat(error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
+      }
+    )
 
     // Write a request. This will buffer.
     let writePromise1 = self.eventLoop.makePromise(of: Void.self)
@@ -268,9 +273,10 @@ extension ClientTransportTests {
 
   func testErrorWhenActive() throws {
     // Setup the transport, we only expect an error back.
-    self.setUpTransport { part in
-      assertThat(part.error, .is(.instanceOf(DummyError.self)))
-    }
+    self.setUpTransport(onError: { error in
+      assertThat(error, .is(.instanceOf(DummyError.self)))
+    })
+
     // Configure and activate.
     self.configureTransport()
     try self.connect()
@@ -294,10 +300,7 @@ extension ClientTransportTests {
   }
 
   func testConfigurationFails() throws {
-    self.setUpTransport { part in
-      // We wrap the configuration error up, so we can't assert on its type.
-      assertThat(part.error, .is(.notNil()))
-    }
+    self.setUpTransport()
 
     let p1 = self.eventLoop.makePromise(of: Void.self)
     self.sendRequest(.metadata([:]), promise: p1)

+ 1 - 1
Tests/GRPCTests/InterceptorsTests.swift

@@ -271,7 +271,7 @@ class NotReallyAuthClientInterceptor<Request: Message, Response: Message>:
         self.state = .retrying(call)
 
         // Invoke the call and redirect responses here.
-        call.invoke(context.receive(_:))
+        call.invoke(onError: context.errorCaught(_:), onResponsePart: context.receive(_:))
 
         // Parts must contain the metadata as the first item if we got that first response.
         if case var .some(.metadata(metadata)) = parts.first {