Browse Source

Use user events for timing out and cancelling client calls (#459)

George Barnett 6 years ago
parent
commit
7a4c019471

+ 4 - 2
Sources/SwiftGRPCNIO/ClientCalls/BaseClientCall.swift

@@ -122,7 +122,7 @@ extension BaseClientCall: ClientCall {
   public func cancel() {
     self.connection.channel.eventLoop.execute {
       self.subchannel.whenSuccess { channel in
-        channel.close(mode: .all, promise: nil)
+        channel.pipeline.fireUserInboundEventTriggered(GRPCClientUserEvent.cancelled)
       }
     }
   }
@@ -233,7 +233,9 @@ extension BaseClientCall {
     if timeout == .infinite { return }
 
     self.connection.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
-      self?.clientChannelHandler.observeError(GRPCError.client(.deadlineExceeded(timeout)))
+      self?.subchannel.whenSuccess { stream in
+        stream.pipeline.fireUserInboundEventTriggered(GRPCClientUserEvent.timedOut(timeout))
+      }
     }
   }
 

+ 26 - 3
Sources/SwiftGRPCNIO/GRPCClientChannelHandler.swift

@@ -109,7 +109,7 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
   ///
   /// - Parameter error: the error to observe.
   internal func observeError(_ error: GRPCError) {
-    self.errorDelegate?.didCatchError(error.error, file: error.file, line: error.line)
+    self.errorDelegate?.didCatchError(error.wrappedError, file: error.file, line: error.line)
     self.observeStatus(error.asGRPCStatus())
   }
 }
@@ -159,6 +159,22 @@ extension GRPCClientChannelHandler: ChannelInboundHandler {
       // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
     }
   }
