| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- /*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Foundation
- import SwiftProtobuf
- import NIO
- import NIOHPACK
- import Logging
- /// A base channel handler for receiving responses.
- ///
- /// This includes holding promises for the initial metadata and status of the gRPC call. This handler
- /// is also responsible for error handling, via an error delegate and by appropriately failing the
- /// aforementioned promises.
- internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
- public typealias InboundIn = _GRPCClientResponsePart<ResponseMessage>
- internal let logger: Logger
- internal var stopwatch: Stopwatch?
- internal let initialMetadataPromise: EventLoopPromise<HPACKHeaders>
- internal let trailingMetadataPromise: EventLoopPromise<HPACKHeaders>
- internal let statusPromise: EventLoopPromise<GRPCStatus>
- internal let timeout: GRPCTimeout
- internal var timeoutTask: Scheduled<Void>?
- internal let errorDelegate: ClientErrorDelegate?
- internal var context: ChannelHandlerContext?
- /// Creates a new `ClientResponseChannelHandler`.
- ///
- /// - Parameters:
- /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
- /// - trailingMetadataPromise: A promise to succeed on receiving the trailing metadata from the service.
- /// - statusPromise: A promise to succeed with the outcome of the call.
- /// - errorDelegate: An error delegate to call when errors are observed.
- /// - timeout: The call timeout specified by the user.
- public init(
- initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
- trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
- statusPromise: EventLoopPromise<GRPCStatus>,
- errorDelegate: ClientErrorDelegate?,
- timeout: GRPCTimeout,
- logger: Logger
- ) {
- self.initialMetadataPromise = initialMetadataPromise
- self.trailingMetadataPromise = trailingMetadataPromise
- self.statusPromise = statusPromise
- self.errorDelegate = errorDelegate
- self.timeout = timeout
- self.logger = logger
- }
- /// Observe the given status.
- ///
- /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
- /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
- /// are failed with the given status.
- ///
- /// - Parameter status: the status to observe.
- internal func onStatus(_ status: GRPCStatus) {
- if status.code != .ok {
- self.initialMetadataPromise.fail(status)
- }
- self.trailingMetadataPromise.fail(status)
- self.statusPromise.succeed(status)
- self.timeoutTask?.cancel()
- self.context = nil
- if let stopwatch = self.stopwatch {
- let millis = stopwatch.elapsedMillis()
- self.logger.debug("rpc call finished", metadata: [
- "duration_ms": "\(millis)",
- "status_code": "\(status.code.rawValue)"
- ])
- self.stopwatch = nil
- }
- }
- /// Observe the given error.
- ///
- /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:logger:file:line:)` method is
- /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
- /// the given error (see `observeStatus(_:)`).
- ///
- /// - Parameter error: the error to observe.
- internal func onError(_ error: Error) {
- if let errorWithContext = error as? GRPCError.WithContext {
- self.errorDelegate?.didCatchError(
- errorWithContext.error,
- logger: self.logger,
- file: errorWithContext.file,
- line: errorWithContext.line
- )
- self.onStatus(errorWithContext.error.asGRPCStatus())
- } else {
- self.errorDelegate?.didCatchErrorWithoutContext(error, logger: self.logger)
- self.onStatus((error as? GRPCStatusTransformable)?.asGRPCStatus() ?? .processingError)
- }
- }
- /// Called when a response is received. Subclasses should override this method.
- ///
- /// - Parameter response: The received response.
- internal func onResponse(_ response: _Box<ResponseMessage>) {
- // no-op
- }
- public func handlerAdded(context: ChannelHandlerContext) {
- // We need to hold the context in case we timeout and need to close the pipeline.
- self.context = context
- self.stopwatch = .start()
- }
- /// Reads inbound data.
- ///
- /// On receipt of:
- /// - headers: the initial metadata promise is succeeded.
- /// - message: `onResponse(_:)` is called with the received message.
- /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
- /// and response promise (if available) are failed with the status.
- public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
- switch self.unwrapInboundIn(data) {
- case .initialMetadata(let headers):
- self.initialMetadataPromise.succeed(headers)
- case .message(let message):
- self.onResponse(message)
- case .trailingMetadata(let trailers):
- self.trailingMetadataPromise.succeed(trailers)
- case let .status(status):
- self.onStatus(status)
- // We don't expect any more requests/responses beyond this point and we don't need to close
- // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
- }
- }
- public func channelInactive(context: ChannelHandlerContext) {
- self.onStatus(.init(code: .unavailable, message: nil))
- context.fireChannelInactive()
- }
- /// Observe an error from the pipeline and close the channel.
- public func errorCaught(context: ChannelHandlerContext, error: Error) {
- self.onError(error)
- context.close(mode: .all, promise: nil)
- }
- /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
- /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
- internal func scheduleTimeout(eventLoop: EventLoop) {
- guard self.timeout != .infinite else {
- return
- }
- let timeout = self.timeout
- self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
- self?.performTimeout(error: GRPCError.RPCTimedOut(timeout).captureContext())
- }
- }
- /// Called when this call times out. Any promises which have not been fulfilled will be timed out
- /// with status `.deadlineExceeded`. If this handler has a context associated with it then the
- /// its channel is closed.
- ///
- /// - Parameter error: The error to fail any promises with.
- internal func performTimeout(error: GRPCError.WithContext) {
- self.onError(error)
- self.context?.close(mode: .all, promise: nil)
- self.context = nil
- }
- }
- /// A channel handler for client calls which receive a single response.
- final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
- let responsePromise: EventLoopPromise<ResponseMessage>
- internal init(
- initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
- trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
- responsePromise: EventLoopPromise<ResponseMessage>,
- statusPromise: EventLoopPromise<GRPCStatus>,
- errorDelegate: ClientErrorDelegate?,
- timeout: GRPCTimeout,
- logger: Logger
- ) {
- self.responsePromise = responsePromise
- super.init(
- initialMetadataPromise: initialMetadataPromise,
- trailingMetadataPromise: trailingMetadataPromise,
- statusPromise: statusPromise,
- errorDelegate: errorDelegate,
- timeout: timeout,
- logger: logger
- )
- }
- /// Succeeds the response promise with the given response.
- ///
- /// - Parameter response: The response received from the service.
- override func onResponse(_ response: _Box<ResponseMessage>) {
- self.responsePromise.succeed(response.value)
- }
- /// Fails the response promise if the given status is not `.ok`.
- override func onStatus(_ status: GRPCStatus) {
- super.onStatus(status)
- if status.code != .ok {
- self.responsePromise.fail(status)
- }
- }
- // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
- // See: https://bugs.swift.org/browse/SR-11564
- //
- // TODO: Remove this once SR-11564 is resolved.
- override internal func scheduleTimeout(eventLoop: EventLoop) {
- super.scheduleTimeout(eventLoop: eventLoop)
- }
- }
- /// A channel handler for client calls which receive a stream of responses.
- final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
- typealias ResponseHandler = (ResponseMessage) -> Void
- let responseHandler: ResponseHandler
- internal init(
- initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
- trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
- statusPromise: EventLoopPromise<GRPCStatus>,
- errorDelegate: ClientErrorDelegate?,
- timeout: GRPCTimeout,
- logger: Logger,
- responseHandler: @escaping ResponseHandler
- ) {
- self.responseHandler = responseHandler
- super.init(
- initialMetadataPromise: initialMetadataPromise,
- trailingMetadataPromise: trailingMetadataPromise,
- statusPromise: statusPromise,
- errorDelegate: errorDelegate,
- timeout: timeout,
- logger: logger
- )
- }
- /// Calls a user-provided handler with the given response.
- ///
- /// - Parameter response: The response received from the service.
- override func onResponse(_ response: _Box<ResponseMessage>) {
- self.responseHandler(response.value)
- }
- // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
- // See: https://bugs.swift.org/browse/SR-11564
- //
- // TODO: Remove this once SR-11564 is resolved.
- override internal func scheduleTimeout(eventLoop: EventLoop) {
- super.scheduleTimeout(eventLoop: eventLoop)
- }
- }
- /// Client user events.
- internal enum GRPCClientUserEvent {
- /// The call has been cancelled.
- case cancelled
- }
|