Browse Source

Refactor client channel handlers into request and response handlers (#471)

* Refactor client channel handlers into request and response handlers

* Remove extraneous OutboundIn typealias

* Add a "flush" option to sendMessage
George Barnett 6 years ago
parent
commit
68d212e744

+ 1 - 1
Package.swift

@@ -28,7 +28,7 @@ var packageDependencies: [Package.Dependency] = [
   // Main SwiftNIO package
   .package(url: "https://github.com/apple/swift-nio.git", from: "2.2.0"),
   // HTTP2 via SwiftNIO
-  .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.2.0"),
+  .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.2.1"),
   // TLS via SwiftNIO
   .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.0.0"),
 ]

+ 84 - 190
Sources/SwiftGRPCNIO/ClientCalls/BaseClientCall.swift

@@ -25,23 +25,26 @@ import SwiftProtobuf
 /// Each call will be configured on a multiplexed channel on the given connection. The multiplexed
 /// channel will be configured as such:
 ///
-///                           ┌───────────────────────────┐
-///                           │ GRPCClientChannelHandler  │
-///                           └─▲───────────────────────┬─┘
-///   GRPCClientResponsePart<T1>│                       │GRPCClientRequestPart<T2>
-///                           ┌─┴───────────────────────▼─┐
-///                           │       GRPCClientCodec     │
-///                           └─▲───────────────────────┬─┘
-///    RawGRPCClientResponsePart│                       │RawGRPCClientRequestPart
-///                           ┌─┴───────────────────────▼─┐
-///                           │ HTTP1ToRawGRPCClientCodec │
-///                           └─▲───────────────────────┬─┘
-///       HTTPClientResponsePart│                       │HTTPClientRequestPart
-///                           ┌─┴───────────────────────▼─┐
-///                           │  HTTP2ToHTTP1ClientCodec  │
-///                           └─▲───────────────────────┬─┘
-///                   HTTP2Frame│                       │HTTP2Frame
-///                             |                       |
+///              ┌──────────────────────────────────┐
+///              │ GRPCClientResponseChannelHandler │
+///              └────────────▲─────────────────────┘
+///                           │  ┌─────────────────────────────────┐
+///                           │  │ GRPCClientRequestChannelHandler │
+///                           │  └────────────────────┬────────────┘
+/// GRPCClientResponsePart<T1>│                       │GRPCClientRequestPart<T2>
+///                         ┌─┴───────────────────────▼─┐
+///                         │       GRPCClientCodec     │
+///                         └─▲───────────────────────┬─┘
+///  RawGRPCClientResponsePart│                       │RawGRPCClientRequestPart
+///                         ┌─┴───────────────────────▼─┐
+///                         │ HTTP1ToRawGRPCClientCodec │
+///                         └─▲───────────────────────┬─┘
+///     HTTPClientResponsePart│                       │HTTPClientRequestPart
+///                         ┌─┴───────────────────────▼─┐
+///                         │  HTTP2ToHTTP1ClientCodec  │
+///                         └─▲───────────────────────┬─┘
+///                 HTTP2Frame│                       │HTTP2Frame
+///                           |                       |
 ///
 /// Note: below the `HTTP2ToHTTP1ClientCodec` is the "main" pipeline provided by the channel in
 /// `GRPCClientConnection`.
@@ -59,60 +62,65 @@ open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
   /// Promise for an HTTP/2 stream to execute the call on.
   internal let streamPromise: EventLoopPromise<Channel>
 
-  /// Client channel handler. Handles internal state for reading/writing messages to the channel.
-  /// The handler also owns the promises for the futures that this class surfaces to the user (such as
-  /// `initialMetadata` and `status`).
-  internal let clientChannelHandler: GRPCClientChannelHandler<RequestMessage, ResponseMessage>
+  /// Channel handler for responses.
+  internal let responseHandler: GRPCClientResponseChannelHandler<ResponseMessage>
+
+  /// Channel handler for requests.
+  internal let requestHandler: GRPCClientRequestChannelHandler<RequestMessage>
+
+  // Note: documentation is inherited from the `ClientCall` protocol.
+  public let subchannel: EventLoopFuture<Channel>
+  public let initialMetadata: EventLoopFuture<HTTPHeaders>
+  public let status: EventLoopFuture<GRPCStatus>
 
   /// Sets up a gRPC call.
   ///
-  /// A number of actions are performed:
-  /// - a new HTTP/2 stream is created and configured using the channel and multiplexer provided by `client`,
-  /// - a callback is registered on the new stream (`subchannel`) to send the request head,
-  /// - a timeout is scheduled if one is set in the `callOptions`.
+  /// This involves creating a new HTTP/2 stream on the multiplexer provided by `connection`. The
+  /// channel associated with the stream is configured to use the provided request and response
+  /// handlers. Note that the request head will be sent automatically from the request handler when
+  /// the channel becomes active.
   ///
   /// - Parameters:
   ///   - connection: connection containing the HTTP/2 channel and multiplexer to use for this call.
-  ///   - path: path for this RPC method.
-  ///   - callOptions: options to use when configuring this call.
-  ///   - responseObserver: observer for received messages.
+  ///   - responseHandler: a channel handler for receiving responses.
+  ///   - requestHandler: a channel handler for sending requests.
   init(
     connection: GRPCClientConnection,
-    path: String,
-    callOptions: CallOptions,
-    responseObserver: ResponseObserver<ResponseMessage>,
-    errorDelegate: ClientErrorDelegate?
+    responseHandler: GRPCClientResponseChannelHandler<ResponseMessage>,
+    requestHandler: GRPCClientRequestChannelHandler<RequestMessage>
   ) {
     self.connection = connection
+    self.responseHandler = responseHandler
+    self.requestHandler = requestHandler
     self.streamPromise = connection.channel.eventLoop.makePromise()
-    self.clientChannelHandler = GRPCClientChannelHandler(
-      initialMetadataPromise: connection.channel.eventLoop.makePromise(),
-      statusPromise: connection.channel.eventLoop.makePromise(),
-      responseObserver: responseObserver,
-      errorDelegate: errorDelegate)
+
+    self.subchannel = self.streamPromise.futureResult
+    self.initialMetadata = self.responseHandler.initialMetadataPromise.futureResult
+    self.status = self.responseHandler.statusPromise.futureResult
 
     self.streamPromise.futureResult.whenFailure { error in
-      self.clientChannelHandler.observeError(.unknown(error, origin: .client))
+      self.responseHandler.observeError(.unknown(error, origin: .client))
     }
 
     self.createStreamChannel()
-    self.setTimeout(callOptions.timeout)
-  }
-}
-
-extension BaseClientCall: ClientCall {
-  public var subchannel: EventLoopFuture<Channel> {
-    return self.streamPromise.futureResult
   }
 
-  public var initialMetadata: EventLoopFuture<HTTPHeaders> {
-    return self.clientChannelHandler.initialMetadataPromise.futureResult
-  }
-
-  public var status: EventLoopFuture<GRPCStatus> {
-    return self.clientChannelHandler.statusPromise.futureResult
+  /// Creates and configures an HTTP/2 stream channel. The `self.subchannel` future will hold the
+  /// stream channel once it has been created.
+  private func createStreamChannel() {
+    self.connection.channel.eventLoop.execute {
+      self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
+        subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.httpProtocol),
+                                        HTTP1ToRawGRPCClientCodec(),
+                                        GRPCClientCodec<RequestMessage, ResponseMessage>(),
+                                        self.requestHandler,
+                                        self.responseHandler)
+      }
+    }
   }