+
+  public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
+    if let clientUserEvent = event as? GRPCClientUserEvent {
+      switch clientUserEvent {
+      case .cancelled:
+        // We shouldn't observe an error since this event is triggered by the user: just observe the
+        // status.
+        self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus())
+        self.close(context: context, mode: .all, promise: nil)
+
+      case .timedOut(let timeout):
+        self.observeError(GRPCError.client(.deadlineExceeded(timeout)))
+        self.close(context: context, mode: .all, promise: nil)
+      }
+    }
+  }
 }
 
 extension GRPCClientChannelHandler: ChannelOutboundHandler {
@@ -192,8 +208,6 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
 extension GRPCClientChannelHandler {
   /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore.
   public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
-    self.observeError(GRPCError.client(.cancelledByClient))
-
     context.close(mode: mode, promise: promise)
 
     self.inboundState = .ignore
@@ -206,3 +220,12 @@ extension GRPCClientChannelHandler {
     context.close(mode: .all, promise: nil)
   }
 }
+
+/// Client user evenets.
+///
+/// - cancelled: The call has been cancelled.
+/// - timedOut: The call did not complete before the deadline was exceeded.
+public enum GRPCClientUserEvent {
+  case cancelled
+  case timedOut(GRPCTimeout)
+}

+ 1 - 1
Sources/SwiftGRPCNIO/GRPCDelegatingErrorHandler.swift

@@ -31,7 +31,7 @@ public class GRPCDelegatingErrorHandler: ChannelInboundHandler {
   public func errorCaught(context: ChannelHandlerContext, error: Error) {
     if let delegate = self.delegate {
       let grpcError = (error as? GRPCError) ?? .unknown(error, origin: .client)
-      delegate.didCatchError(grpcError.error, file: grpcError.file, line: grpcError.line)
+      delegate.didCatchError(grpcError.wrappedError, file: grpcError.file, line: grpcError.line)
     }
     context.close(promise: nil)
   }

+ 3 - 3
Sources/SwiftGRPCNIO/GRPCError.swift

@@ -21,7 +21,7 @@ public struct GRPCError: Error, GRPCStatusTransformable {
   public enum Origin { case client, server }
 
   /// The underlying error thrown by framework.
-  public let error: Error
+  public let wrappedError: Error
 
   /// The origin of the error.
   public let origin: Origin
@@ -33,11 +33,11 @@ public struct GRPCError: Error, GRPCStatusTransformable {
   public let line: Int
 
   public func asGRPCStatus() -> GRPCStatus {
-    return (error as? GRPCStatusTransformable)?.asGRPCStatus() ?? .processingError
+    return (wrappedError as? GRPCStatusTransformable)?.asGRPCStatus() ?? .processingError
   }
 
   private init(_ error: Error, origin: Origin, file: StaticString, line: Int) {
-    self.error = error
+    self.wrappedError = error
     self.origin = origin
     self.file = file
     self.line = line

+ 1 - 1
Sources/SwiftGRPCNIO/GRPCTLSVerificationHandler.swift

@@ -55,7 +55,7 @@ public class GRPCTLSVerificationHandler: ChannelInboundHandler, RemovableChannel
 
     if let delegate = self.delegate {
       let grpcError = (error as? GRPCError) ?? GRPCError.unknown(error, origin: .client)
-      delegate.didCatchError(grpcError.error, file: grpcError.file, line: grpcError.line)
+      delegate.didCatchError(grpcError.wrappedError, file: grpcError.file, line: grpcError.line)
     }
 
     verificationPromise.fail(error)

+ 2 - 2
Tests/SwiftGRPCNIOTests/GRPCChannelHandlerResponseCapturingTestCase.swift

@@ -21,11 +21,11 @@ class CollectingServerErrorDelegate: ServerErrorDelegate {
   }
 
   var asGRPCServerErrors: [GRPCServerError]? {
-    return (self.asGRPCErrors?.map { $0.error }) as? [GRPCServerError]
+    return (self.asGRPCErrors?.map { $0.wrappedError }) as? [GRPCServerError]
   }
 
   var asGRPCCommonErrors: [GRPCCommonError]? {
-    return (self.asGRPCErrors?.map { $0.error }) as? [GRPCCommonError]
+    return (self.asGRPCErrors?.map { $0.wrappedError }) as? [GRPCCommonError]
   }
 
   func observeLibraryError(_ error: Error) {

+ 2 - 2
Tests/SwiftGRPCNIOTests/LengthPrefixedMessageReaderTests.swift

@@ -195,7 +195,7 @@ class LengthPrefixedMessageReaderTests: XCTestCase {
     reader.append(buffer: &buffer)
 
     XCTAssertThrowsError(try reader.nextMessage()) { error in
-      XCTAssertEqual(.unsupportedCompressionMechanism("unknown"), (error as? GRPCError)?.error as? GRPCCommonError)
+      XCTAssertEqual(.unsupportedCompressionMechanism("unknown"), (error as? GRPCError)?.wrappedError as? GRPCCommonError)
     }
   }
 
@@ -208,7 +208,7 @@ class LengthPrefixedMessageReaderTests: XCTestCase {
     reader.append(buffer: &buffer)
 
     XCTAssertThrowsError(try reader.nextMessage()) { error in
-      XCTAssertEqual(.unexpectedCompression, (error as? GRPCError)?.error as? GRPCCommonError)
+      XCTAssertEqual(.unexpectedCompression, (error as? GRPCError)?.wrappedError as? GRPCCommonError)
     }
   }
 

+ 1 - 1
Tests/SwiftGRPCNIOTests/NIOBasicEchoTestCase.swift

@@ -71,7 +71,7 @@ extension TransportSecurity {
 
     case .anonymousClient, .mutualAuthentication:
       return .forServer(certificateChain: [.certificate(self.serverCert)],
-                        privateKey: .privateKey(SamplePrivateKey.server), 
+                        privateKey: .privateKey(SamplePrivateKey.server),
                         trustRoots: .certificates ([self.caCert]),
                         applicationProtocols: GRPCApplicationProtocolIdentifier.allCases.map { $0.rawValue })
     }

+ 1 - 1
Tests/SwiftGRPCNIOTests/NIOClientTLSFailureTests.swift

@@ -92,7 +92,7 @@ class NIOClientTLSFailureTests: XCTestCase {
     let connectionExpectation = self.makeClientConnectionExpectation()
 
     connection.assertError(fulfill: connectionExpectation) { error in
-      let clientError = (error as? GRPCError)?.error as? GRPCClientError
+      let clientError = (error as? GRPCError)?.wrappedError as? GRPCClientError
       XCTAssertEqual(clientError, .applicationLevelProtocolNegotiationFailed)
     }