Browse Source

Add a client error delegate (#456)

* Add a client error delegate

* Don't close the channel twice

* Fix typo
George Barnett 6 years ago
parent
commit
2f78c45b21

+ 6 - 4
Sources/SwiftGRPCNIO/ClientCalls/BaseClientCall.swift

@@ -80,17 +80,19 @@ open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
     connection: GRPCClientConnection,
     path: String,
     callOptions: CallOptions,
-    responseObserver: ResponseObserver<ResponseMessage>
+    responseObserver: ResponseObserver<ResponseMessage>,
+    errorDelegate: ClientErrorDelegate?
   ) {
     self.connection = connection
     self.streamPromise = connection.channel.eventLoop.makePromise()
     self.clientChannelHandler = GRPCClientChannelHandler(
       initialMetadataPromise: connection.channel.eventLoop.makePromise(),
       statusPromise: connection.channel.eventLoop.makePromise(),
-      responseObserver: responseObserver)
+      responseObserver: responseObserver,
+      errorDelegate: errorDelegate)
 
     self.streamPromise.futureResult.whenFailure { error in
-      self.clientChannelHandler.observeError(error)
+      self.clientChannelHandler.observeError(.unknown(error, origin: .client))
     }
 
     self.createStreamChannel()
@@ -214,7 +216,7 @@ extension BaseClientCall {
     let promise = promise ?? self.connection.channel.eventLoop.makePromise()
 
     promise.futureResult.whenFailure { error in
-      self.clientChannelHandler.observeError(error)
+      self.clientChannelHandler.observeError(.unknown(error, origin: .client))
     }
 
     self.subchannel.cascadeFailure(to: promise)

+ 2 - 2
Sources/SwiftGRPCNIO/ClientCalls/BidirectionalStreamingClientCall.swift

@@ -29,9 +29,9 @@ import NIO
 public class BidirectionalStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage>, StreamingRequestClientCall {
   private var messageQueue: EventLoopFuture<Void>
 
-  public init(connection: GRPCClientConnection, path: String, callOptions: CallOptions, handler: @escaping (ResponseMessage) -> Void) {
+  public init(connection: GRPCClientConnection, path: String, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?, handler: @escaping (ResponseMessage) -> Void) {
     self.messageQueue = connection.channel.eventLoop.makeSucceededFuture(())
-    super.init(connection: connection, path: path, callOptions: callOptions, responseObserver: .callback(handler))
+    super.init(connection: connection, path: path, callOptions: callOptions, responseObserver: .callback(handler), errorDelegate: errorDelegate)
 
     let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
     self.messageQueue = self.messageQueue.flatMap {

+ 3 - 2
Sources/SwiftGRPCNIO/ClientCalls/ClientStreamingClientCall.swift

@@ -31,7 +31,7 @@ public class ClientStreamingClientCall<RequestMessage: Message, ResponseMessage:
   public let response: EventLoopFuture<ResponseMessage>
   private var messageQueue: EventLoopFuture<Void>
 
-  public init(connection: GRPCClientConnection, path: String, callOptions: CallOptions) {
+  public init(connection: GRPCClientConnection, path: String, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?) {
     let responsePromise: EventLoopPromise<ResponseMessage> = connection.channel.eventLoop.makePromise()
     self.response = responsePromise.futureResult
     self.messageQueue = connection.channel.eventLoop.makeSucceededFuture(())
@@ -40,7 +40,8 @@ public class ClientStreamingClientCall<RequestMessage: Message, ResponseMessage:
       connection: connection,
       path: path,
       callOptions: callOptions,
-      responseObserver: .succeedPromise(responsePromise))
+      responseObserver: .succeedPromise(responsePromise),
+      errorDelegate: errorDelegate)
 
     let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
     self.messageQueue = self.messageQueue.flatMap {

+ 2 - 2
Sources/SwiftGRPCNIO/ClientCalls/ServerStreamingClientCall.swift

@@ -24,8 +24,8 @@ import NIO
 /// - `status`: the status of the gRPC call after it has ended,
 /// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
 public class ServerStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage> {
-  public init(connection: GRPCClientConnection, path: String, request: RequestMessage, callOptions: CallOptions, handler: @escaping (ResponseMessage) -> Void) {
-    super.init(connection: connection, path: path, callOptions: callOptions, responseObserver: .callback(handler))
+  public init(connection: GRPCClientConnection, path: String, request: RequestMessage, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?, handler: @escaping (ResponseMessage) -> Void) {
+    super.init(connection: connection, path: path, callOptions: callOptions, responseObserver: .callback(handler), errorDelegate: errorDelegate)
 
     let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
     self.sendHead(requestHead).flatMap {

+ 3 - 2
Sources/SwiftGRPCNIO/ClientCalls/UnaryClientCall.swift

@@ -27,7 +27,7 @@ import NIO
 public class UnaryClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage>, UnaryResponseClientCall {
   public let response: EventLoopFuture<ResponseMessage>
 
-  public init(connection: GRPCClientConnection, path: String, request: RequestMessage, callOptions: CallOptions) {
+  public init(connection: GRPCClientConnection, path: String, request: RequestMessage, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?) {
     let responsePromise: EventLoopPromise<ResponseMessage> = connection.channel.eventLoop.makePromise()
     self.response = responsePromise.futureResult
 
@@ -35,7 +35,8 @@ public class UnaryClientCall<RequestMessage: Message, ResponseMessage: Message>:
       connection: connection,
       path: path,
       callOptions: callOptions,
-      responseObserver: .succeedPromise(responsePromise))
+      responseObserver: .succeedPromise(responsePromise),
+      errorDelegate: errorDelegate)
 
     let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
     self.sendHead(requestHead).flatMap {

+ 54 - 0
Sources/SwiftGRPCNIO/ClientErrorDelegate.swift

@@ -0,0 +1,54 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+/// Delegate called when errors are caught by the client on individual HTTP/2 streams and errors in
+/// the underlying HTTP/2 connection.
+///
+/// The intended use of this protocol is with `GRPCClientConnection`. In order to avoid retain
+/// cycles, classes implementing this delegate **must not** maintain a strong reference to the
+/// `GRPCClientConnection`.
+public protocol ClientErrorDelegate: class {
+  /// Called when the client catches an error.
+  ///
+  /// - Parameters:
+  ///   - error: The error which was caught.
+  ///   - file: The file where the error was raised.
+  ///   - line: The line within the file where the error was raised.
+  func didCatchError(_ error: Error, file: StaticString, line: Int)
+}
+
+/// A `ClientErrorDelegate` which logs errors only in debug builds.
+public class DebugOnlyLoggingClientErrorDelegate: ClientErrorDelegate {
+  public static let shared = DebugOnlyLoggingClientErrorDelegate()
+
+  private init() { }
+
+  public func didCatchError(_ error: Error, file: StaticString, line: Int) {
+    debugOnly {
+      print("[grpc-client][\(Date())] error: \(error), file: \(file), line: \(line)")
+    }
+  }
+}
+
+/// A utility function that runs the body code only in debug builds, without emitting compiler
+/// warnings.
+///
+/// This is currently the only way to do this in Swift: see
+/// https://forums.swift.org/t/support-debug-only-code/11037 for a discussion.
+internal func debugOnly(_ body: () -> Void) {
+  assert({ body(); return true }())
+}

+ 4 - 4
Sources/SwiftGRPCNIO/GRPCClient.swift

@@ -32,7 +32,7 @@ extension GRPCClient {
     callOptions: CallOptions,
     responseType: Response.Type = Response.self
   ) -> UnaryClientCall<Request, Response> {
-    return UnaryClientCall(connection: self.connection, path: path, request: request, callOptions: callOptions)
+    return UnaryClientCall(connection: self.connection, path: path, request: request, callOptions: callOptions, errorDelegate: self.connection.errorDelegate)
   }
 
   public func makeServerStreamingCall<Request: Message, Response: Message>(
@@ -42,7 +42,7 @@ extension GRPCClient {
     responseType: Response.Type = Response.self,
     handler: @escaping (Response) -> Void
   ) -> ServerStreamingClientCall<Request, Response> {
-    return ServerStreamingClientCall(connection: self.connection, path: path, request: request, callOptions: callOptions, handler: handler)
+    return ServerStreamingClientCall(connection: self.connection, path: path, request: request, callOptions: callOptions, errorDelegate: self.connection.errorDelegate, handler: handler)
   }
 
   public func makeClientStreamingCall<Request: Message, Response: Message>(
@@ -51,7 +51,7 @@ extension GRPCClient {
     requestType: Request.Type = Request.self,
     responseType: Response.Type = Response.self
   ) -> ClientStreamingClientCall<Request, Response> {
-    return ClientStreamingClientCall(connection: self.connection, path: path, callOptions: callOptions)
+    return ClientStreamingClientCall(connection: self.connection, path: path, callOptions: callOptions, errorDelegate: self.connection.errorDelegate)
   }
 
   public func makeBidirectionalStreamingCall<Request: Message, Response: Message>(
@@ -61,7 +61,7 @@ extension GRPCClient {
     responseType: Response.Type = Response.self,
     handler: @escaping (Response) -> Void
   ) -> BidirectionalStreamingClientCall<Request, Response> {
-    return BidirectionalStreamingClientCall(connection: self.connection, path: path, callOptions: callOptions, handler: handler)
+    return BidirectionalStreamingClientCall(connection: self.connection, path: path, callOptions: callOptions, errorDelegate: self.connection.errorDelegate, handler: handler)
   }
 }
 

+ 12 - 12
Sources/SwiftGRPCNIO/GRPCClientChannelHandler.swift

@@ -33,6 +33,7 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
   internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
   internal let statusPromise: EventLoopPromise<GRPCStatus>
   internal let responseObserver: ResponseObserver<ResponseMessage>
+  internal let errorDelegate: ClientErrorDelegate?
 
   /// A promise for a unary response.
   internal var responsePromise: EventLoopPromise<ResponseMessage>? {
@@ -76,11 +77,13 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
   public init(
     initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
     statusPromise: EventLoopPromise<GRPCStatus>,
-    responseObserver: ResponseObserver<ResponseMessage>
+    responseObserver: ResponseObserver<ResponseMessage>,
+    errorDelegate: ClientErrorDelegate?
   ) {
     self.initialMetadataPromise = initialMetadataPromise
     self.statusPromise = statusPromise
     self.responseObserver = responseObserver
+    self.errorDelegate = errorDelegate
   }
 
   /// Observe the given status.
@@ -100,16 +103,14 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
 
   /// Observe the given error.
   ///
-  /// If the error conforms to `GRPCStatusTransformable` then `observeStatus(status:)` is called
-  /// with the transformed error, otherwise `GRPCStatus.processingError` is used.
+  /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
+  /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
+  /// the given error (see `observeStatus(_:)`).
   ///
   /// - Parameter error: the error to observe.
-  internal func observeError(_ error: Error) {
-    if let transformable = error as? GRPCStatusTransformable {
-      self.observeStatus(transformable.asGRPCStatus())
-    } else {
-      self.observeStatus(.processingError)
-    }
+  internal func observeError(_ error: GRPCError) {
+    self.errorDelegate?.didCatchError(error.error, file: error.file, line: error.line)
+    self.observeStatus(error.asGRPCStatus())
   }
 }
 
@@ -154,8 +155,8 @@ extension GRPCClientChannelHandler: ChannelInboundHandler {
 
       self.observeStatus(status)
 
-      // We don't expect any more requests/responses beyond this point.
-      self.close(context: context, mode: .all, promise: nil)
+      // We don't expect any more requests/responses beyond this point and we don't need to close
+      // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
     }
   }
 }
@@ -201,7 +202,6 @@ extension GRPCClientChannelHandler {
 
   /// Observe an error from the pipeline and close the channel.
   public func errorCaught(context: ChannelHandlerContext, error: Error) {
-    //! TODO: Add an error handling delegate, similar to in the server.
     self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
     context.close(mode: .all, promise: nil)
   }

+ 19 - 8
Sources/SwiftGRPCNIO/GRPCClientConnection.swift

@@ -48,7 +48,9 @@ import NIOTLS
 ///
 /// The `GRPCTLSVerificationHandler` observes the outcome of the SSL handshake and determines
 /// whether a `GRPCClientConnection` should be returned to the user. In either eventuality, the
-/// handler removes itself from the pipeline once TLS has been verified.
+/// handler removes itself from the pipeline once TLS has been verified. There is also a delegated
+/// error handler after the `HTTPStreamMultiplexer` in the main channel which uses the error
+/// delegate associated with this connection (see `GRPCDelegatingErrorHandler`).
 ///
 /// See `BaseClientCall` for a description of the remainder of the client pipeline.
 open class GRPCClientConnection {
@@ -58,6 +60,10 @@ open class GRPCClientConnection {
   ///   - host: Host to connect to.
   ///   - port: Port on the host to connect to.
   ///   - eventLoopGroup: Event loop group to run the connection on.
+  ///   - errorDelegate: An error delegate which is called when errors are caught. Provided
+  ///       delegates **must not maintain a strong reference to this `GRPCClientConnection`**. Doing
+  ///       so will cause a retain cycle. Defaults to a delegate which logs errors in debug builds
+  ///       only.
   ///   - tlsMode: How TLS should be configured for this connection.
   ///   - hostOverride: Value to use for TLS SNI extension; this must not be an IP address. Ignored
   ///       if `tlsMode` is `.none`.
@@ -66,6 +72,7 @@ open class GRPCClientConnection {
     host: String,
     port: Int,
     eventLoopGroup: EventLoopGroup,
+    errorDelegate: ClientErrorDelegate? = DebugOnlyLoggingClientErrorDelegate.shared,
     tls tlsMode: TLSMode = .none,
     hostOverride: String? = nil
   ) throws -> EventLoopFuture<GRPCClientConnection> {
@@ -73,9 +80,11 @@ open class GRPCClientConnection {
       // Enable SO_REUSEADDR.
       .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
       .channelInitializer { channel in
-        configureTLS(mode: tlsMode, channel: channel, host: hostOverride ?? host).flatMap {
+        configureTLS(mode: tlsMode, channel: channel, host: hostOverride ?? host, errorDelegate: errorDelegate).flatMap {
           channel.configureHTTP2Pipeline(mode: .client)
-        }.map { _ in }
+        }.flatMap { _ in
+          channel.pipeline.addHandler(GRPCDelegatingErrorHandler(delegate: errorDelegate))
+        }
       }
 
     return bootstrap.connect(host: host, port: port).flatMap { channel in
@@ -104,7 +113,7 @@ open class GRPCClientConnection {
       }.map {
         $0.handler as! HTTP2StreamMultiplexer
       }.map { multiplexer in
-        GRPCClientConnection(channel: channel, multiplexer: multiplexer, host: host, httpProtocol: tlsMode.httpProtocol)
+        GRPCClientConnection(channel: channel, multiplexer: multiplexer, host: host, httpProtocol: tlsMode.httpProtocol, errorDelegate: errorDelegate)
       }
     }
   }
@@ -115,8 +124,9 @@ open class GRPCClientConnection {
   ///   - mode: TLS mode to use when creating the new handler.
   ///   - channel: The channel on which to add the SSL handler.
   ///   - host: The hostname of the server we're connecting to.
+  ///   - errorDelegate: The error delegate to use.
   /// - Returns: A future which will be succeeded when the pipeline has been configured.
-  private static func configureTLS(mode tls: TLSMode, channel: Channel, host: String) -> EventLoopFuture<Void> {
+  private static func configureTLS(mode tls: TLSMode, channel: Channel, host: String, errorDelegate: ClientErrorDelegate?) -> EventLoopFuture<Void> {
     let handlerAddedPromise: EventLoopPromise<Void> = channel.eventLoop.makePromise()
 
     do {
@@ -126,7 +136,7 @@ open class GRPCClientConnection {
       }
 
       let sslHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
-      let verificationHandler = GRPCTLSVerificationHandler()
+      let verificationHandler = GRPCTLSVerificationHandler(errorDelegate: errorDelegate)
 
       channel.pipeline.addHandlers(sslHandler, verificationHandler).cascade(to: handlerAddedPromise)
     } catch {
@@ -140,12 +150,14 @@ open class GRPCClientConnection {
   public let multiplexer: HTTP2StreamMultiplexer
   public let host: String
   public let httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol
+  public let errorDelegate: ClientErrorDelegate?
 
-  init(channel: Channel, multiplexer: HTTP2StreamMultiplexer, host: String, httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol) {
+  init(channel: Channel, multiplexer: HTTP2StreamMultiplexer, host: String, httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol, errorDelegate: ClientErrorDelegate?) {
     self.channel = channel
     self.multiplexer = multiplexer
     self.host = host
     self.httpProtocol = httpProtocol
+    self.errorDelegate = errorDelegate
   }
 
   /// Fired when the client shuts down.
@@ -158,7 +170,6 @@ open class GRPCClientConnection {
   }
 }
 
-
 extension GRPCClientConnection {
   public enum TLSMode {
     case none

+ 38 - 0
Sources/SwiftGRPCNIO/GRPCDelegatingErrorHandler.swift

@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import NIO
+
+/// A channel handler which allows caught errors to be passed to a `ClientErrorDelegate`. This
+/// handler is intended to be used in the client channel pipeline after the HTTP/2 stream
+/// multiplexer to handle errors which occur on the underlying connection.
+public class GRPCDelegatingErrorHandler: ChannelInboundHandler {
+  public typealias InboundIn = Any
+
+  private let delegate: ClientErrorDelegate?
+
+  public init(delegate: ClientErrorDelegate?) {
+    self.delegate = delegate
+  }
+
+  public func errorCaught(context: ChannelHandlerContext, error: Error) {
+    if let delegate = self.delegate {
+      let grpcError = (error as? GRPCError) ?? .unknown(error, origin: .client)
+      delegate.didCatchError(grpcError.error, file: grpcError.file, line: grpcError.line)
+    }
+    context.close(promise: nil)
+  }
+}

+ 9 - 1
Sources/SwiftGRPCNIO/GRPCTLSVerificationHandler.swift

@@ -25,6 +25,7 @@ public class GRPCTLSVerificationHandler: ChannelInboundHandler, RemovableChannel
   public typealias InboundIn = Any
 
   private var verificationPromise: EventLoopPromise<Void>!
+  private let delegate: ClientErrorDelegate?
 
   /// A future which is fulfilled when the state of the TLS handshake is known. If the handshake
   /// was successful and the negotiated application protocol is valid then the future is succeeded.
@@ -37,7 +38,9 @@ public class GRPCTLSVerificationHandler: ChannelInboundHandler, RemovableChannel
     return verificationPromise.futureResult
   }
 
-  public init() { }
+  public init(errorDelegate: ClientErrorDelegate?) {
+    self.delegate = errorDelegate
+  }
 
   public func handlerAdded(context: ChannelHandlerContext) {
     self.verificationPromise = context.eventLoop.makePromise()
@@ -50,6 +53,11 @@ public class GRPCTLSVerificationHandler: ChannelInboundHandler, RemovableChannel
   public func errorCaught(context: ChannelHandlerContext, error: Error) {
     precondition(self.verificationPromise != nil, "handler has not been added to the pipeline")
 
+    if let delegate = self.delegate {
+      let grpcError = (error as? GRPCError) ?? GRPCError.unknown(error, origin: .client)
+      delegate.didCatchError(grpcError.error, file: grpcError.file, line: grpcError.line)
+    }
+
     verificationPromise.fail(error)
   }