+}
 
+extension BaseClientCall: ClientCall {
   // Workaround for: https://bugs.swift.org/browse/SR-10128
   // Once resolved this can become a default implementation on `ClientCall`.
   public var trailingMetadata: EventLoopFuture<HTTPHeaders> {
@@ -128,143 +136,29 @@ extension BaseClientCall: ClientCall {
   }
 }
 
-extension BaseClientCall {
-  /// Creates and configures an HTTP/2 stream channel. `subchannel` will contain the stream channel when it is created.
-  ///
-  /// - Important: This should only ever be called once.
-  private func createStreamChannel() {
-    self.connection.channel.eventLoop.execute {
-      self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
-        subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.httpProtocol),
-                                        HTTP1ToRawGRPCClientCodec(),
-                                        GRPCClientCodec<RequestMessage, ResponseMessage>(),
-                                        self.clientChannelHandler)
-      }
-    }
-  }
-
-  /// Send the request head once `subchannel` becomes available.
-  ///
-  /// - Important: This should only ever be called once.
-  ///
-  /// - Parameters:
-  ///   - requestHead: The request head to send.
-  ///   - promise: A promise to fulfill once the request head has been sent.
-  internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise<Void>?) {
-    self.writeAndFlushOnStream(.head(requestHead), promise: promise)
-  }
-
-  /// Send the request head once `subchannel` becomes available.
-  ///
-  /// - Important: This should only ever be called once.
-  ///
-  /// - Parameter requestHead: The request head to send.
-  /// - Returns: A future which will be succeeded once the request head has been sent.
-  internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
-    let promise = connection.channel.eventLoop.makePromise(of: Void.self)
-    self.sendHead(requestHead, promise: promise)
-    return promise.futureResult
-  }
-
-  /// Send the given message once `subchannel` becomes available.
-  ///
-  /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
-  /// - Parameters:
-  ///   - message: The message to send.
-  ///   - promise: A promise to fulfil when the message reaches the network.
-  internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
-    self.writeAndFlushOnStream(.message(message), promise: promise)
-  }
-
-  /// Send the given message once `subchannel` becomes available.
-  ///
-  /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
-  /// - Returns: A future which will be fullfilled when the message reaches the network.
-  internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
-    let promise = connection.channel.eventLoop.makePromise(of: Void.self)
-    self._sendMessage(message, promise: promise)
-    return promise.futureResult
-  }
-
-  /// Send `end` once `subchannel` becomes available.
-  ///
-  /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
-  /// - Important: This should only ever be called once.
-  /// - Parameter promise: A promise to succeed once then end has been sent.
-  internal func _sendEnd(promise: EventLoopPromise<Void>?) {
-    self.writeAndFlushOnStream(.end, promise: promise)
-  }
-
-  /// Send `end` once `subchannel` becomes available.
-  ///
-  /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
-  /// - Important: This should only ever be called once.
-  ///- Returns: A future which will be succeeded once the end has been sent.
-  internal func _sendEnd() -> EventLoopFuture<Void> {
-    let promise = connection.channel.eventLoop.makePromise(of: Void.self)
-    self._sendEnd(promise: promise)
-    return promise.futureResult
-  }
-
-  /// Writes the given request on the future `Channel` for the HTTP/2 stream used to make this call.
-  ///
-  /// This method is intended to be used by the `sendX` methods in order to ensure that they fail
-  /// futures associated with this call should the write fail (e.g. due to a closed connection).
-  private func writeAndFlushOnStream(_ request: GRPCClientRequestPart<RequestMessage>, promise: EventLoopPromise<Void>?) {
-    // We need to use a promise here; if the write fails then it _must_ be observed by the handler
-    // to ensure that any futures given to the user are fulfilled.
-    let promise = promise ?? self.connection.channel.eventLoop.makePromise()
-
-    promise.futureResult.whenFailure { error in
-      self.clientChannelHandler.observeError(.unknown(error, origin: .client))
-    }
-
-    self.subchannel.cascadeFailure(to: promise)
-
-    self.subchannel.whenSuccess { channel in
-      channel.writeAndFlush(NIOAny(request), promise: promise)
-    }
-  }
-
-  /// Creates a client-side timeout for this call.
-  ///
-  /// - Important: This should only ever be called once.
-  private func setTimeout(_ timeout: GRPCTimeout) {
-    if timeout == .infinite { return }
-
-    self.connection.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
-      self?.subchannel.whenSuccess { stream in
-        stream.pipeline.fireUserInboundEventTriggered(GRPCClientUserEvent.timedOut(timeout))
-      }
-    }
-  }
-
-  /// Makes a new `HTTPRequestHead` for a call with this signature.
-  ///
-  /// - Parameters:
-  ///   - path: path for this RPC method.
-  ///   - host: the address of the host we are connected to.
-  ///   - callOptions: options to use when configuring this call.
-  /// - Returns: `HTTPRequestHead` configured for this call.
-  internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
-    var headers: HTTPHeaders = [
-      // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
-      "host": host,
-      "content-type": "application/grpc",
-      // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
-      "te": "trailers",
-      //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
-      "user-agent": "grpc-swift-nio",
-      GRPCHeaderName.acceptEncoding: CompressionMechanism.acceptEncodingHeader,
-    ]
-
-    if callOptions.timeout != .infinite {
-      headers.add(name: GRPCHeaderName.timeout, value: String(describing: callOptions.timeout))
-    }
-
-    headers.add(contentsOf: callOptions.customMetadata)
-
-    let method: HTTPMethod = callOptions.cacheable ? .GET : .POST
-    return HTTPRequestHead(version: .init(major: 2, minor: 0), method: method, uri: path, headers: headers)
-  }
+/// Makes a request head.
+///
+/// - Parameter path: The path of the gRPC call, e.g. "/serviceName/methodName".
+/// - Parameter host: The host serving the call.
+/// - Parameter callOptions: Options used when making this call.
+internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
+  var headers: HTTPHeaders = [
+    // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
+    "host": host,
+    "content-type": "application/grpc",
+    // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
+    "te": "trailers",
+    //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
+    "user-agent": "grpc-swift-nio",
+    GRPCHeaderName.acceptEncoding: CompressionMechanism.acceptEncodingHeader,
+  ]
+
+  if callOptions.timeout != .infinite {
+    headers.add(name: GRPCHeaderName.timeout, value: String(describing: callOptions.timeout))
+  }
+
+  headers.add(contentsOf: callOptions.customMetadata)
+
+  let method: HTTPMethod = callOptions.cacheable ? .GET : .POST
+  return HTTPRequestHead(version: .init(major: 2, minor: 0), method: method, uri: path, headers: headers)
 }

