| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Logging
- import NIOCore
- import NIOHPACK
- import NIOHTTP2
- /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
- /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
- /// little API to use on this class: they may configure the transport by adding it to a
- /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
- /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
- /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
- ///
- /// In most instances the glue code is simple: transformations are applied to the request and
- /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
- /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
- /// when these change. This includes buffering request parts from the interceptor pipeline until
- /// the `NIO.Channel` becomes active.
- ///
- /// ### Thread Safety
- ///
- /// This class is not thread safe. All methods **must** be executed on the transport's `callEventLoop`.
- @usableFromInline
- internal final class ClientTransport<Request, Response> {
- /// The `EventLoop` the call is running on. State must be accessed from this event loop.
- @usableFromInline
- internal let callEventLoop: EventLoop
- /// The current state of the transport.
- private var state: ClientTransportState = .idle
- /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
- /// and fail it when we transition to `closed`.
- private var channelPromise: EventLoopPromise<Channel>?
- // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
- // have 3 parts.
- /// A buffer to store request parts and promises in before the channel has become active.
- private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
- /// The request serializer.
- private let serializer: AnySerializer<Request>
- /// The response deserializer.
- private let deserializer: AnyDeserializer<Response>
- /// A request part and a promise.
- private struct RequestAndPromise {
- var request: GRPCClientRequestPart<Request>
- var promise: EventLoopPromise<Void>?
- }
- /// Details about the call.
- internal let callDetails: CallDetails
- /// A logger.
- internal var logger: Logger
- /// Is the call streaming requests?
- private var isStreamingRequests: Bool {
- switch self.callDetails.type {
- case .unary, .serverStreaming:
- return false
- case .clientStreaming, .bidirectionalStreaming:
- return true
- }
- }
- // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
- // convenient to have both at the same time when intercepting response parts. We'll hold on to the
- // trailers here and only forward them when we receive the status.
- private var trailers: HPACKHeaders?
- /// The interceptor pipeline connected to this transport. The pipeline also holds references
- /// to `self` which are dropped when the interceptor pipeline is closed.
- @usableFromInline
- internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
- /// The `NIO.Channel` used by the transport, if it is available.
- private var channel: Channel?
- /// A callback which is invoked once when the stream channel becomes active.
- private var onStart: (() -> Void)?
- /// Our current state as logging metadata.
- private var stateForLogging: Logger.MetadataValue {
- if self.state.mayBuffer {
- return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
- } else {
- return "\(self.state)"
- }
- }
- internal init(
- details: CallDetails,
- eventLoop: EventLoop,
- interceptors: [ClientInterceptor<Request, Response>],
- serializer: AnySerializer<Request>,
- deserializer: AnyDeserializer<Response>,
- errorDelegate: ClientErrorDelegate?,
- onStart: @escaping () -> Void,
- onError: @escaping (Error) -> Void,
- onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
- ) {
- self.callEventLoop = eventLoop
- self.callDetails = details
- self.onStart = onStart
- self.logger = details.options.logger
- self.serializer = serializer
- self.deserializer = deserializer
- // The references to self held by the pipeline are dropped when it is closed.
- self._pipeline = ClientInterceptorPipeline(
- eventLoop: eventLoop,
- details: details,
- logger: details.options.logger,
- interceptors: interceptors,
- errorDelegate: errorDelegate,
- onError: onError,
- onCancel: self.cancelFromPipeline(promise:),
- onRequestPart: self.sendFromPipeline(_:promise:),
- onResponsePart: onResponsePart
- )
- }
- // MARK: - Call Object API
- /// Configure the transport to communicate with the server.
- /// - Parameter configurator: A callback to invoke in order to configure this transport.
- /// - Important: This *must* to be called from the `callEventLoop`.
- internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
- self.callEventLoop.assertInEventLoop()
- if self.state.configureTransport() {
- self.configure(using: configurator)
- }
- }
- /// Send a request part – via the interceptor pipeline – to the server.
- /// - Parameters:
- /// - part: The part to send.
- /// - promise: A promise which will be completed when the request part has been handled.
- /// - Important: This *must* to be called from the `callEventLoop`.
- @inlinable
- internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
- self.callEventLoop.assertInEventLoop()
- if let pipeline = self._pipeline {
- pipeline.send(part, promise: promise)
- } else {
- promise?.fail(GRPCError.AlreadyComplete())
- }
- }
- /// Attempt to cancel the RPC notifying any interceptors.
- /// - Parameter promise: A promise which will be completed when the cancellation attempt has
- /// been handled.
- internal func cancel(promise: EventLoopPromise<Void>?) {
- self.callEventLoop.assertInEventLoop()
- if let pipeline = self._pipeline {
- pipeline.cancel(promise: promise)
- } else {
- promise?.fail(GRPCError.AlreadyComplete())
- }
- }
- /// A request for the underlying `Channel`.
- internal func getChannel() -> EventLoopFuture<Channel> {
- self.callEventLoop.assertInEventLoop()
- // Do we already have a promise?
- if let promise = self.channelPromise {
- return promise.futureResult
- } else {
- // Make and store the promise.
- let promise = self.callEventLoop.makePromise(of: Channel.self)
- self.channelPromise = promise
- // Ask the state machine if we can have it.
- switch self.state.getChannel() {
- case .succeed:
- if let channel = self.channel {
- promise.succeed(channel)
- }
- case .fail:
- promise.fail(GRPCError.AlreadyComplete())
- case .doNothing:
- ()
- }
- return promise.futureResult
- }
- }
- }
- // MARK: - Pipeline API
- extension ClientTransport {
- /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
- /// - Parameters:
- /// - part: The request part to send.
- /// - promise: A promise which will be completed when the part has been handled.
- /// - Important: This *must* to be called from the `callEventLoop`.
- private func sendFromPipeline(
- _ part: GRPCClientRequestPart<Request>,
- promise: EventLoopPromise<Void>?
- ) {
- self.callEventLoop.assertInEventLoop()
- switch self.state.send() {
- case .writeToBuffer:
- self.buffer(part, promise: promise)
- case .writeToChannel:
- // Banging the channel is okay here: we'll only be told to 'writeToChannel' if we're in the
- // correct state, the requirements of that state are having an active `Channel`.
- self.writeToChannel(
- self.channel!,
- part: part,
- promise: promise,
- flush: self.shouldFlush(after: part)
- )
- case .alreadyComplete:
- promise?.fail(GRPCError.AlreadyComplete())
- }
- }
- /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
- /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
- /// - Important: This *must* to be called from the `callEventLoop`.
- private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
- self.callEventLoop.assertInEventLoop()
- if self.state.cancel() {
- let error = GRPCError.RPCCancelledByClient()
- let status = error.makeGRPCStatus()
- self.forwardToInterceptors(.end(status, [:]))
- self.failBufferedWrites(with: error)
- self.channel?.close(mode: .all, promise: nil)
- self.channelPromise?.fail(error)
- promise?.succeed(())
- } else {
- promise?.succeed(())
- }
- }
- }
- // MARK: - ChannelHandler API
- extension ClientTransport: ChannelInboundHandler {
- @usableFromInline
- typealias InboundIn = _RawGRPCClientResponsePart
- @usableFromInline
- typealias OutboundOut = _RawGRPCClientRequestPart
- @usableFromInline
- internal func handlerRemoved(context: ChannelHandlerContext) {
- self.dropReferences()
- }
- @usableFromInline
- internal func handlerAdded(context: ChannelHandlerContext) {
- if context.channel.isActive {
- self.transportActivated(channel: context.channel)
- }
- }
- @usableFromInline
- internal func errorCaught(context: ChannelHandlerContext, error: Error) {
- self.handleError(error)
- }
- @usableFromInline
- internal func channelActive(context: ChannelHandlerContext) {
- self.transportActivated(channel: context.channel)
- }
- @usableFromInline
- internal func channelInactive(context: ChannelHandlerContext) {
- self.transportDeactivated()
- }
- @usableFromInline
- internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
- switch self.unwrapInboundIn(data) {
- case let .initialMetadata(headers):
- self.receiveFromChannel(initialMetadata: headers)
- case let .message(box):
- self.receiveFromChannel(message: box.message)
- case let .trailingMetadata(trailers):
- self.receiveFromChannel(trailingMetadata: trailers)
- case let .status(status):
- self.receiveFromChannel(status: status)
- }
- // (We're the end of the channel. No need to forward anything.)
- }
- }
- extension ClientTransport {
- /// The `Channel` became active. Send out any buffered requests.
- private func transportActivated(channel: Channel) {
- if self.callEventLoop.inEventLoop {
- self._transportActivated(channel: channel)
- } else {
- self.callEventLoop.execute {
- self._transportActivated(channel: channel)
- }
- }
- }
- /// On-loop implementation of `transportActivated(channel:)`.
- private func _transportActivated(channel: Channel) {
- self.callEventLoop.assertInEventLoop()
- switch self.state.activate() {
- case .unbuffer:
- self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
- self._pipeline?.logger = self.logger
- self.logger.debug("activated stream channel")
- self.channel = channel
- self.unbuffer()
- case .close:
- channel.close(mode: .all, promise: nil)
- case .doNothing:
- ()
- }
- }
- /// The `Channel` became inactive. Fail any buffered writes and forward an error to the
- /// interceptor pipeline if necessary.
- private func transportDeactivated() {
- if self.callEventLoop.inEventLoop {
- self._transportDeactivated()
- } else {
- self.callEventLoop.execute {
- self._transportDeactivated()
- }
- }
- }
- /// On-loop implementation of `transportDeactivated()`.
- private func _transportDeactivated() {
- self.callEventLoop.assertInEventLoop()
- switch self.state.deactivate() {
- case .doNothing:
- ()
- case .tearDown:
- let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
- self.forwardErrorToInterceptors(status)
- self.failBufferedWrites(with: status)
- self.channelPromise?.fail(status)
- case .failChannelPromise:
- self.channelPromise?.fail(GRPCError.AlreadyComplete())
- }
- }
- /// Drops any references to the `Channel` and interceptor pipeline.
- private func dropReferences() {
- if self.callEventLoop.inEventLoop {
- self.channel = nil
- } else {
- self.callEventLoop.execute {
- self.channel = nil
- }
- }
- }
- /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the
- /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` will
- /// also be closed.
- internal func handleError(_ error: Error) {
- if self.callEventLoop.inEventLoop {
- self._handleError(error)
- } else {
- self.callEventLoop.execute {
- self._handleError(error)
- }
- }
- }
- /// On-loop implementation of `handleError(_:)`.
- private func _handleError(_ error: Error) {
- self.callEventLoop.assertInEventLoop()
- switch self.state.handleError() {
- case .doNothing:
- ()
- case .propagateError:
- self.forwardErrorToInterceptors(error)
- self.failBufferedWrites(with: error)
- case .propagateErrorAndClose:
- self.forwardErrorToInterceptors(error)
- self.failBufferedWrites(with: error)
- self.channel?.close(mode: .all, promise: nil)
- }
- }
- /// Receive initial metadata from the `Channel`.
- private func receiveFromChannel(initialMetadata headers: HPACKHeaders) {
- if self.callEventLoop.inEventLoop {
- self._receiveFromChannel(initialMetadata: headers)
- } else {
- self.callEventLoop.execute {
- self._receiveFromChannel(initialMetadata: headers)
- }
- }
- }
- /// On-loop implementation of `receiveFromChannel(initialMetadata:)`.
- private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) {
- self.callEventLoop.assertInEventLoop()
- if self.state.channelRead(isEnd: false) {
- self.forwardToInterceptors(.metadata(headers))
- }
- }
- /// Receive response message bytes from the `Channel`.
- private func receiveFromChannel(message buffer: ByteBuffer) {
- if self.callEventLoop.inEventLoop {
- self._receiveFromChannel(message: buffer)
- } else {
- self.callEventLoop.execute {
- self._receiveFromChannel(message: buffer)
- }
- }
- }
- /// On-loop implementation of `receiveFromChannel(message:)`.
- private func _receiveFromChannel(message buffer: ByteBuffer) {
- self.callEventLoop.assertInEventLoop()
- do {
- let message = try self.deserializer.deserialize(byteBuffer: buffer)
- if self.state.channelRead(isEnd: false) {
- self.forwardToInterceptors(.message(message))
- }
- } catch {
- self.handleError(error)
- }
- }
- /// Receive trailing metadata from the `Channel`.
- private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) {
- // The `Channel` delivers trailers and `GRPCStatus` separately, we want to emit them together
- // in the interceptor pipeline.
- if self.callEventLoop.inEventLoop {
- self.trailers = trailers
- } else {
- self.callEventLoop.execute {
- self.trailers = trailers
- }
- }
- }
- /// Receive the final status from the `Channel`.
- private func receiveFromChannel(status: GRPCStatus) {
- if self.callEventLoop.inEventLoop {
- self._receiveFromChannel(status: status)
- } else {
- self.callEventLoop.execute {
- self._receiveFromChannel(status: status)
- }
- }
- }
- /// On-loop implementation of `receiveFromChannel(status:)`.
- private func _receiveFromChannel(status: GRPCStatus) {
- self.callEventLoop.assertInEventLoop()
- if self.state.channelRead(isEnd: true) {
- self.forwardToInterceptors(.end(status, self.trailers ?? [:]))
- self.trailers = nil
- }
- }
- }
- // MARK: - State Handling
- private enum ClientTransportState {
- /// Idle. We're waiting for the RPC to be configured.
- ///
- /// Valid transitions:
- /// - `awaitingTransport` (the transport is being configured)
- /// - `closed` (the RPC cancels)
- case idle
- /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
- /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
- /// transport in this state is an error.
- ///
- /// Valid transitions:
- /// - `activatingTransport` (the channel becomes active)
- /// - `closing` (the RPC cancels)
- /// - `closed` (the channel fails to become active)
- case awaitingTransport
- /// The transport is active but we're unbuffering any requests to write on that transport.
- /// We'll continue buffering in this state. Receiving messages from the transport in this state
- /// is okay.
- ///
- /// Valid transitions:
- /// - `active` (we finish unbuffering)
- /// - `closing` (the RPC cancels, the channel encounters an error)
- /// - `closed` (the channel becomes inactive)
- case activatingTransport
- /// Fully active. An RPC is in progress and is communicating over an active transport.
- ///
- /// Valid transitions:
- /// - `closing` (the RPC cancels, the channel encounters an error)
- /// - `closed` (the channel becomes inactive)
- case active
- /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
- /// become inactive yet.
- ///
- /// Valid transitions:
- /// - `closed` (the channel becomes inactive)
- case closing
- /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
- /// be ignored.
- ///
- /// Valid transitions:
- /// - none: this state is terminal.
- case closed
- /// Whether writes may be unbuffered in this state.
- internal var isUnbuffering: Bool {
- switch self {
- case .activatingTransport:
- return true
- case .idle, .awaitingTransport, .active, .closing, .closed:
- return false
- }
- }
- /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
- internal var mayBuffer: Bool {
- switch self {
- case .idle, .activatingTransport, .awaitingTransport:
- return true
- case .active, .closing, .closed:
- return false
- }
- }
- }
- extension ClientTransportState {
- /// The caller would like to configure the transport. Returns a boolean indicating whether we
- /// should configure it or not.
- mutating func configureTransport() -> Bool {
- switch self {
- // We're idle until we configure. Anything else is just a repeat request to configure.
- case .idle:
- self = .awaitingTransport
- return true
- case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
- return false
- }
- }
- enum SendAction {
- /// Write the request into the buffer.
- case writeToBuffer
- /// Write the request into the channel.
- case writeToChannel
- /// The RPC has already completed, fail any promise associated with the write.
- case alreadyComplete
- }
- /// The pipeline would like to send a request part to the transport.
- mutating func send() -> SendAction {
- switch self {
- // We don't have any transport yet, just buffer the part.
- case .idle, .awaitingTransport, .activatingTransport:
- return .writeToBuffer
- // We have a `Channel`, we can pipe the write straight through.
- case .active:
- return .writeToChannel
- // The transport is going or has gone away. Fail the promise.
- case .closing, .closed:
- return .alreadyComplete
- }
- }
- enum UnbufferedAction {
- /// Nothing needs to be done.
- case doNothing
- /// Succeed the channel promise associated with the transport.
- case succeedChannelPromise
- }
- /// We finished dealing with the buffered writes.
- mutating func unbuffered() -> UnbufferedAction {
- switch self {
- // These can't happen since we only begin unbuffering when we transition to
- // '.activatingTransport', which must come after these two states..
- case .idle, .awaitingTransport:
- preconditionFailure("Requests can't be unbuffered before the transport is activated")
- // We dealt with any buffered writes. We can become active now. This is the only way to become
- // active.
- case .activatingTransport:
- self = .active
- return .succeedChannelPromise
- case .active:
- preconditionFailure("Unbuffering completed but the transport is already active")
- // Something caused us to close while unbuffering, that's okay, we won't take any further
- // action.
- case .closing, .closed:
- return .doNothing
- }
- }
- /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether
- /// cancellation can go ahead (and also whether the channel should be torn down).
- mutating func cancel() -> Bool {
- switch self {
- case .idle:
- // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
- // we're done, fail any writes, and then deal with the cancellation promise.
- self = .closed
- return true
- case .awaitingTransport:
- // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
- // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
- // the `Channel` becoming active (see `channelActive(context:)`).
- self = .closing
- return true
- case .activatingTransport:
- // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
- // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
- // any buffered writes and then complete the cancellation promise.
- self = .closing
- return true
- case .active:
- // The RPC and channel are up and running. We'll fail the RPC and close the channel.
- self = .closing
- return true
- case .closing, .closed:
- // We're already closing or closing. The cancellation is too late.
- return false
- }
- }
- enum ActivateAction {
- case unbuffer
- case close
- case doNothing
- }
- /// `channelActive` was invoked on the transport by the `Channel`.
- mutating func activate() -> ActivateAction {
- // The channel has become active: what now?
- switch self {
- case .idle:
- preconditionFailure("Can't activate an idle transport")
- case .awaitingTransport:
- self = .activatingTransport
- return .unbuffer
- case .activatingTransport, .active:
- // Already activated.
- return .doNothing
- case .closing:
- // We remain in closing: we only transition to closed on 'channelInactive'.
- return .close
- case .closed:
- preconditionFailure("Invalid state: stream is already inactive")
- }
- }
- enum ChannelInactiveAction {
- /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
- case tearDown
- /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
- case failChannelPromise
- /// Do nothing.
- case doNothing
- }
- /// `channelInactive` was invoked on the transport by the `Channel`.
- mutating func deactivate() -> ChannelInactiveAction {
- switch self {
- case .idle:
- // We can't become inactive before we've requested a `Channel`.
- preconditionFailure("Can't deactivate an idle transport")
- case .awaitingTransport, .activatingTransport, .active:
- // We're activating the transport - i.e. offloading any buffered requests - and the channel
- // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
- // synthesize an error status to fail the RPC with.
- self = .closed
- return .tearDown
- case .closing:
- // We were already closing, now we're fully closed.
- self = .closed
- return .failChannelPromise
- case .closed:
- // We're already closed.
- return .doNothing
- }
- }
- /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value
- /// indicating whether the part that was read should be forwarded to the interceptor pipeline.
- mutating func channelRead(isEnd: Bool) -> Bool {
- switch self {
- case .idle, .awaitingTransport:
- // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
- preconditionFailure("Can't receive response part on idle transport")
- case .activatingTransport, .active:
- // We have an active `Channel`, we can forward the request part but we may need to start
- // closing if we see the status, since it indicates the call is terminating.
- if isEnd {
- self = .closing
- }
- return true
- case .closing, .closed:
- // We closed early, ignore any reads.
- return false
- }
- }
- enum HandleErrorAction {
- /// Propagate the error to the interceptor pipeline and fail any buffered writes.
- case propagateError
- /// As above, but close the 'Channel' as well.
- case propagateErrorAndClose
- /// No action is required.
- case doNothing
- }
- /// An error was caught.
- mutating func handleError() -> HandleErrorAction {
- switch self {
- case .idle:
- // The `Channel` can't error if it doesn't exist.
- preconditionFailure("Can't catch error on idle transport")
- case .awaitingTransport:
- // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
- // buffered writes along the way.
- self = .closing
- return .propagateError
- case .activatingTransport,
- .active:
- // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
- self = .closing
- return .propagateErrorAndClose
- case .closing, .closed:
- // We're already closing/closed, we can ignore this.
- return .doNothing
- }
- }
- enum GetChannelAction {
- /// No action is required.
- case doNothing
- /// Succeed the Channel promise.
- case succeed
- /// Fail the 'Channel' promise, the RPC is already complete.
- case fail
- }
- /// The caller has asked for the underlying `Channel`.
- mutating func getChannel() -> GetChannelAction {
- switch self {
- case .idle, .awaitingTransport, .activatingTransport:
- // Do nothing, we'll complete the promise when we become active or closed.
- return .doNothing
- case .active:
- // We're already active, so there was no promise to succeed when we made this transition. We
- // can complete it now.
- return .succeed
- case .closing:
- // We'll complete the promise when we transition to closed.
- return .doNothing
- case .closed:
- // We're already closed; there was no promise to fail when we made this transition. We can go
- // ahead and fail it now though.
- return .fail
- }
- }
- }
- // MARK: - State Actions
- extension ClientTransport {
- /// Configures this transport with the `configurator`.
- private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
- configurator(self).whenFailure { error in
- // We might be on a different EL, but `handleError` will sort that out for us, so no need to
- // hop.
- if error is GRPCStatus || error is GRPCStatusTransformable {
- self.handleError(error)
- } else {
- // Fallback to something which will mark the RPC as 'unavailable'.
- self.handleError(ConnectionFailure(reason: error))
- }
- }
- }
- /// Append a request part to the write buffer.
- /// - Parameters:
- /// - part: The request part to buffer.
- /// - promise: A promise to complete when the request part has been sent.
- private func buffer(
- _ part: GRPCClientRequestPart<Request>,
- promise: EventLoopPromise<Void>?
- ) {
- self.callEventLoop.assertInEventLoop()
- self.logger.trace(
- "buffering request part",
- metadata: [
- "request_part": "\(part.name)",
- "call_state": self.stateForLogging,
- ]
- )
- self.writeBuffer.append(.init(request: part, promise: promise))
- }
- /// Writes any buffered request parts to the `Channel`.
- private func unbuffer() {
- self.callEventLoop.assertInEventLoop()
- guard let channel = self.channel else {
- return
- }
- // Save any flushing until we're done writing.
- var shouldFlush = false
- self.logger.trace(
- "unbuffering request parts",
- metadata: [
- "request_parts": "\(self.writeBuffer.count)"
- ]
- )
- // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
- // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
- // may miss more buffered writes.
- while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
- // Pull out as many writes as possible.
- while let write = self.writeBuffer.popFirst() {
- self.logger.trace(
- "unbuffering request part",
- metadata: [
- "request_part": "\(write.request.name)"
- ]
- )
- if !shouldFlush {
- shouldFlush = self.shouldFlush(after: write.request)
- }
- self.writeToChannel(channel, part: write.request, promise: write.promise, flush: false)
- }
- // Okay, flush now.
- if shouldFlush {
- shouldFlush = false
- channel.flush()
- }
- }
- if self.writeBuffer.isEmpty {
- self.logger.trace("request buffer drained")
- } else {
- self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
- }
- // We're unbuffered. What now?
- switch self.state.unbuffered() {
- case .doNothing:
- ()
- case .succeedChannelPromise:
- self.channelPromise?.succeed(channel)
- }
- }
- /// Fails any promises that come with buffered writes with `error`.
- /// - Parameter error: The `Error` to fail promises with.
- private func failBufferedWrites(with error: Error) {
- self.logger.trace("failing buffered writes", metadata: ["call_state": self.stateForLogging])
- while let write = self.writeBuffer.popFirst() {
- write.promise?.fail(error)
- }
- }
- /// Write a request part to the `Channel`.
- /// - Parameters:
- /// - channel: The `Channel` to write `part` to.
- /// - part: The request part to write.
- /// - promise: A promise to complete once the write has been completed.
- /// - flush: Whether to flush the `Channel` after writing.
- private func writeToChannel(
- _ channel: Channel,
- part: GRPCClientRequestPart<Request>,
- promise: EventLoopPromise<Void>?,
- flush: Bool
- ) {
- switch part {
- case let .metadata(headers):
- let head = self.makeRequestHead(with: headers)
- channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
- // Messages are buffered by this class and in the async writer for async calls. Initially the
- // async writer is not allowed to emit messages; the call to 'onStart()' signals that messages
- // may be emitted. We call it here to avoid races between writing headers and messages.
- self.onStart?()
- self.onStart = nil
- case let .message(request, metadata):
- do {
- let bytes = try self.serializer.serialize(request, allocator: channel.allocator)
- let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
- channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
- } catch {
- self.handleError(error)
- }
- case .end:
- channel.write(self.wrapOutboundOut(.end), promise: promise)
- }
- if flush {
- channel.flush()
- }
- }
- /// Forward the response part to the interceptor pipeline.
- /// - Parameter part: The response part to forward.
- private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
- self.callEventLoop.assertInEventLoop()
- self._pipeline?.receive(part)
- }
- /// Forward the error to the interceptor pipeline.
- /// - Parameter error: The error to forward.
- private func forwardErrorToInterceptors(_ error: Error) {
- self.callEventLoop.assertInEventLoop()
- self._pipeline?.errorCaught(error)
- }
- }
- // MARK: - Helpers
- extension ClientTransport {
- /// Returns whether the `Channel` should be flushed after writing the given part to it.
- private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
- switch part {
- case .metadata:
- // If we're not streaming requests then we hold off on the flush until we see end.
- return self.isStreamingRequests
- case let .message(_, metadata):
- // Message flushing is determined by caller preference.
- return metadata.flush
- case .end:
- // Always flush at the end of the request stream.
- return true
- }
- }
- /// Make a `_GRPCRequestHead` with the provided metadata.
- private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
- return _GRPCRequestHead(
- method: self.callDetails.options.cacheable ? "GET" : "POST",
- scheme: self.callDetails.scheme,
- path: self.callDetails.path,
- host: self.callDetails.authority,
- deadline: self.callDetails.options.timeLimit.makeDeadline(),
- customMetadata: metadata,
- encoding: self.callDetails.options.messageEncoding
- )
- }
- }
- extension GRPCClientRequestPart {
- /// The name of the request part, used for logging.
- fileprivate var name: String {
- switch self {
- case .metadata:
- return "metadata"
- case .message:
- return "message"
- case .end:
- return "end"
- }
- }
- }
- // A wrapper for connection errors: we need to be able to preserve the underlying error as
- // well as extract a 'GRPCStatus' with code '.unavailable'.
- internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
- /// The reason the connection failed.
- var reason: Error
- init(reason: Error) {
- self.reason = reason
- }
- var description: String {
- return String(describing: self.reason)
- }
- func makeGRPCStatus() -> GRPCStatus {
- return GRPCStatus(
- code: .unavailable,
- message: String(describing: self.reason),
- cause: self.reason
- )
- }
- }
|