ClientTransport.swift 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// The glue between a `NIO.Channel` and the `ClientInterceptorPipeline`.
  ///
  /// The transport owns the interceptor pipeline and is also a `ChannelHandler`. Callers have very
  /// little API to use: they configure the transport by adding it to a `NIO.ChannelPipeline` with
  /// `configure(_:)`, send request parts via `send(_:promise:)` and attempt to cancel the RPC with
  /// `cancel(promise:)`. Response parts – after traversing the interceptor pipeline – are emitted
  /// to the `onResponsePart` callback supplied to the initializer.
  ///
  /// In most instances the glue code is simple: transformations are applied to the request and
  /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  /// when these change. This includes buffering request parts from the interceptor pipeline until
  /// the `NIO.Channel` becomes active.
  ///
  /// ### Thread Safety
  ///
  /// This class is not thread safe. All methods **must** be executed on the transport's `eventLoop`.
  @usableFromInline
  internal final class ClientTransport<Request, Response> {
    /// The `EventLoop` this transport is running on.
    @usableFromInline
    internal let eventLoop: EventLoop

    /// The current state of the transport.
    private var state: ClientTransportState = .idle

    /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
    /// and fail it when we transition to `closed`.
    private var channelPromise: EventLoopPromise<Channel>?

    // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
    // have 3 parts.
    /// A buffer to store request parts and promises in before the channel has become active.
    private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)

    /// A request part and the (optional) promise to complete once it has been handled.
    private struct RequestAndPromise {
      var request: GRPCClientRequestPart<Request>
      var promise: EventLoopPromise<Void>?
    }

    /// Details about the call.
    internal let callDetails: CallDetails

    /// A logger, taken from the call options.
    internal var logger: Logger {
      return self.callDetails.options.logger
    }

    /// Is the call streaming requests?
    private var isStreamingRequests: Bool {
      switch self.callDetails.type {
      case .unary, .serverStreaming:
        return false
      case .clientStreaming, .bidirectionalStreaming:
        return true
      }
    }

    // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
    // convenient to have both at the same time when intercepting response parts. We'll hold on to
    // the trailers here and only forward them when we receive the status.
    private var trailers: HPACKHeaders?

    /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
    /// from the `ChannelPipeline` in order to break reference cycles.
    @usableFromInline
    internal var _pipeline: ClientInterceptorPipeline<Request, Response>?

    /// The `ChannelHandlerContext`; set when the handler is added to a pipeline and cleared when
    /// it is removed.
    private var context: ChannelHandlerContext?

    /// Our current state as logging metadata. Includes the number of buffered parts for states
    /// in which writes may be buffered.
    private var stateForLogging: Logger.MetadataValue {
      if self.state.mayBuffer {
        return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
      } else {
        return "\(self.state)"
      }
    }

    /// Creates a transport and the interceptor pipeline it owns.
    /// - Parameters:
    ///   - details: Details about the call.
    ///   - eventLoop: The `EventLoop` the transport (and its pipeline) runs on.
    ///   - interceptors: The interceptors to place in the pipeline.
    ///   - errorDelegate: An optional error delegate passed to the pipeline.
    ///   - onError: A callback passed to the pipeline for errors.
    ///   - onResponsePart: A callback passed to the pipeline for response parts.
    internal init(
      details: CallDetails,
      eventLoop: EventLoop,
      interceptors: [ClientInterceptor<Request, Response>],
      errorDelegate: ClientErrorDelegate?,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) {
      self.eventLoop = eventLoop
      self.callDetails = details
      // Note: the pipeline holds references back to `self` via `onCancel` and `onRequestPart`;
      // the cycle is broken by dropping `_pipeline` in `handlerRemoved(context:)`.
      self._pipeline = ClientInterceptorPipeline(
        eventLoop: eventLoop,
        details: details,
        interceptors: interceptors,
        errorDelegate: errorDelegate,
        onError: onError,
        onCancel: self.cancelFromPipeline(promise:),
        onRequestPart: self.sendFromPipeline(_:promise:),
        onResponsePart: onResponsePart
      )
    }

    // MARK: - Call Object API

    /// Configure the transport to communicate with the server.
    /// - Parameter configurator: A callback to invoke in order to configure this transport.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
      self.eventLoop.assertInEventLoop()
      // Only the first request to configure has any effect.
      if self.state.configureTransport() {
        self.configure(using: configurator)
      }
    }

    /// Send a request part – via the interceptor pipeline – to the server.
    /// - Parameters:
    ///   - part: The part to send.
    ///   - promise: A promise which will be completed when the request part has been handled.
    /// - Important: This *must* be called from the `eventLoop`.
    @inlinable
    internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()
      if let pipeline = self._pipeline {
        pipeline.send(part, promise: promise)
      } else {
        // The pipeline has been dropped: the RPC has already completed.
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Attempt to cancel the RPC notifying any interceptors.
    /// - Parameter promise: A promise which will be completed when the cancellation attempt has
    ///   been handled.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func cancel(promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()
      if let pipeline = self._pipeline {
        pipeline.cancel(promise: promise)
      } else {
        // The pipeline has been dropped: the RPC has already completed.
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// A request for the underlying `Channel`.
    /// - Returns: A future for the `Channel`. Repeated calls return the future of the same
    ///   promise.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func channel() -> EventLoopFuture<Channel> {
      self.eventLoop.assertInEventLoop()

      // Do we already have a promise?
      if let promise = self.channelPromise {
        return promise.futureResult
      }

      // Make and store the promise.
      let promise = self.eventLoop.makePromise(of: Channel.self)
      self.channelPromise = promise

      // Ask the state machine if we can have it.
      switch self.state.getChannel() {
      case .succeed:
        if let channel = self.context?.channel {
          promise.succeed(channel)
        } else {
          // We've been told to succeed the promise but have no context to take the `Channel`
          // from. Fail the promise rather than leaving it (and the returned future) pending
          // forever.
          promise.fail(GRPCError.AlreadyComplete())
        }
      case .fail:
        promise.fail(GRPCError.AlreadyComplete())
      case .doNothing:
        // The promise will be completed on a later state transition.
        ()
      }

      return promise.futureResult
    }
  }
  169. // MARK: - Pipeline API
  extension ClientTransport {
    /// Handles a request part coming out of the interceptor pipeline. Should only be called from
    /// the interceptor pipeline.
    ///
    /// Depending on the current state the part is buffered, written to the `Channel`, or its
    /// promise is failed because the RPC has already completed.
    /// - Parameters:
    ///   - part: The request part to send.
    ///   - promise: A promise which will be completed when the part has been handled.
    /// - Important: This *must* be called from the `eventLoop`.
    private func sendFromPipeline(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?
    ) {
      self.eventLoop.assertInEventLoop()

      let action = self.state.send()
      switch action {
      case .alreadyComplete:
        promise?.fail(GRPCError.AlreadyComplete())

      case .writeToChannel:
        let flush = self.shouldFlush(after: part)
        self.write(part, promise: promise, flush: flush)

      case .writeToBuffer:
        self.buffer(part, promise: promise)
      }
    }

    /// Attempts to cancel the RPC. Should only be called from the interceptor pipeline.
    ///
    /// When the state machine allows the cancellation, the interceptors are notified, buffered
    /// writes are failed, the `Channel` (if any) is closed and the `Channel` promise is failed.
    /// - Parameter promise: A promise which is succeeded if the cancellation went ahead and failed
    ///   with `GRPCError.AlreadyComplete` otherwise.
    /// - Important: This *must* be called from the `eventLoop`.
    private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()

      guard self.state.cancel() else {
        // Too late: we're already closing or closed.
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }

      let cancelled = GRPCError.RPCCancelledByClient()
      self.forwardErrorToInterceptors(cancelled)
      self.failBufferedWrites(with: cancelled)
      self.context?.channel.close(mode: .all, promise: nil)
      self.channelPromise?.fail(cancelled)
      promise?.succeed(())
    }
  }
  207. // MARK: - ChannelHandler API
  extension ClientTransport: ChannelInboundHandler {
    @usableFromInline
    typealias InboundIn = _GRPCClientResponsePart<Response>

    @usableFromInline
    typealias OutboundOut = _GRPCClientRequestPart<Request>

    /// Stores the context so that request parts can later be written to its `Channel`.
    @usableFromInline
    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    /// Drops the context and the interceptor pipeline.
    @usableFromInline
    internal func handlerRemoved(context: ChannelHandlerContext) {
      self.eventLoop.assertInEventLoop()
      self.context = nil
      // Break the reference cycle.
      self._pipeline = nil
    }

    /// Handles an error from the `Channel` (or from a failed attempt to configure it).
    ///
    /// Unless the state machine says otherwise, the error is forwarded to the interceptors, any
    /// buffered writes are failed and the `Channel` promise (if one was requested) is failed.
    /// - Parameter error: The error which was encountered.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func channelError(_ error: Error) {
      self.eventLoop.assertInEventLoop()

      switch self.state.channelError() {
      case .doNothing:
        ()

      case .propagateError:
        self.forwardErrorToInterceptors(error)
        self.failBufferedWrites(with: error)
        // The state machine has moved to `closing`, so the `Channel` will never be handed out.
        // Fail the promise now, just as the cancellation path does: if no `Channel` was ever
        // attached (e.g. the configurator failed) `channelInactive` never fires and the promise
        // would otherwise never be completed.
        self.channelPromise?.fail(error)

      case .propagateErrorAndClose:
        self.forwardErrorToInterceptors(error)
        self.failBufferedWrites(with: error)
        // As above: we're closing, so the promise can only ever be failed from here on.
        self.channelPromise?.fail(error)
        self.context?.close(mode: .all, promise: nil)
      }
    }

    /// Treats errors caught in the `ChannelPipeline` as channel errors.
    @usableFromInline
    internal func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.channelError(error)
    }

    /// The `Channel` became active: start unbuffering request parts, or close the `Channel` if
    /// the state machine says we should not proceed.
    @usableFromInline
    internal func channelActive(context: ChannelHandlerContext) {
      self.eventLoop.assertInEventLoop()
      self.logger.debug("activated stream channel", source: "GRPC")

      if self.state.channelActive() {
        self.unbuffer(to: context.channel)
      } else {
        context.close(mode: .all, promise: nil)
      }
    }

    /// The `Channel` became inactive: tear down the RPC or fail the `Channel` promise, as
    /// directed by the state machine.
    @usableFromInline
    internal func channelInactive(context: ChannelHandlerContext) {
      self.eventLoop.assertInEventLoop()

      switch self.state.channelInactive() {
      case .doNothing:
        ()

      case .tearDown:
        // No error was seen before the channel went away, so synthesize a status to fail with.
        let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
        self.forwardErrorToInterceptors(status)
        self.failBufferedWrites(with: status)
        self.channelPromise?.fail(status)

      case .failChannelPromise:
        self.channelPromise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Reads a response part from the `Channel` and, if the state machine allows it, forwards
    /// the part to the interceptor pipeline.
    @usableFromInline
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      self.eventLoop.assertInEventLoop()
      let part = self.unwrapInboundIn(data)

      // The status is the last part of the response stream.
      let isEnd: Bool
      switch part {
      case .initialMetadata, .message, .trailingMetadata:
        isEnd = false
      case .status:
        isEnd = true
      }

      if self.state.channelRead(isEnd: isEnd) {
        self.forwardToInterceptors(part)
      }

      // (We're the end of the channel. No need to forward anything.)
    }
  }
  284. // MARK: - State Handling
  /// The states a `ClientTransport` moves through over the lifetime of an RPC.
  private enum ClientTransportState {
    /// Nothing has happened yet: the RPC has not been configured.
    ///
    /// Valid transitions:
    /// - `awaitingTransport` (the transport is being configured)
    /// - `closed` (the RPC cancels)
    case idle

    /// Transport has been requested but has not activated yet. Outbound request parts are
    /// buffered in this state; receiving a message from the transport in this state is an error.
    ///
    /// Valid transitions:
    /// - `activatingTransport` (the channel becomes active)
    /// - `closing` (the RPC cancels, the channel encounters an error)
    /// - `closed` (the channel becomes inactive without having been active)
    case awaitingTransport

    /// The transport is active and buffered request parts are being written to it. New request
    /// parts continue to be buffered in this state. Receiving messages from the transport in this
    /// state is okay.
    ///
    /// Valid transitions:
    /// - `active` (we finish unbuffering)
    /// - `closing` (the RPC cancels, the channel encounters an error, the status is read)
    /// - `closed` (the channel becomes inactive)
    case activatingTransport

    /// Fully active: the RPC is in progress and is communicating over an active transport.
    ///
    /// Valid transitions:
    /// - `closing` (the RPC cancels, the channel encounters an error, the status is read)
    /// - `closed` (the channel becomes inactive)
    case active

    /// The RPC is finishing (for example because it was cancelled) but any `Channel` associated
    /// with the transport has not become inactive yet.
    ///
    /// Valid transitions:
    /// - `closed` (the channel becomes inactive)
    case closing

    /// Closed. Writes from the RPC are failed and responses from the transport are ignored.
    ///
    /// Valid transitions:
    /// - none: this state is terminal.
    case closed

    /// Whether buffered writes may be written out ("unbuffered") in this state.
    internal var isUnbuffering: Bool {
      switch self {
      case .idle, .awaitingTransport:
        return false
      case .activatingTransport:
        return true
      case .active, .closing, .closed:
        return false
      }
    }

    /// Whether writes may be buffered in this state. (This is useful only to inform logging.)
    internal var mayBuffer: Bool {
      switch self {
      case .active, .closing, .closed:
        return false
      case .idle, .awaitingTransport, .activatingTransport:
        return true
      }
    }
  }
  extension ClientTransportState {
    /// The caller would like to configure the transport.
    /// - Returns: `true` if the transport should be configured, `false` if this is a repeat
    ///   request (or the transport is already closing or closed).
    mutating func configureTransport() -> Bool {
      switch self {
      case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
        // Configuration has already been requested (or is no longer relevant).
        return false

      case .idle:
        // We stay idle until we're asked to configure.
        self = .awaitingTransport
        return true
      }
    }

    /// What to do with a request part the pipeline wants to send.
    enum SendAction {
      /// Write the request into the buffer.
      case writeToBuffer
      /// Write the request into the channel.
      case writeToChannel
      /// The RPC has already completed, fail any promise associated with the write.
      case alreadyComplete
    }

    /// The pipeline would like to send a request part to the transport.
    /// - Returns: The action to take for the request part. The state is not changed.
    mutating func send() -> SendAction {
      switch self {
      case .active:
        // We have a `Channel`, the write can go straight through.
        return .writeToChannel

      case .idle, .awaitingTransport, .activatingTransport:
        // There's no transport yet, or we're still draining the buffer: buffer the part.
        return .writeToBuffer

      case .closing, .closed:
        // The transport is going or has gone away.
        return .alreadyComplete
      }
    }

    /// What to do once the buffered writes have been dealt with.
    enum UnbufferedAction {
      /// Nothing needs to be done.
      case doNothing
      /// Succeed the channel promise associated with the transport.
      case succeedChannelPromise
    }

    /// We finished dealing with the buffered writes.
    /// - Returns: The action to take now that unbuffering has finished.
    mutating func unbuffered() -> UnbufferedAction {
      switch self {
      case .activatingTransport:
        // Buffered writes have been dealt with, so we can become active. This is the only way to
        // become active.
        self = .active
        return .succeedChannelPromise

      case .closing, .closed:
        // Something caused us to close while unbuffering; that's okay, there's nothing more to do.
        return .doNothing

      case .idle, .awaitingTransport:
        // Can't happen: unbuffering only begins on the transition to '.activatingTransport',
        // which comes after these two states.
        preconditionFailure("Requests can't be unbuffered before the transport is activated")

      case .active:
        preconditionFailure("Unbuffering completed but the transport is already active")
      }
    }

    /// Cancel the RPC and associated `Channel`, if possible.
    /// - Returns: A boolean indicating whether cancellation can go ahead (and also whether the
    ///   channel should be torn down).
    mutating func cancel() -> Bool {
      let shouldCancel: Bool

      switch self {
      case .idle:
        // No RPC has been started and there's no `Channel`. The caller tells the interceptors
        // we're done, fails any writes and deals with the cancellation promise.
        self = .closed
        shouldCancel = true

      case .awaitingTransport, .activatingTransport, .active:
        // An RPC has been started; the `Channel` may be pending, unbuffering or fully active. In
        // every case we become `closing`. If the `Channel` has not activated yet it will be
        // closed when it does (see `channelActive(context:)`).
        self = .closing
        shouldCancel = true

      case .closing, .closed:
        // We're already closing or closed. The cancellation is too late.
        shouldCancel = false
      }

      return shouldCancel
    }

    /// `channelActive` was invoked on the transport by the `Channel`.
    /// - Returns: `true` if buffered writes should now be written to the `Channel`, `false` if
    ///   the `Channel` should be closed instead.
    mutating func channelActive() -> Bool {
      switch self {
      case .awaitingTransport:
        self = .activatingTransport
        return true

      case .closing:
        // We remain in closing: we only transition to closed on 'channelInactive'.
        return false

      case .idle:
        preconditionFailure("Can't activate an idle transport")

      case .activatingTransport, .active:
        preconditionFailure("Invalid state: stream is already active")

      case .closed:
        preconditionFailure("Invalid state: stream is already inactive")
      }
    }

    /// What to do when the `Channel` becomes inactive.
    enum ChannelInactiveAction {
      /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
      case tearDown
      /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
      case failChannelPromise
      /// Do nothing.
      case doNothing
    }

    /// `channelInactive` was invoked on the transport by the `Channel`.
    /// - Returns: The action to take as a result of the `Channel` becoming inactive.
    mutating func channelInactive() -> ChannelInactiveAction {
      switch self {
      case .closed:
        // Already closed, nothing more to do.
        return .doNothing

      case .closing:
        // We were already closing, now we're fully closed.
        self = .closed
        return .failChannelPromise

      case .awaitingTransport, .activatingTransport, .active:
        // The channel went away while the RPC was still in progress. We haven't received an error
        // (otherwise we'd be `closing`) so the caller should synthesize an error status to fail
        // the RPC with.
        self = .closed
        return .tearDown

      case .idle:
        // We can't become inactive before we've requested a `Channel`.
        preconditionFailure("Can't deactivate an idle transport")
      }
    }

    /// `channelRead` was invoked on the transport by the `Channel`.
    /// - Parameter isEnd: Whether the part which was read ends the response stream.
    /// - Returns: A boolean value indicating whether the part that was read should be forwarded
    ///   to the interceptor pipeline.
    mutating func channelRead(isEnd: Bool) -> Bool {
      switch self {
      case .activatingTransport, .active:
        // We have an active `Channel` so the response part can be forwarded. Seeing the end of
        // the stream means the call is terminating, so start closing.
        if isEnd {
          self = .closing
        }
        return true

      case .closing, .closed:
        // We closed early, ignore any reads.
        return false

      case .idle, .awaitingTransport:
        // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
        preconditionFailure("Can't receive response part on idle transport")
      }
    }

    /// What to do when the `Channel` reports an error.
    enum ChannelErrorAction {
      /// Propagate the error to the interceptor pipeline and fail any buffered writes.
      case propagateError
      /// As above, but close the 'Channel' as well.
      case propagateErrorAndClose
      /// No action is required.
      case doNothing
    }

    /// We received an error from the `Channel`.
    /// - Returns: The action to take as a result of the error.
    mutating func channelError() -> ChannelErrorAction {
      switch self {
      case .closing, .closed:
        // We're already closing/closed, we can ignore this.
        return .doNothing

      case .activatingTransport, .active:
        // We're either fully active or unbuffering. Forward the error, fail any writes and then
        // close the `Channel`.
        self = .closing
        return .propagateErrorAndClose

      case .awaitingTransport:
        // We were waiting for the `Channel` to become active. We're toast now: start closing and
        // fail any buffered writes along the way.
        self = .closing
        return .propagateError

      case .idle:
        // The `Channel` can't error if it doesn't exist.
        preconditionFailure("Can't catch error on idle transport")
      }
    }

    /// What to do when the caller asks for the underlying `Channel`.
    enum GetChannelAction {
      /// No action is required.
      case doNothing
      /// Succeed the Channel promise.
      case succeed
      /// Fail the 'Channel' promise, the RPC is already complete.
      case fail
    }

    /// The caller has asked for the underlying `Channel`.
    /// - Returns: The action to take for the `Channel` promise. The state is not changed.
    mutating func getChannel() -> GetChannelAction {
      switch self {
      case .active:
        // We're already active, so there was no promise to succeed when we made that transition.
        // It can be completed now.
        return .succeed

      case .closed:
        // We're already closed; there was no promise to fail when we made that transition. It
        // can be failed now though.
        return .fail

      case .idle, .awaitingTransport, .activatingTransport:
        // Do nothing, the promise is completed when we become active or closed.
        return .doNothing

      case .closing:
        // The promise is completed when we transition to closed.
        return .doNothing
      }
    }
  }
  561. // MARK: - State Actions
  extension ClientTransport {
    /// Runs the `configurator` with this transport as the handler to install.
    ///
    /// If the returned future fails, the failure is treated as a channel error. Errors which are
    /// not a `GRPCStatus` or `GRPCStatusTransformable` are wrapped in a `ConnectionFailure` first.
    /// - Parameter configurator: The callback used to configure this transport.
    private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
      let configured = configurator(self)

      configured.whenFailure { error in
        let hasStatus = error is GRPCStatus || error is GRPCStatusTransformable
        // Fall back to something which will mark the RPC as 'unavailable'.
        let failure: Error = hasStatus ? error : ConnectionFailure(reason: error)
        self.channelError(failure)
      }
    }

    /// Appends a request part, and its promise, to the write buffer.
    /// - Parameters:
    ///   - part: The request part to buffer.
    ///   - promise: A promise to complete when the request part has been sent.
    private func buffer(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?
    ) {
      self.logger.debug("buffering request part", metadata: [
        "request_part": "\(part.name)",
        "call_state": self.stateForLogging,
      ], source: "GRPC")

      let entry = RequestAndPromise(request: part, promise: promise)
      self.writeBuffer.append(entry)
    }

    /// Writes any buffered request parts to the `Channel` and then tells the state machine that
    /// unbuffering has finished.
    /// - Parameter channel: The `Channel` to flush, and to succeed the `Channel` promise with.
    private func unbuffer(to channel: Channel) {
      // Flushing is deferred until a whole batch of buffered writes has been written.
      var flushPending = false

      self.logger.debug("unbuffering request parts", metadata: [
        "request_parts": "\(self.writeBuffer.count)",
      ], source: "GRPC")

      // Two loops are needed: a promise completed as a result of the flush may enqueue more
      // writes or change our state (e.g. we may have to close). Without the outer loop those
      // additional buffered writes could be missed.
      while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
        // Drain everything currently in the buffer.
        while let buffered = self.writeBuffer.popFirst() {
          self.logger.debug("unbuffering request part", metadata: [
            "request_part": "\(buffered.request.name)",
          ], source: "GRPC")

          // Once a flush is pending there's no need to ask again.
          flushPending = flushPending || self.shouldFlush(after: buffered.request)
          self.write(buffered.request, promise: buffered.promise, flush: false)
        }

        // Okay, flush now.
        if flushPending {
          flushPending = false
          channel.flush()
        }
      }

      if !self.writeBuffer.isEmpty {
        self.logger.notice(
          "unbuffering aborted",
          metadata: ["call_state": self.stateForLogging],
          source: "GRPC"
        )
      } else {
        self.logger.debug("request buffer drained", source: "GRPC")
      }

      // Unbuffering is over: ask the state machine what to do next.
      let action = self.state.unbuffered()
      switch action {
      case .succeedChannelPromise:
        self.channelPromise?.succeed(channel)
      case .doNothing:
        ()
      }
    }

    /// Fails the promise of every buffered write with `error`, emptying the buffer.
    /// - Parameter error: The `Error` to fail promises with.
    private func failBufferedWrites(with error: Error) {
      self.logger.debug("failing buffered writes", metadata: [
        "call_state": self.stateForLogging,
      ], source: "GRPC")

      while let buffered = self.writeBuffer.popFirst() {
        buffered.promise?.fail(error)
      }
    }

    /// Writes a request part to the `Channel` of the stored context.
    ///
    /// If there is no context the promise is failed with `GRPCError.AlreadyComplete`.
    /// - Parameters:
    ///   - part: The request part to write.
    ///   - promise: A promise to complete once the write has been completed.
    ///   - flush: Whether to flush the `Channel` after writing.
    private func write(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      flush: Bool
    ) {
      guard let context = self.context else {
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }

      // Translate the interceptor-facing part into the part the `Channel` expects.
      let outbound: OutboundOut
      switch part {
      case .metadata(let headers):
        outbound = .head(self.makeRequestHead(with: headers))
      case .message(let request, let metadata):
        outbound = .message(_MessageContext<Request>(request, compressed: metadata.compress))
      case .end:
        outbound = .end
      }

      context.channel.write(self.wrapOutboundOut(outbound), promise: promise)

      if flush {
        context.channel.flush()
      }
    }

    /// Forwards a response part from the `Channel` to the interceptor pipeline.
    ///
    /// Trailing metadata is not forwarded on its own: it is held back and emitted together with
    /// the status as a single `.end` part.
    /// - Parameter part: The response part to forward.
    private func forwardToInterceptors(_ part: _GRPCClientResponsePart<Response>) {
      switch part {
      case .initialMetadata(let headers):
        self._pipeline?.receive(.metadata(headers))

      case .message(let messageContext):
        self._pipeline?.receive(.message(messageContext.message))

      case .trailingMetadata(let headers):
        // The `Channel` delivers trailers and `GRPCStatus` separately; hold on to the trailers
        // so that both can be emitted together.
        self.trailers = headers

      case .status(let status):
        let pending = self.trailers
        self.trailers = nil
        self._pipeline?.receive(.end(status, pending ?? [:]))
      }
    }

    /// Forwards an error to the interceptor pipeline.
    /// - Parameter error: The error to forward.
    private func forwardErrorToInterceptors(_ error: Error) {
      self._pipeline?.errorCaught(error)
    }
  }
  696. // MARK: - Helpers
  extension ClientTransport {
    /// Decides whether the `Channel` should be flushed after `part` has been written to it.
    /// - Parameter part: The request part which was (or is about to be) written.
    /// - Returns: `true` if a flush should follow the write.
    private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
      switch part {
      case .end:
        // The end of the request stream is always flushed.
        return true

      case .message(_, let metadata):
        // For messages the caller decides.
        return metadata.flush

      case .metadata:
        // When requests aren't streamed the flush is held back until we see end.
        return self.isStreamingRequests
      }
    }

    /// Builds a `_GRPCRequestHead` from the call details and the provided metadata.
    /// - Parameter metadata: Custom metadata to send in the request head.
    /// - Returns: The request head for this call.
    private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
      let details = self.callDetails
      let options = details.options
      // Calls marked as cacheable are sent with "GET", everything else with "POST".
      let method = options.cacheable ? "GET" : "POST"

      return _GRPCRequestHead(
        method: method,
        scheme: details.scheme,
        path: details.path,
        host: details.authority,
        deadline: options.timeLimit.makeDeadline(),
        customMetadata: metadata,
        encoding: options.messageEncoding
      )
    }
  }
  extension GRPCClientRequestPart {
    /// A short, human readable name for the kind of request part; used in log metadata.
    fileprivate var name: String {
      let label: String
      switch self {
      case .end:
        label = "end"
      case .message:
        label = "message"
      case .metadata:
        label = "metadata"
      }
      return label
    }
  }
  // Wraps an error encountered while connecting. The wrapper keeps hold of the underlying error
  // and can also produce a 'GRPCStatus' with code '.unavailable'.
  internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
    /// The underlying error which caused the connection to fail.
    var reason: Error

    /// Wraps `reason` as a connection failure.
    init(reason: Error) {
      self.reason = reason
    }

    /// A description of the underlying error.
    var description: String {
      return String(describing: self.reason)
    }

    /// Makes an `.unavailable` status whose message describes the underlying error.
    func makeGRPCStatus() -> GRPCStatus {
      let message = String(describing: self.reason)
      return GRPCStatus(code: .unavailable, message: message)
    }
  }