+ 15 - 20
Sources/SwiftGRPCNIO/ClientCalls/BidirectionalStreamingClientCall.swift

@@ -26,33 +26,28 @@ import NIO
 /// - `initialMetadata`: the initial metadata returned from the server,
 /// - `status`: the status of the gRPC call after it has ended,
 /// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
-public class BidirectionalStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage>, StreamingRequestClientCall {
+public final class BidirectionalStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>
+  : BaseClientCall<RequestMessage, ResponseMessage>,
+    StreamingRequestClientCall {
   private var messageQueue: EventLoopFuture<Void>
 
   public init(connection: GRPCClientConnection, path: String, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?, handler: @escaping (ResponseMessage) -> Void) {
     self.messageQueue = connection.channel.eventLoop.makeSucceededFuture(())
-    super.init(connection: connection, path: path, callOptions: callOptions, responseObserver: .callback(handler), errorDelegate: errorDelegate)
 
-    let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
-    self.messageQueue = self.messageQueue.flatMap {
-      self.sendHead(requestHead)
-    }
-  }
-
-  public func sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
-    return self._sendMessage(message)
-  }
+    let responseHandler = GRPCClientStreamingResponseChannelHandler(
+      initialMetadataPromise: connection.channel.eventLoop.makePromise(),
+      statusPromise: connection.channel.eventLoop.makePromise(),
+      errorDelegate: errorDelegate,
+      timeout: callOptions.timeout,
+      responseHandler: handler)
 
-  public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
-    self._sendMessage(message, promise: promise)
-  }
-
-  public func sendEnd() -> EventLoopFuture<Void> {
-    return self._sendEnd()
-  }
+    let requestHandler = GRPCClientStreamingRequestChannelHandler<RequestMessage>(
+      requestHead: makeRequestHead(path: path, host: connection.host, callOptions: callOptions))
 
