瀏覽代碼

Remove dead code (#1078)

George Barnett 5 年之前
父節點
當前提交
f23d374af0

+ 0 - 42
Sources/GRPC/ClientCalls/ClientCall.swift

@@ -164,45 +164,3 @@ public protocol UnaryResponseClientCall: ClientCall {
   /// Callers should rely on the `status` of the call for the canonical outcome.
   var response: EventLoopFuture<ResponsePayload> { get }
 }
-
-// These protocols sit between a `*ClientCall` and the channel. The intention behind them is to
-// allow the `*ClientCall` types to use a different transport mechanisms. Normally this will be
-// a NIO HTTP/2 stream channel.
-
-internal protocol ClientCallInbound {
-  associatedtype Response
-  typealias ResponsePart = _GRPCClientResponsePart<Response>
-
-  /// Receive an error.
-  func receiveError(_ error: Error)
-
-  /// Receive a response part.
-  func receiveResponse(_ part: ResponsePart)
-}
-
-internal protocol ClientCallOutbound {
-  associatedtype Request
-  typealias RequestPart = _GRPCClientRequestPart<Request>
-
-  /// Send a single request part and complete the promise once the part has been sent.
-  func sendRequest(_ part: RequestPart, promise: EventLoopPromise<Void>?)
-
-  /// Send the given request parts and complete the promise once all parts have been sent.
-  func sendRequests<S: Sequence>(_ parts: S, promise: EventLoopPromise<Void>?)
-    where S.Element == RequestPart
-
-  /// Cancel the call and complete the promise once the cancellation has been completed.
-  func cancel(promise: EventLoopPromise<Void>?)
-}
-
-extension ClientCallOutbound {
-  /// Convenience method for sending all parts of a unary-request RPC.
-  internal func sendUnary(_ head: _GRPCRequestHead, request: Request, compressed: Bool) {
-    let parts: [RequestPart] = [
-      .head(head),
-      .message(_MessageContext(request, compressed: compressed)),
-      .end,
-    ]
-    self.sendRequests(parts, promise: nil)
-  }
-}

+ 0 - 722
Sources/GRPC/ClientCalls/ClientCallTransport.swift

@@ -1,722 +0,0 @@
-/*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Logging
-import NIO
-import NIOHPACK
-import NIOHTTP2
-
-/// This class provides much of the boilerplate for the four types of gRPC call objects returned to
-/// framework users. It is the glue between a call object and the underlying transport (typically a
-/// NIO Channel).
-///
-/// Typically, each call will be configured on an HTTP/2 stream channel. The stream channel will
-/// will be configured as such:
-///
-/// ```
-///                           ┌────────────────────────────────────┐
-///                           │ ChannelTransport<Request,Response> │
-///                           └─────▲───────────────────────┬──────┘
-///                                 │                       │
-/// --------------------------------│-----------------------│------------------------------
-/// HTTP2StreamChannel              │                       │
-///                    ┌────────────┴──────────┐            │
-///                    │ GRPCClientCallHandler │            │
-///                    └────────────▲──────────┘            │
-/// GRPCClientResponsePart<Response>│                       │GRPCClientRequestPart<Request>
-///                               ┌─┴───────────────────────▼─┐
-///                               │ GRPCClientChannelHandler  │
-///                               └─▲───────────────────────┬─┘
-///                       HTTP2Frame│                       │HTTP2Frame
-///                                 |                       |
-/// ```
-///
-/// Note: the "main" pipeline provided by the channel in `ClientConnection`.
-internal class ChannelTransport<Request, Response> {
-  internal typealias RequestPart = _GRPCClientRequestPart<Request>
-  internal typealias ResponsePart = _GRPCClientResponsePart<Response>
-
-  /// The `EventLoop` this call is running on.
-  internal let eventLoop: EventLoop
-
-  /// A logger.
-  private let logger: Logger
-
-  /// The current state of the call.
-  private var state: State
-
-  /// A scheduled timeout for the call.
-  private var scheduledTimeout: Scheduled<Void>?
-
-  // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
-  // have 3 parts.
-  /// A buffer to store requests in before the channel has become active.
-  private var requestBuffer = MarkedCircularBuffer<BufferedRequest>(initialCapacity: 4)
-
-  /// A request that we'll deal with at a later point in time.
-  private struct BufferedRequest {
-    /// The request to write.
-    var message: _GRPCClientRequestPart<Request>
-
-    /// Any promise associated with the request.
-    var promise: EventLoopPromise<Void>?
-  }
-
-  /// An error delegate provided by the user.
-  private var errorDelegate: ClientErrorDelegate?
-
-  /// A container for response part promises for the call.
-  internal var responseContainer: ResponsePartContainer<Response>
-
-  /// A stopwatch for timing the RPC.
-  private var stopwatch: Stopwatch?
-
-  enum State {
-    // Waiting for a stream to become active.
-    //
-    // Valid transitions:
-    // - active
-    // - closed
-    case buffering(EventLoopFuture<Channel>)
-
-    // We have a channel, we're doing the RPC, there may be a timeout.
-    //
-    // Valid transitions:
-    // - closed
-    case active(Channel)
-
-    // We're closed; the RPC is done for one reason or another. This is terminal.
-    case closed
-  }
-
-  private init(
-    eventLoop: EventLoop,
-    state: State,
-    responseContainer: ResponsePartContainer<Response>,
-    errorDelegate: ClientErrorDelegate?,
-    logger: Logger
-  ) {
-    self.eventLoop = eventLoop
-    self.state = state
-    self.responseContainer = responseContainer
-    self.errorDelegate = errorDelegate
-    self.logger = logger
-
-    self.startTimer()
-  }
-
-  internal convenience init(
-    eventLoop: EventLoop,
-    responseContainer: ResponsePartContainer<Response>,
-    timeLimit: TimeLimit,
-    errorDelegate: ClientErrorDelegate?,
-    logger: Logger,
-    channelProvider: (ChannelTransport<Request, Response>, EventLoopPromise<Channel>) -> Void
-  ) {
-    let channelPromise = eventLoop.makePromise(of: Channel.self)
-
-    self.init(
-      eventLoop: eventLoop,
-      state: .buffering(channelPromise.futureResult),
-      responseContainer: responseContainer,
-      errorDelegate: errorDelegate,
-      logger: logger
-    )
-
-    // If the channel creation fails we need to error the call. Note that we receive an
-    // 'activation' from the channel instead of relying on the success of the future.
-    channelPromise.futureResult.whenFailure { error in
-      if error is GRPCStatus || error is GRPCStatusTransformable {
-        self.handleError(error, promise: nil)
-      } else {
-        // Fallback to something which will mark the RPC as 'unavailable'.
-        self.handleError(ConnectionFailure(reason: error), promise: nil)
-      }
-    }
-
-    // Schedule the timeout.
-    self.setUpTimeLimit(timeLimit)
-
-    // Now attempt to make the channel.
-    channelProvider(self, channelPromise)
-  }
-
-  internal convenience init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
-    multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
-    serializer: Serializer,
-    deserializer: Deserializer,
-    responseContainer: ResponsePartContainer<Response>,
-    callType: GRPCCallType,
-    timeLimit: TimeLimit,
-    errorDelegate: ClientErrorDelegate?,
-    logger: Logger
-  ) where Serializer.Input == Request, Deserializer.Output == Response {
-    self.init(
-      eventLoop: multiplexer.eventLoop,
-      responseContainer: responseContainer,
-      timeLimit: timeLimit,
-      errorDelegate: errorDelegate,
-      logger: logger
-    ) { call, streamPromise in
-      multiplexer.whenComplete { result in
-        switch result {
-        case let .success(mux):
-          mux.createStreamChannel(promise: streamPromise) { stream in
-            stream.pipeline.addHandlers([
-              _GRPCClientChannelHandler(callType: callType, logger: logger),
-              GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
-              GRPCClientCallHandler(call: call),
-            ])
-          }
-
-        case let .failure(error):
-          streamPromise.fail(error)
-        }
-      }
-    }
-  }
-
-  internal convenience init(
-    fakeResponse: _FakeResponseStream<Request, Response>,
-    responseContainer: ResponsePartContainer<Response>,
-    timeLimit: TimeLimit,
-    logger: Logger
-  ) {
-    self.init(
-      eventLoop: fakeResponse.channel.eventLoop,
-      responseContainer: responseContainer,
-      timeLimit: timeLimit,
-      errorDelegate: nil,
-      logger: logger
-    ) { call, streamPromise in
-      fakeResponse.channel.pipeline.addHandler(GRPCClientCallHandler(call: call)).map {
-        fakeResponse.channel
-      }.cascade(to: streamPromise)
-    }
-  }
-
-  /// Makes a transport whose channel promise is failed immediately.
-  internal static func makeTransportForMissingFakeResponse(
-    eventLoop: EventLoop,
-    responseContainer: ResponsePartContainer<Response>,
-    logger: Logger
-  ) -> ChannelTransport<Request, Response> {
-    return .init(
-      eventLoop: eventLoop,
-      responseContainer: responseContainer,
-      timeLimit: .none,
-      errorDelegate: nil,
-      logger: logger
-    ) { _, promise in
-      let error = GRPCStatus(
-        code: .unavailable,
-        message: "No fake response was registered before starting an RPC."
-      )
-      promise.fail(error)
-    }
-  }
-}
-
-// MARK: - Call API (i.e. called from {Unary,ClientStreaming,...}Call)
-
-extension ChannelTransport: ClientCallOutbound {
-  /// Send a request part.
-  ///
-  /// Does not have to be called from the event loop.
-  internal func sendRequest(_ part: RequestPart, promise: EventLoopPromise<Void>?) {
-    if self.eventLoop.inEventLoop {
-      self.writePart(part, flush: true, promise: promise)
-    } else {
-      self.eventLoop.execute {
-        self.writePart(part, flush: true, promise: promise)
-      }
-    }
-  }
-
-  /// Send multiple request parts.
-  ///
-  /// Does not have to be called from the event loop.
-  internal func sendRequests<S>(
-    _ parts: S,
-    promise: EventLoopPromise<Void>?
-  ) where S: Sequence, S.Element == RequestPart {
-    if self.eventLoop.inEventLoop {
-      self._sendRequests(parts, promise: promise)
-    } else {
-      self.eventLoop.execute {
-        self._sendRequests(parts, promise: promise)
-      }
-    }
-  }
-
-  /// Request that the RPC is cancelled.
-  ///
-  /// Does not have to be called from the event loop.
-  internal func cancel(promise: EventLoopPromise<Void>?) {
-    self.logger.info("rpc cancellation requested", source: "GRPC")
-
-    if self.eventLoop.inEventLoop {
-      self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
-    } else {
-      self.eventLoop.execute {
-        self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
-      }
-    }
-  }
-
-  /// Returns the `Channel` for the HTTP/2 stream that this RPC is using.
-  internal func streamChannel() -> EventLoopFuture<Channel> {
-    if self.eventLoop.inEventLoop {
-      return self.getStreamChannel()
-    } else {
-      return self.eventLoop.flatSubmit {
-        self.getStreamChannel()
-      }
-    }
-  }
-}
-
-extension ChannelTransport {
-  /// Return a future for the stream channel.
-  ///
-  /// Must be called from the event loop.
-  private func getStreamChannel() -> EventLoopFuture<Channel> {
-    self.eventLoop.preconditionInEventLoop()
-
-    switch self.state {
-    case let .buffering(future):
-      return future
-
-    case let .active(channel):
-      return self.eventLoop.makeSucceededFuture(channel)
-
-    case .closed:
-      return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)
-    }
-  }
-
-  /// Send many requests.
-  ///
-  /// Must be called from the event loop.
-  private func _sendRequests<S>(
-    _ parts: S,
-    promise: EventLoopPromise<Void>?
-  ) where S: Sequence, S.Element == RequestPart {
-    self.eventLoop.preconditionInEventLoop()
-
-    // We have a promise: create one for each request part and cascade the overall result to it.
-    // If we're flushing we'll do it at the end.
-    if let promise = promise {
-      let loop = promise.futureResult.eventLoop
-
-      let futures: [EventLoopFuture<Void>] = parts.map { part in
-        let partPromise = loop.makePromise(of: Void.self)
-        self.writePart(part, flush: false, promise: partPromise)
-        return partPromise.futureResult
-      }
-
-      // Cascade the futures to the provided promise.
-      EventLoopFuture.andAllSucceed(futures, on: loop).cascade(to: promise)
-    } else {
-      for part in parts {
-        self.writePart(part, flush: false, promise: nil)
-      }
-    }
-
-    // Now flush.
-    self.flush()
-  }
-
-  /// Buffer or send a flush.
-  ///
-  /// Must be called from the event loop.
-  private func flush() {
-    self.eventLoop.preconditionInEventLoop()
-
-    switch self.state {
-    case .buffering:
-      self.requestBuffer.mark()
-
-    case let .active(stream):
-      stream.flush()
-
-    case .closed:
-      ()
-    }
-  }
-
-  /// Write a request part.
-  ///
-  /// Must be called from the event loop.
-  ///
-  /// - Parameters:
-  ///   - part: The part to write.
-  ///   - flush: Whether we should flush the channel after this write.
-  ///   - promise: A promise to fulfill when the part has been written.
-  private func writePart(_ part: RequestPart, flush: Bool, promise: EventLoopPromise<Void>?) {
-    self.eventLoop.assertInEventLoop()
-
-    switch self.state {
-    // We're buffering, so buffer the message.
-    case .buffering:
-      self.logger.debug("buffering request part", metadata: [
-        "request_part": "\(part.name)",
-        "call_state": "\(self.describeCallState())",
-      ], source: "GRPC")
-      self.requestBuffer.append(BufferedRequest(message: part, promise: promise))
-      if flush {
-        self.requestBuffer.mark()
-      }
-
-    // We have an active stream, just pass the write and promise through.
-    case let .active(stream):
-      self.logger.debug(
-        "writing request part",
-        metadata: ["request_part": "\(part.name)"],
-        source: "GRPC"
-      )
-      stream.write(part, promise: promise)
-      if flush {
-        stream.flush()
-      }
-
-    // We're closed: drop the request.
-    case .closed:
-      self.logger.debug("dropping request part", metadata: [
-        "request_part": "\(part.name)",
-        "call_state": "\(self.describeCallState())",
-      ], source: "GRPC")
-      promise?.fail(ChannelError.ioOnClosedChannel)
-    }
-  }
-
-  /// The scheduled timeout triggered: timeout the RPC if it's not yet finished.
-  ///
-  /// Must be called from the event loop.
-  private func timedOut(after timeLimit: TimeLimit) {
-    self.eventLoop.preconditionInEventLoop()
-
-    let error = GRPCError.RPCTimedOut(timeLimit).captureContext()
-    self.handleError(error, promise: nil)
-  }
-
-  /// Handle an error and optionally fail the provided promise with the error.
-  ///
-  /// Must be called from the event loop.
-  private func handleError(_ error: Error, promise: EventLoopPromise<Void>?) {
-    self.eventLoop.preconditionInEventLoop()
-
-    switch self.state {
-    // We only care about errors if we're not shutdown yet.
-    case .buffering, .active:
-      // Add our current state to the logger we provide to the callback.
-      var loggerWithState = self.logger
-      loggerWithState[metadataKey: "call_state"] = "\(self.describeCallState())"
-      let errorStatus: GRPCStatus
-      let errorWithoutContext: Error
-
-      if let errorWithContext = error as? GRPCError.WithContext {
-        errorStatus = errorWithContext.error.makeGRPCStatus()
-        errorWithoutContext = errorWithContext.error
-        self.errorDelegate?.didCatchError(
-          errorWithContext.error,
-          logger: loggerWithState,
-          file: errorWithContext.file,
-          line: errorWithContext.line
-        )
-      } else if let transformable = error as? GRPCStatusTransformable {
-        errorStatus = transformable.makeGRPCStatus()
-        errorWithoutContext = error
-        self.errorDelegate?.didCatchErrorWithoutContext(error, logger: loggerWithState)
-      } else {
-        errorStatus = .processingError
-        errorWithoutContext = error
-        self.errorDelegate?.didCatchErrorWithoutContext(error, logger: loggerWithState)
-      }
-
-      // Update our state: we're closing.
-      self.close(error: errorWithoutContext, status: errorStatus)
-      promise?.fail(errorStatus)
-
-    case .closed:
-      promise?.fail(ChannelError.alreadyClosed)
-    }
-  }
-
-  /// Close the call, if it's not yet closed with the given status.
-  ///
-  /// Must be called from the event loop.
-  private func close(error: Error, status: GRPCStatus) {
-    self.eventLoop.preconditionInEventLoop()
-
-    switch self.state {
-    case let .buffering(streamFuture):
-      // We're closed now.
-      self.state = .closed
-      self.stopTimer(status: status)
-
-      // We're done; cancel the timeout.
-      self.scheduledTimeout?.cancel()
-      self.scheduledTimeout = nil
-
-      // Fail any outstanding promises.
-      self.responseContainer.fail(with: error, status: status)
-
-      // Fail any buffered writes.
-      while !self.requestBuffer.isEmpty {
-        let write = self.requestBuffer.removeFirst()
-        write.promise?.fail(status)
-      }
-
-      // Close the channel, if it comes up.
-      streamFuture.whenSuccess {
-        $0.close(mode: .all, promise: nil)
-      }
-
-    case let .active(channel):
-      // We're closed now.
-      self.state = .closed
-      self.stopTimer(status: status)
-
-      // We're done; cancel the timeout.
-      self.scheduledTimeout?.cancel()
-      self.scheduledTimeout = nil
-
-      // Fail any outstanding promises.
-      self.responseContainer.fail(with: error, status: status)
-
-      // Close the channel.
-      channel.close(mode: .all, promise: nil)
-
-    case .closed:
-      ()
-    }
-  }
-}
-
-// MARK: - Channel Inbound
-
-extension ChannelTransport: ClientCallInbound {
-  /// Receive an error from the Channel.
-  ///
-  /// Must be called on the event loop.
-  internal func receiveError(_ error: Error) {
-    self.eventLoop.preconditionInEventLoop()
-    self.handleError(error, promise: nil)
-  }
-
-  /// Receive a response part from the Channel.
-  ///
-  /// Must be called on the event loop.
-  func receiveResponse(_ part: _GRPCClientResponsePart<Response>) {
-    self.eventLoop.preconditionInEventLoop()
-
-    switch self.state {
-    case .buffering:
-      preconditionFailure("Received response part in 'buffering' state")
-
-    case .active:
-      self.logger.debug(
-        "received response part",
-        metadata: ["response_part": "\(part.name)"],
-        source: "GRPC"
-      )
-
-      switch part {
-      case let .initialMetadata(metadata):
-        self.responseContainer.lazyInitialMetadataPromise.completeWith(.success(metadata))
-
-      case let .message(messageContext):
-        switch self.responseContainer.responseHandler {
-        case let .unary(responsePromise):
-          responsePromise.succeed(messageContext.message)
-        case let .stream(handler):
-          handler(messageContext.message)
-        }
-
-      case let .trailingMetadata(metadata):
-        self.responseContainer.lazyTrailingMetadataPromise.succeed(metadata)
-
-      case let .status(status):
-        // We're closed now.
-        self.state = .closed
-        self.stopTimer(status: status)
-
-        // We're done; cancel the timeout.
-        self.scheduledTimeout?.cancel()
-        self.scheduledTimeout = nil
-
-        // We're not really failing the status here; in some cases the server may fast fail, in which
-        // case we'll only see trailing metadata and status: we should fail the initial metadata and
-        // response in that case.
-        self.responseContainer.fail(with: status, status: status)
-      }
-
-    case .closed:
-      self.logger.debug("dropping response part", metadata: [
-        "response_part": "\(part.name)",
-        "call_state": "\(self.describeCallState())",
-      ], source: "GRPC")
-    }
-  }
-
-  /// The underlying channel become active and can start accepting writes.
-  ///
-  /// Must be called on the event loop.
-  internal func activate(stream: Channel) {
-    self.eventLoop.preconditionInEventLoop()
-    self.logger.debug("activated stream channel", source: "GRPC")
-
-    // The channel has become active: what now?
-    switch self.state {
-    case .buffering:
-      while !self.requestBuffer.isEmpty {
-        // Are we marked?
-        let hadMark = self.requestBuffer.hasMark
-        let request = self.requestBuffer.removeFirst()
-        // We became unmarked: we need to flush.
-        let shouldFlush = hadMark && !self.requestBuffer.hasMark
-
-        self.logger.debug(
-          "unbuffering request part",
-          metadata: ["request_part": "\(request.message.name)"],
-          source: "GRPC"
-        )
-        stream.write(request.message, promise: request.promise)
-        if shouldFlush {
-          stream.flush()
-        }
-      }
-
-      self.logger.debug("request buffer drained", source: "GRPC")
-      self.state = .active(stream)
-
-    case .active:
-      preconditionFailure("Invalid state: stream is already active")
-
-    case .closed:
-      // The channel became active but we're already closed: we must've timed out waiting for the
-      // channel to activate so close the channel now.
-      stream.close(mode: .all, promise: nil)
-    }
-  }
-}
-
-// MARK: Private Helpers
-
-extension ChannelTransport {
-  private func describeCallState() -> String {
-    self.eventLoop.preconditionInEventLoop()
-
-    switch self.state {
-    case .buffering:
-      return "waiting for connection; \(self.requestBuffer.count) request part(s) buffered"
-    case .active:
-      return "active"
-    case .closed:
-      return "closed"
-    }
-  }
-
-  private func startTimer() {
-    assert(self.stopwatch == nil)
-    self.stopwatch = Stopwatch()
-    self.logger.debug("starting rpc", source: "GRPC")
-  }
-
-  private func stopTimer(status: GRPCStatus) {
-    self.eventLoop.preconditionInEventLoop()
-
-    if let stopwatch = self.stopwatch {
-      let millis = stopwatch.elapsedMillis()
-      self.logger.debug("rpc call finished", metadata: [
-        "duration_ms": "\(millis)",
-        "status_code": "\(status.code.rawValue)",
-        "status_message": "\(status.message ?? "nil")",
-      ], source: "GRPC")
-      self.stopwatch = nil
-    }
-  }
-
-  /// Sets a time limit for the RPC.
-  private func setUpTimeLimit(_ timeLimit: TimeLimit) {
-    let deadline = timeLimit.makeDeadline()
-
-    guard deadline != .distantFuture else {
-      // This is too distant to worry about.
-      return
-    }
-
-    let timedOutTask = {
-      self.timedOut(after: timeLimit)
-    }
-
-    // 'scheduledTimeout' must only be accessed from the event loop.
-    if self.eventLoop.inEventLoop {
-      self.scheduledTimeout = self.eventLoop.scheduleTask(deadline: deadline, timedOutTask)
-    } else {
-      self.eventLoop.execute {
-        self.scheduledTimeout = self.eventLoop.scheduleTask(deadline: deadline, timedOutTask)
-      }
-    }
-  }
-}
-
-extension _GRPCClientRequestPart {
-  fileprivate var name: String {
-    switch self {
-    case .head:
-      return "head"
-    case .message:
-      return "message"
-    case .end:
-      return "end"
-    }
-  }
-}
-
-extension _GRPCClientResponsePart {
-  fileprivate var name: String {
-    switch self {
-    case .initialMetadata:
-      return "initial metadata"
-    case .message:
-      return "message"
-    case .trailingMetadata:
-      return "trailing metadata"
-    case .status:
-      return "status"
-    }
-  }
-}
-
-// A wrapper for connection errors: we need to be able to preserve the underlying error as
-// well as extract a 'GRPCStatus' with code '.unavailable'.
-internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
-  /// The reason the connection failed.
-  var reason: Error
-
-  init(reason: Error) {
-    self.reason = reason
-  }
-
-  var description: String {
-    return String(describing: self.reason)
-  }
-
-  func makeGRPCStatus() -> GRPCStatus {
-    return GRPCStatus(code: .unavailable, message: String(describing: self.reason))
-  }
-}

+ 0 - 46
Sources/GRPC/ClientCalls/GRPCClientCallHandler.swift

@@ -1,46 +0,0 @@
-/*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import NIO
-
-/// An inbound channel handler which forwards events and messages to a client call.
-internal class GRPCClientCallHandler<Request, Response>: ChannelInboundHandler {
-  typealias InboundIn = _GRPCClientResponsePart<Response>
-  private var call: ChannelTransport<Request, Response>
-
-  init(call: ChannelTransport<Request, Response>) {
-    self.call = call
-  }
-
-  func errorCaught(context: ChannelHandlerContext, error: Error) {
-    self.call.receiveError(error)
-    context.fireErrorCaught(error)
-  }
-
-  func channelActive(context: ChannelHandlerContext) {
-    self.call.activate(stream: context.channel)
-    context.fireChannelActive()
-  }
-
-  func channelInactive(context: ChannelHandlerContext) {
-    self.errorCaught(context: context, error: GRPCStatus(code: .unavailable, message: nil))
-    context.fireChannelInactive()
-  }
-
-  func channelRead(context: ChannelHandlerContext, data: NIOAny) {
-    let part = self.unwrapInboundIn(data)
-    self.call.receiveResponse(part)
-  }
-}

+ 19 - 0
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -852,3 +852,22 @@ extension GRPCClientRequestPart {
     }
   }
 }
+
+// A wrapper for connection errors: we need to be able to preserve the underlying error as
+// well as extract a 'GRPCStatus' with code '.unavailable'.
+internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
+  /// The reason the connection failed.
+  var reason: Error
+
+  init(reason: Error) {
+    self.reason = reason
+  }
+
+  var description: String {
+    return String(describing: self.reason)
+  }
+
+  func makeGRPCStatus() -> GRPCStatus {
+    return GRPCStatus(code: .unavailable, message: String(describing: self.reason))
+  }
+}

+ 0 - 538
Tests/GRPCTests/ChannelTransportTests.swift

@@ -1,538 +0,0 @@
-/*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import EchoModel
-@testable import GRPC
-import NIO
-import NIOHTTP2
-import XCTest
-
-class ChannelTransportTests: GRPCTestCase {
-  typealias Request = Echo_EchoRequest
-  typealias RequestPart = _GRPCClientRequestPart<Request>
-
-  typealias Response = Echo_EchoResponse
-  typealias ResponsePart = _GRPCClientResponsePart<Response>
-
-  private func makeEmbeddedTransport(
-    channel: EmbeddedChannel,
-    container: ResponsePartContainer<Response>,
-    deadline: NIODeadline = .distantFuture
-  ) -> ChannelTransport<Request, Response> {
-    let transport = ChannelTransport<Request, Response>(
-      eventLoop: channel.eventLoop,
-      responseContainer: container,
-      timeLimit: .deadline(deadline),
-      errorDelegate: nil,
-      logger: self.logger
-    ) { call, promise in
-      channel.pipeline.addHandler(GRPCClientCallHandler(call: call)).whenComplete { result in
-        switch result {
-        case .success:
-          promise.succeed(channel)
-        case let .failure(error):
-          promise.fail(error)
-        }
-      }
-    }
-
-    return transport
-  }
-
-  private func makeTransport(
-    on eventLoop: EventLoop,
-    container: ResponsePartContainer<Response>,
-    deadline: NIODeadline = .distantFuture,
-    channelProvider: @escaping (ChannelTransport<Request, Response>, EventLoopPromise<Channel>)
-      -> Void
-  ) -> ChannelTransport<Request, Response> {
-    let transport = ChannelTransport<Request, Response>(
-      eventLoop: eventLoop,
-      responseContainer: container,
-      timeLimit: .deadline(deadline),
-      errorDelegate: nil,
-      logger: self.logger,
-      channelProvider: channelProvider
-    )
-    return transport
-  }
-
-  private func makeRequestHead() -> _GRPCRequestHead {
-    return _GRPCRequestHead(
-      method: "POST",
-      scheme: "http",
-      path: "/foo/bar",
-      host: "localhost",
-      deadline: .distantFuture,
-      customMetadata: [:],
-      encoding: .disabled
-    )
-  }
-
-  private func makeRequest(_ text: String) -> _MessageContext<Request> {
-    return _MessageContext(Request.with { $0.text = text }, compressed: false)
-  }
-
-  private func makeResponse(_ text: String) -> _MessageContext<Response> {
-    return _MessageContext(Response.with { $0.text = text }, compressed: false)
-  }
-
-  // MARK: - Happy path
-
-  func testUnaryHappyPath() throws {
-    let channel = EmbeddedChannel()
-    let responsePromise = channel.eventLoop.makePromise(of: Response.self)
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel.eventLoop,
-      unaryResponsePromise: responsePromise
-    )
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-
-    // Okay, let's send a unary request.
-    transport.sendUnary(
-      self.makeRequestHead(),
-      request: .with { $0.text = "hello" },
-      compressed: false
-    )
-
-    // We haven't activated yet so the transport should buffer the message.
-    XCTAssertNil(try channel.readOutbound(as: _GRPCClientRequestPart<Request>.self))
-
-    // Activate the channel.
-    channel.pipeline.fireChannelActive()
-
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
-    XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)
-
-    transport.receiveResponse(.initialMetadata([:]))
-    transport.receiveResponse(.message(.init(.with { $0.text = "Hello!" }, compressed: false)))
-    transport.receiveResponse(.trailingMetadata([:]))
-    transport.receiveResponse(.status(.ok))
-
-    XCTAssertNoThrow(
-      try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
-        .wait()
-    )
-    XCTAssertNoThrow(try responsePromise.futureResult.wait())
-    XCTAssertNoThrow(
-      try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
-        .wait()
-    )
-    XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
-  }
-
-  func testBidirectionalHappyPath() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-
-    // Okay, send the request. We'll do it before activating.
-    transport.sendRequests([
-      .head(self.makeRequestHead()),
-      .message(self.makeRequest("1")),
-      .message(self.makeRequest("2")),
-      .message(self.makeRequest("3")),
-      .end,
-    ], promise: nil)
-
-    // We haven't activated yet so the transport should buffer the messages.
-    XCTAssertNil(try channel.readOutbound(as: _GRPCClientRequestPart<Request>.self))
-
-    // Activate the channel.
-    channel.pipeline.fireChannelActive()
-
-    // Read the parts.
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
-    XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)
-
-    // Write some responses.
-    XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
-    XCTAssertNoThrow(try channel.writeInbound(ResponsePart.trailingMetadata([:])))
-    XCTAssertNoThrow(try channel.writeInbound(ResponsePart.status(.ok)))
-
-    // Check the responses.
-    XCTAssertNoThrow(
-      try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
-        .wait()
-    )
-    XCTAssertNoThrow(
-      try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
-        .wait()
-    )
-    XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
-  }
-
-  // MARK: - Timeout
-
-  func testTimeoutBeforeActivating() throws {
-    let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
-    let channel = EmbeddedChannel()
-    let responsePromise = channel.eventLoop.makePromise(of: Response.self)
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel.eventLoop,
-      unaryResponsePromise: responsePromise
-    )
-    let transport = self.makeEmbeddedTransport(
-      channel: channel,
-      container: container,
-      deadline: deadline
-    )
-
-    // Advance time beyond the timeout.
-    channel.embeddedEventLoop.advanceTime(by: .minutes(42))
-
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyInitialMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertThrowsError(try responsePromise.futureResult.wait())
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyTrailingMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertEqual(
-      try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
-      .deadlineExceeded
-    )
-
-    // Writing should fail.
-    let sendPromise = channel.eventLoop.makePromise(of: Void.self)
-
-    transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
-    XCTAssertThrowsError(try sendPromise.futureResult.wait())
-  }
-
-  func testTimeoutAfterActivating() throws {
-    let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
-    let channel = EmbeddedChannel()
-
-    let responsePromise = channel.eventLoop.makePromise(of: Response.self)
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel.eventLoop,
-      unaryResponsePromise: responsePromise
-    )
-    let transport = self.makeEmbeddedTransport(
-      channel: channel,
-      container: container,
-      deadline: deadline
-    )
-
-    // Activate the channel.
-    channel.pipeline.fireChannelActive()
-
-    // Advance time beyond the timeout.
-    channel.embeddedEventLoop.advanceTime(by: .minutes(42))
-
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyInitialMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertThrowsError(try responsePromise.futureResult.wait())
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyTrailingMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertEqual(
-      try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
-      .deadlineExceeded
-    )
-
-    // Writing should fail.
-    let sendPromise = channel.eventLoop.makePromise(of: Void.self)
-    transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
-    XCTAssertThrowsError(try sendPromise.futureResult.wait())
-  }
-
-  func testTimeoutMidRPC() throws {
-    let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(
-      channel: channel,
-      container: container,
-      deadline: deadline
-    )
-
-    // Activate the channel.
-    channel.pipeline.fireChannelActive()
-
-    // Okay, send some requests.
-    transport.sendRequests([
-      .head(self.makeRequestHead()),
-      .message(self.makeRequest("1")),
-    ], promise: nil)
-
-    // Read the parts.
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
-    XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
-
-    // We'll send back the initial metadata.
-    XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
-    XCTAssertNoThrow(
-      try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
-        .wait()
-    )
-
-    // Advance time beyond the timeout.
-    channel.embeddedEventLoop.advanceTime(by: .minutes(42))
-
-    // Check the remaining response parts.
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyTrailingMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertEqual(
-      try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
-      .deadlineExceeded
-    )
-  }
-
-  // MARK: - Channel errors
-
-  func testChannelBecomesInactive() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-
-    // Activate and deactivate the channel.
-    channel.pipeline.fireChannelActive()
-    channel.pipeline.fireChannelInactive()
-
-    // Everything should fail.
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyInitialMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyTrailingMetadataPromise
-        .getFutureResult().wait()
-    )
-    // Except the status, that will never fail.
-    XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
-  }
-
-  func testChannelError() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-
-    // Activate the channel.
-    channel.pipeline.fireChannelActive()
-
-    // Fire an error.
-    channel.pipeline.fireErrorCaught(GRPCStatus.processingError)
-
-    // Everything should fail.
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyInitialMetadataPromise
-        .getFutureResult().wait()
-    )
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyTrailingMetadataPromise
-        .getFutureResult().wait()
-    )
-    // Except the status, that will never fail.
-    XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
-  }
-
-  func testChannelFutureError() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) {
-      XCTFail("No response expected but got: \($0)")
-    }
-
-    struct DoomedChannelError: Error {}
-    let transport = self.makeTransport(on: channel.eventLoop, container: container) { _, promise in
-      promise.fail(GRPCStatus(code: .unavailable, message: "\(DoomedChannelError())"))
-    }
-
-    let status = try transport.responseContainer.lazyStatusPromise.getFutureResult().wait()
-    XCTAssertEqual(status, GRPCStatus(code: .unavailable, message: "\(DoomedChannelError())"))
-  }
-
-  // MARK: - Test Transport after Shutdown
-
-  func testOutboundMethodsAfterShutdown() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-    // Close the channel.
-    XCTAssertNoThrow(try channel.close().wait())
-
-    // Sending should fail.
-    let sendRequestPromise = channel.eventLoop.makePromise(of: Void.self)
-    transport.sendRequest(.head(self.makeRequestHead()), promise: sendRequestPromise)
-    XCTAssertThrowsError(try sendRequestPromise.futureResult.wait()) { error in
-      XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
-    }
-
-    // Sending many should fail.
-    let sendRequestsPromise = channel.eventLoop.makePromise(of: Void.self)
-    transport.sendRequests([.end], promise: sendRequestsPromise)
-    XCTAssertThrowsError(try sendRequestsPromise.futureResult.wait()) { error in
-      XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
-    }
-
-    // Cancelling should fail.
-    let cancelPromise = channel.eventLoop.makePromise(of: Void.self)
-    transport.cancel(promise: cancelPromise)
-    XCTAssertThrowsError(try cancelPromise.futureResult.wait()) { error in
-      XCTAssertEqual(error as? ChannelError, ChannelError.alreadyClosed)
-    }
-
-    let channelFuture = transport.streamChannel()
-    XCTAssertThrowsError(try channelFuture.wait()) { error in
-      XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
-    }
-  }
-
-  func testInboundMethodsAfterShutdown() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-    // Close the channel.
-    XCTAssertNoThrow(try channel.close().wait())
-
-    // We'll fail the handler in the container if this one is received.
-    transport.receiveResponse(.message(self.makeResponse("ignored!")))
-    transport.receiveError(GRPCStatus.processingError)
-  }
-
-  func testBufferedWritesAreFailedOnClose() throws {
-    let channel = EmbeddedChannel()
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel
-        .eventLoop
-    ) { (response: Response) in
-      XCTFail("No response expected but got: \(response)")
-    }
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-
-    let requestHeadPromise = channel.eventLoop.makePromise(of: Void.self)
-    transport.sendRequest(.head(self.makeRequestHead()), promise: requestHeadPromise)
-
-    // Close the channel.
-    XCTAssertNoThrow(try channel.close().wait())
-
-    // Promise should fail.
-    XCTAssertThrowsError(try requestHeadPromise.futureResult.wait())
-  }
-
-  func testErrorsAreNotAlwaysStatus() throws {
-    let channel = EmbeddedChannel()
-    let responsePromise = channel.eventLoop.makePromise(of: Response.self)
-    let container = ResponsePartContainer<Response>(
-      eventLoop: channel.eventLoop,
-      unaryResponsePromise: responsePromise
-    )
-
-    let transport = self.makeEmbeddedTransport(channel: channel, container: container)
-    transport.activate(stream: channel)
-
-    // Send an error
-    transport.receiveError(GRPCError.RPCCancelledByClient())
-
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyInitialMetadataPromise
-        .getFutureResult().wait()
-    ) { error in
-      XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
-    }
-
-    XCTAssertThrowsError(
-      try transport.responseContainer.lazyTrailingMetadataPromise
-        .getFutureResult().wait()
-    ) { error in
-      XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
-    }
-
-    XCTAssertThrowsError(try responsePromise.futureResult.wait()) { error in
-      XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
-    }
-
-    // Status never fails.
-    XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
-  }
-}
-
-extension _GRPCClientRequestPart {
-  var requestHead: _GRPCRequestHead? {
-    switch self {
-    case let .head(head):
-      return head
-    case .message, .end:
-      return nil
-    }
-  }
-
-  var message: Request? {
-    switch self {
-    case let .message(message):
-      return message.message
-    case .head, .end:
-      return nil
-    }
-  }
-
-  var isEnd: Bool {
-    switch self {
-    case .end:
-      return true
-    case .head, .message:
-      return false
-    }
-  }
-}