-  public func sendEnd(promise: EventLoopPromise<Void>?) {
-    self._sendEnd(promise: promise)
+    super.init(
+      connection: connection,
+      responseHandler: responseHandler,
+      requestHandler: requestHandler)
   }
 
   public func newMessageQueue() -> EventLoopFuture<Void> {

+ 39 - 3
Sources/SwiftGRPCNIO/ClientCalls/ClientCall.swift

@@ -60,9 +60,11 @@ public protocol StreamingRequestClientCall: ClientCall {
   ///
   /// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
   ///
-  /// - Parameter message: The message to send.
+  /// - Parameters:
+  ///   - message: The message to
+  ///   - flush: Whether the buffer should be flushed after writing the message.
   /// - Returns: A future which will be fullfilled when the message has been sent.
-  func sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void>
+  func sendMessage(_ message: RequestMessage, flush: Bool) -> EventLoopFuture<Void>
 
   /// Sends a message to the service.
   ///
@@ -71,7 +73,8 @@ public protocol StreamingRequestClientCall: ClientCall {
   /// - Parameters:
   ///   - message: The message to send.
   ///   - promise: A promise to be fulfilled when the message has been sent.
-  func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?)
+  ///   - flush: Whether the buffer should be flushed after writing the message.
+  func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?, flush: Bool)
 
   /// Returns a future which can be used as a message queue.
   ///
@@ -107,3 +110,36 @@ public protocol UnaryResponseClientCall: ClientCall {
   /// Callers should rely on the `status` of the call for the canonical outcome.
   var response: EventLoopFuture<ResponseMessage> { get }
 }
+
+extension StreamingRequestClientCall {
+  public func sendMessage(_ message: RequestMessage, flush: Bool = true) -> EventLoopFuture<Void> {
+    return self.subchannel.flatMap { channel in
+      let writeFuture = channel.write(GRPCClientRequestPart.message(message))
+      if flush {
+        channel.flush()
+      }
+      return writeFuture
+    }
+  }
+
+  public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?, flush: Bool = true) {
+    self.subchannel.whenSuccess { channel in
+      channel.write(GRPCClientRequestPart.message(message), promise: promise)
+      if flush {
+        channel.flush()
+      }
+    }
+  }
+
+  public func sendEnd() -> EventLoopFuture<Void> {
+    return self.subchannel.flatMap { channel in
+      return channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.end)
+    }
+  }
+
+  public func sendEnd(promise: EventLoopPromise<Void>?) {
+    self.subchannel.whenSuccess { channel in
+      channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.end, promise: promise)
+    }
+  }
+}

+ 17 - 28
Sources/SwiftGRPCNIO/ClientCalls/ClientStreamingClientCall.swift

@@ -27,42 +27,31 @@ import NIO
 /// - `response`: the response from the call,
 /// - `status`: the status of the gRPC call after it has ended,
 /// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
-public class ClientStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage>, StreamingRequestClientCall, UnaryResponseClientCall {
+public final class ClientStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>
+  : BaseClientCall<RequestMessage, ResponseMessage>,
+    StreamingRequestClientCall,
+    UnaryResponseClientCall {
   public let response: EventLoopFuture<ResponseMessage>
   private var messageQueue: EventLoopFuture<Void>
 
   public init(connection: GRPCClientConnection, path: String, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?) {
-    let responsePromise: EventLoopPromise<ResponseMessage> = connection.channel.eventLoop.makePromise()
-    self.response = responsePromise.futureResult
+    let responseHandler = GRPCClientUnaryResponseChannelHandler<ResponseMessage>(
+      initialMetadataPromise: connection.channel.eventLoop.makePromise(),
+      responsePromise: connection.channel.eventLoop.makePromise(),
+      statusPromise: connection.channel.eventLoop.makePromise(),
+      errorDelegate: errorDelegate,
+      timeout: callOptions.timeout)
+
+    let requestHandler = GRPCClientStreamingRequestChannelHandler<RequestMessage>(
+      requestHead: makeRequestHead(path: path, host: connection.host, callOptions: callOptions))
+
+    self.response = responseHandler.responsePromise.futureResult
     self.messageQueue = connection.channel.eventLoop.makeSucceededFuture(())
 
     super.init(
       connection: connection,
-      path: path,
-      callOptions: callOptions,
-      responseObserver: .succeedPromise(responsePromise),
-      errorDelegate: errorDelegate)
-
-    let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
-    self.messageQueue = self.messageQueue.flatMap {
-      self.sendHead(requestHead)
-    }
-  }
-
-  public func sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
-    return self._sendMessage(message)
-  }
-
-  public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
-    self._sendMessage(message, promise: promise)
-  }
-
-  public func sendEnd() -> EventLoopFuture<Void> {
-    return self._sendEnd()
-  }
-
-  public func sendEnd(promise: EventLoopPromise<Void>?) {
-    self._sendEnd(promise: promise)
+      responseHandler: responseHandler,
+      requestHandler: requestHandler)
   }
 
   public func newMessageQueue() -> EventLoopFuture<Void> {

+ 0 - 48
Sources/SwiftGRPCNIO/ClientCalls/ResponseObserver.swift

@@ -1,48 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import NIO
-import SwiftProtobuf
-
-/// A response message observer.
-///
-/// - succeedPromise: succeed the given promise on receipt of a message.
-/// - callback: calls the given callback for each response observed.
-public enum ResponseObserver<ResponseMessage: Message> {
-  case succeedPromise(EventLoopPromise<ResponseMessage>)
-  case callback((ResponseMessage) -> Void)
-
-  /// Observe the given message.
-  func observe(_ message: ResponseMessage) {
-    switch self {
-    case .callback(let callback):
-      callback(message)
-
-    case .succeedPromise(let promise):
-      promise.succeed(message)
-    }
-  }
-
-  var expectsMultipleResponses: Bool {
-    switch self {
-    case .callback:
-      return true
-
-    case .succeedPromise:
-      return false
-    }
-  }
-}

+ 15 - 8
Sources/SwiftGRPCNIO/ClientCalls/ServerStreamingClientCall.swift

@@ -23,15 +23,22 @@ import NIO
 /// - `initialMetadata`: the initial metadata returned from the server,
 /// - `status`: the status of the gRPC call after it has ended,
 /// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
-public class ServerStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage> {
+public final class ServerStreamingClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage> {
   public init(connection: GRPCClientConnection, path: String, request: RequestMessage, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?, handler: @escaping (ResponseMessage) -> Void) {
-    super.init(connection: connection, path: path, callOptions: callOptions, responseObserver: .callback(handler), errorDelegate: errorDelegate)
+    let responseHandler = GRPCClientStreamingResponseChannelHandler(
+      initialMetadataPromise: connection.channel.eventLoop.makePromise(),
+      statusPromise: connection.channel.eventLoop.makePromise(),
+      errorDelegate: errorDelegate,
+      timeout: callOptions.timeout,
+      responseHandler: handler)
 
-    let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
-    self.sendHead(requestHead).flatMap {
-      self._sendMessage(request)
-    }.whenSuccess {
-      self._sendEnd(promise: nil)
-    }
+    let requestHandler = GRPCClientUnaryRequestChannelHandler<RequestMessage>(
+      requestHead: makeRequestHead(path: path, host: connection.host, callOptions: callOptions),
+      request: request)
+
+    super.init(
+      connection: connection,
+      responseHandler: responseHandler,
+      requestHandler: requestHandler)
   }
 }

+ 18 - 14
Sources/SwiftGRPCNIO/ClientCalls/UnaryClientCall.swift

@@ -16,6 +16,8 @@
 import Foundation
 import SwiftProtobuf
 import NIO
+import NIOHTTP1
+import NIOHTTP2
 
 /// A unary gRPC call. The request is sent on initialization.
 ///
@@ -24,25 +26,27 @@ import NIO
 /// - `response`: the response from the unary call,
 /// - `status`: the status of the gRPC call after it has ended,
 /// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
-public class UnaryClientCall<RequestMessage: Message, ResponseMessage: Message>: BaseClientCall<RequestMessage, ResponseMessage>, UnaryResponseClientCall {
+public final class UnaryClientCall<RequestMessage: Message, ResponseMessage: Message>
+  : BaseClientCall<RequestMessage, ResponseMessage>,
+    UnaryResponseClientCall {
   public let response: EventLoopFuture<ResponseMessage>
 
   public init(connection: GRPCClientConnection, path: String, request: RequestMessage, callOptions: CallOptions, errorDelegate: ClientErrorDelegate?) {
-    let responsePromise: EventLoopPromise<ResponseMessage> = connection.channel.eventLoop.makePromise()
-    self.response = responsePromise.futureResult
+    let responseHandler = GRPCClientUnaryResponseChannelHandler<ResponseMessage>(
+      initialMetadataPromise: connection.channel.eventLoop.makePromise(),
+      responsePromise: connection.channel.eventLoop.makePromise(),
+      statusPromise: connection.channel.eventLoop.makePromise(),
+      errorDelegate: errorDelegate,
+      timeout: callOptions.timeout)
 
+    let requestHandler = GRPCClientUnaryRequestChannelHandler<RequestMessage>(
+      requestHead: makeRequestHead(path: path, host: connection.host, callOptions: callOptions),
+      request: request)
+
+    self.response = responseHandler.responsePromise.futureResult
     super.init(
       connection: connection,
-      path: path,
-      callOptions: callOptions,
-      responseObserver: .succeedPromise(responsePromise),
-      errorDelegate: errorDelegate)
-
-    let requestHead = self.makeRequestHead(path: path, host: connection.host, callOptions: callOptions)
-    self.sendHead(requestHead).flatMap {
-      self._sendMessage(request)
-    }.whenSuccess {
-      self._sendEnd(promise: nil)
-    }
+      responseHandler: responseHandler,
+      requestHandler: requestHandler)
   }
 }

+ 67 - 0
Sources/SwiftGRPCNIO/GRPCClientRequestChannelHandler.swift

@@ -0,0 +1,67 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import SwiftProtobuf
+import NIO
+import NIOHTTP1
+
+/// A base channel handler for client requests.
+internal class GRPCClientRequestChannelHandler<RequestMessage: Message>: ChannelInboundHandler {
+  typealias InboundIn = Never
+  typealias OutboundOut = GRPCClientRequestPart<RequestMessage>
+
+  /// The request head to send.
+  internal let requestHead: HTTPRequestHead
+
+  init(requestHead: HTTPRequestHead) {
+    self.requestHead = requestHead
+  }
+
+  public func channelActive(context: ChannelHandlerContext) {
+    // If we don't provide a method here the default implementation on protocol (i.e. no-op) will be
+    // used in subclasses, even if they implement channelActive(context:).
+  }
+}
+
+/// A channel handler for unary client requests.
+///
+/// Sends the request head, message and end on `channelActive(context:)`.
+internal final class GRPCClientUnaryRequestChannelHandler<RequestMessage: Message>: GRPCClientRequestChannelHandler<RequestMessage> {
+  /// The request to send.
+  internal let request: RequestMessage
+
+  init(requestHead: HTTPRequestHead, request: RequestMessage) {
+    self.request = request
+    super.init(requestHead: requestHead)
+  }
+
+  override public func channelActive(context: ChannelHandlerContext) {
+    context.write(self.wrapOutboundOut(.head(self.requestHead)), promise: nil)
+    context.write(self.wrapOutboundOut(.message(self.request)), promise: nil)
+    context.writeAndFlush(self.wrapOutboundOut(.end), promise: nil)
+    context.fireChannelActive()
+  }
+}
+
+/// A channel handler for client calls which stream requests.
+///
+/// Sends the request head on `channelActive(context:)`.
+internal final class GRPCClientStreamingRequestChannelHandler<RequestMessage: Message>: GRPCClientRequestChannelHandler<RequestMessage> {
+  override public func channelActive(context: ChannelHandlerContext) {
+    context.writeAndFlush(self.wrapOutboundOut(.head(self.requestHead)), promise: nil)
+    context.fireChannelActive()
+  }
+}

+ 128 - 81
Sources/SwiftGRPCNIO/GRPCClientChannelHandler.swift → Sources/SwiftGRPCNIO/GRPCClientResponseChannelHandler.swift

@@ -14,34 +14,26 @@
  * limitations under the License.
  */
 import Foundation
+import SwiftProtobuf
 import NIO
 import NIOHTTP1
-import SwiftProtobuf
 
-/// The final client-side channel handler.
+/// A base channel handler for receiving responses.
 ///
-/// This handler holds promises for the initial metadata and the status, as well as an observer
-/// for responses. For unary and client-streaming calls the observer will succeed a response
-/// promise. For server-streaming and bidirectional-streaming the observer will call the supplied
-/// callback with each response received.
-///
-/// Errors are also handled by the channel handler. Promises for the initial metadata and
-/// response (if applicable) are failed with first error received. The status promise is __succeeded__
-/// with the error as the result of `GRPCStatusTransformable.asGRPCStatus()`, if available.
-/// The stream is also closed and any inbound or outbound messages are ignored.
-internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage: Message> {
+/// This includes hold promises for the initial metadata and status of the gRPC call. This handler
+/// is also responsible for error handling, via an error delegate and by appropriately failing the
+/// aforementioned promises.
+internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
+  public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
+
   internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
   internal let statusPromise: EventLoopPromise<GRPCStatus>
-  internal let responseObserver: ResponseObserver<ResponseMessage>
-  internal let errorDelegate: ClientErrorDelegate?
 
-  /// A promise for a unary response.
-  internal var responsePromise: EventLoopPromise<ResponseMessage>? {
-    guard case .succeedPromise(let promise) = responseObserver else { return nil }
-    return promise
-  }
+  internal let timeout: GRPCTimeout
+  internal var timeoutTask: Scheduled<Void>?
+  internal let errorDelegate: ClientErrorDelegate?
 
-  private enum InboundState {
+  internal enum InboundState {
     case expectingHeadersOrStatus
     case expectingMessageOrStatus
     case expectingStatus
@@ -58,32 +50,45 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
     }
   }
 
-  private enum OutboundState {
-    case expectingHead
-    case expectingMessageOrEnd
-    case ignore
+  /// The arity of a response.
+  internal enum ResponseArity {
+    case one
+    case many
+
+    /// The inbound state after receiving a response.
+    var inboundStateAfterResponse: InboundState {
+      switch self {
+      case .one:
+        return .expectingStatus
+      case .many:
+        return .expectingMessageOrStatus
+      }
+    }
   }
 
+  private let responseArity: ResponseArity
   private var inboundState: InboundState = .expectingHeadersOrStatus
-  private var outboundState: OutboundState = .expectingHead
 
-  /// Creates a new `GRPCClientChannelHandler`.
+  /// Creates a new `GRPCClientResponseChannelHandler`.
   ///
   /// - Parameters:
-  ///   - initialMetadataPromise: a promise to succeed on receiving the initial metadata from the service.
-  ///   - statusPromise: a promise to succeed with the outcome of the call.
-  ///   - responseObserver: an observer for response messages from the server; for unary responses this should
-  ///     be the `succeedPromise` case.
+  ///   - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
+  ///   - statusPromise: A promise to succeed with the outcome of the call.
+  ///   - errorDelegate: An error delegate to call when errors are observed.
+  ///   - timeout: The call timeout specified by the user.
+  ///   - expectedResponses: The number of responses expected.
   public init(
     initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
     statusPromise: EventLoopPromise<GRPCStatus>,
-    responseObserver: ResponseObserver<ResponseMessage>,
-    errorDelegate: ClientErrorDelegate?
+    errorDelegate: ClientErrorDelegate?,
+    timeout: GRPCTimeout,
+    expectedResponses: ResponseArity
   ) {
     self.initialMetadataPromise = initialMetadataPromise
     self.statusPromise = statusPromise
-    self.responseObserver = responseObserver
     self.errorDelegate = errorDelegate
+    self.timeout = timeout
+    self.responseArity = expectedResponses
   }
 
   /// Observe the given status.
@@ -96,9 +101,9 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
   internal func observeStatus(_ status: GRPCStatus) {
     if status.code != .ok {
       self.initialMetadataPromise.fail(status)
-      self.responsePromise?.fail(status)
     }
     self.statusPromise.succeed(status)
+    self.timeoutTask?.cancel()
   }
 
   /// Observe the given error.
@@ -112,23 +117,25 @@ internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage
     self.errorDelegate?.didCatchError(error.wrappedError, file: error.file, line: error.line)
     self.observeStatus(error.asGRPCStatus())
   }
-}
 
-extension GRPCClientChannelHandler: ChannelInboundHandler {
-  public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
+  /// Called when a response is received. Subclasses should override this method.
+  ///
+  /// - Parameter response: The received response.
+  internal func onResponse(_ response: ResponseMessage) {
+    // no-op
+  }
 
   /// Reads inbound data.
   ///
   /// On receipt of:
   /// - headers: the initial metadata promise is succeeded.
-  /// - message: the message observer is called with the message; for unary responses a response
-  ///   promise is succeeded, otherwise a callback is called.
+  /// - message: `onResponse(_:)` is called with the received message.
   /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
-  ///   and response promise (if available) are failed with the status. The channel is then closed.
+  ///   and response promise (if available) are failed with the status.
   public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
     guard self.inboundState != .ignore else { return }
 
-    switch unwrapInboundIn(data) {
+    switch self.unwrapInboundIn(data) {
     case .headers(let headers):
       guard self.inboundState == .expectingHeadersOrStatus else {
         self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)")))
@@ -144,8 +151,8 @@ extension GRPCClientChannelHandler: ChannelInboundHandler {
         return
       }
 
-      self.responseObserver.observe(message)
-      self.inboundState = self.responseObserver.expectsMultipleResponses ? .expectingMessageOrStatus : .expectingStatus
+      self.onResponse(message)
+      self.inboundState = self.responseArity.inboundStateAfterResponse
 
     case .status(let status):
       guard self.inboundState.expectingStatus else {
@@ -154,7 +161,6 @@ extension GRPCClientChannelHandler: ChannelInboundHandler {
       }
 
       self.observeStatus(status)
-
       // We don't expect any more requests/responses beyond this point and we don't need to close
       // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
     }
@@ -167,57 +173,101 @@ extension GRPCClientChannelHandler: ChannelInboundHandler {
         // We shouldn't observe an error since this event is triggered by the user: just observe the
         // status.
         self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus())
-        self.close(context: context, mode: .all, promise: nil)
+        context.close(promise: nil)
+      }
+    }
+  }
 
-      case .timedOut(let timeout):
-        self.observeError(GRPCError.client(.deadlineExceeded(timeout)))
-        self.close(context: context, mode: .all, promise: nil)
+  public func channelActive(context: ChannelHandlerContext) {
+    if self.timeout != .infinite {
+      let timeout = self.timeout
+      self.timeoutTask = context.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
+        self?.errorCaught(context: context, error: GRPCError.client(.deadlineExceeded(timeout)))
       }
     }
   }
-}
 
-extension GRPCClientChannelHandler: ChannelOutboundHandler {
-  public typealias OutboundIn = GRPCClientRequestPart<RequestMessage>
-  public typealias OutboundOut = GRPCClientRequestPart<RequestMessage>
+  public func channelInactive(context: ChannelHandlerContext) {
+    self.inboundState = .ignore
+    self.observeStatus(.init(code: .unavailable, message: nil))
+    context.fireChannelInactive()
+  }
 
-  public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
-    guard self.outboundState != .ignore else { return }
+  /// Observe an error from the pipeline and close the channel.
+  public func errorCaught(context: ChannelHandlerContext, error: Error) {
+    self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
+    context.close(mode: .all, promise: nil)
+  }
+}
 
-    switch self.unwrapOutboundIn(data) {
-    case .head:
-      guard self.outboundState == .expectingHead else {
-        self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.outboundState)")))
-        return
-      }
+/// A channel handler for client calls which recieve a single response.
+final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
+  let responsePromise: EventLoopPromise<ResponseMessage>
 
-      context.write(data, promise: promise)
-      self.outboundState = .expectingMessageOrEnd
+  internal init(
+    initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
+    responsePromise: EventLoopPromise<ResponseMessage>,
+    statusPromise: EventLoopPromise<GRPCStatus>,
+    errorDelegate: ClientErrorDelegate?,
+    timeout: GRPCTimeout
+  ) {
+    self.responsePromise = responsePromise
+
+    super.init(
+      initialMetadataPromise: initialMetadataPromise,
+      statusPromise: statusPromise,
+      errorDelegate: errorDelegate,
+      timeout: timeout,
+      expectedResponses: .one
+    )
+  }
 
-    default:
-      guard self.outboundState == .expectingMessageOrEnd else {
-        self.errorCaught(context: context, error: GRPCError.client(.invalidState("received message or end while in state \(self.outboundState)")))
-        return
-      }
+  /// Succeeds the response promise with the given response.
+  ///
+  /// - Parameter response: The response received from the service.
+  override func onResponse(_ response: ResponseMessage) {
+    self.responsePromise.succeed(response)
+  }
 
-      context.write(data, promise: promise)
+  /// Fails the response promise if the given status is not `.ok`.
+  override func observeStatus(_ status: GRPCStatus) {
+    super.observeStatus(status)
+
+    if status.code != .ok {
+      self.responsePromise.fail(status)
     }
   }
 }
 
-extension GRPCClientChannelHandler {
-  /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore.
-  public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
-    context.close(mode: mode, promise: promise)
+/// A channel handler for client calls which recieve a stream of responses.
+final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
+  typealias ResponseHandler = (ResponseMessage) -> Void
 
-    self.inboundState = .ignore
-    self.outboundState = .ignore
+  let responseHandler: ResponseHandler
+
+  internal init(
+    initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
+    statusPromise: EventLoopPromise<GRPCStatus>,
+    errorDelegate: ClientErrorDelegate?,
+    timeout: GRPCTimeout,
+    responseHandler: @escaping ResponseHandler
+  ) {
+    self.responseHandler = responseHandler
+
+    super.init(
+      initialMetadataPromise: initialMetadataPromise,
+      statusPromise: statusPromise,
+      errorDelegate: errorDelegate,
+      timeout: timeout,
+      expectedResponses: .many
+    )
   }
 
-  /// Observe an error from the pipeline and close the channel.
-  public func errorCaught(context: ChannelHandlerContext, error: Error) {
-    self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
-    context.close(mode: .all, promise: nil)
+  /// Calls a user-provided handler with the given response.
+  ///
+  /// - Parameter response: The response received from the service.
+  override func onResponse(_ response: ResponseMessage) {
+    self.responseHandler(response)
   }
 }
 
@@ -225,7 +275,4 @@ extension GRPCClientChannelHandler {
 public enum GRPCClientUserEvent {
   /// The call has been cancelled.
   case cancelled
-
-  /// The call did not complete before the deadline was exceeded.
-  case timedOut(GRPCTimeout)
 }

+ 4 - 3
Tests/SwiftGRPCNIOTests/GRPCStatusCodeTests.swift

@@ -37,11 +37,12 @@ class GRPCStatusCodeTests: XCTestCase {
     try! self.channel.pipeline.addHandlers([
       HTTP1ToRawGRPCClientCodec(),
       GRPCClientCodec<Echo_EchoRequest, Echo_EchoResponse>(),
-      GRPCClientChannelHandler<Echo_EchoRequest, Echo_EchoResponse>(
+      GRPCClientUnaryResponseChannelHandler<Echo_EchoResponse>(
         initialMetadataPromise: self.metadataPromise,
+        responsePromise: self.responsePromise,
         statusPromise: self.statusPromise,
-        responseObserver: .succeedPromise(self.responsePromise),
-        errorDelegate: nil)
+        errorDelegate: nil,
+        timeout: .infinite)
     ]).wait()
   }