ClientTransport.swift 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
  21. /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  22. /// little API to use on this class: they may configure the transport by adding it to a
  23. /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  24. /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  25. /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  26. ///
  27. /// In most instances the glue code is simple: transformations are applied to the request and
  28. /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  29. /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  30. /// when these change. This includes buffering request parts from the interceptor pipeline until
  31. /// the `NIO.Channel` becomes active.
  32. ///
  33. /// ### Thread Safety
  34. ///
  35. /// This class is not thread safe. All methods **must** be executed on the transport's `eventLoop`.
  36. @usableFromInline
  37. internal final class ClientTransport<Request, Response> {
  38. /// The `EventLoop` this transport is running on.
  39. @usableFromInline
  40. internal let eventLoop: EventLoop
  41. /// The current state of the transport.
  42. private var state: State = .idle
  43. /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
  44. /// and fail it when we transition to `closed`.
  45. private var channelPromise: EventLoopPromise<Channel>?
  46. // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
  47. // have 3 parts.
  48. /// A buffer to store request parts and promises in before the channel has become active.
  49. private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
  50. /// A request part and a promise.
  51. private struct RequestAndPromise {
  52. var request: ClientRequestPart<Request>
  53. var promise: EventLoopPromise<Void>?
  54. }
  55. /// Details about the call.
  56. internal let callDetails: CallDetails
  57. /// A logger.
  58. internal var logger: Logger {
  59. return self.callDetails.options.logger
  60. }
  61. /// Is the call streaming requests?
  62. private var isStreamingRequests: Bool {
  63. switch self.callDetails.type {
  64. case .unary, .serverStreaming:
  65. return false
  66. case .clientStreaming, .bidirectionalStreaming:
  67. return true
  68. }
  69. }
  70. // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
  71. // convenient to have both at the same time when intercepting response parts. We'll hold on to the
  72. // trailers here and only forward them when we receive the status.
  73. private var trailers: HPACKHeaders?
  74. /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
  75. /// from the `ChannelPipeline` in order to break reference cycles.
  76. @usableFromInline
  77. internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
  78. /// Our current state as logging metadata.
  79. private var stateForLogging: Logger.MetadataValue {
  80. if self.state.mayBuffer {
  81. return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
  82. } else {
  83. return "\(self.state)"
  84. }
  85. }
  86. internal init(
  87. details: CallDetails,
  88. eventLoop: EventLoop,
  89. interceptors: [ClientInterceptor<Request, Response>],
  90. errorDelegate: ClientErrorDelegate?,
  91. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  92. ) {
  93. self.eventLoop = eventLoop
  94. self.callDetails = details
  95. self._pipeline = ClientInterceptorPipeline(
  96. eventLoop: eventLoop,
  97. details: details,
  98. interceptors: interceptors,
  99. errorDelegate: errorDelegate,
  100. onCancel: self.cancelFromPipeline(promise:),
  101. onRequestPart: self.sendFromPipeline(_:promise:),
  102. onResponsePart: onResponsePart
  103. )
  104. }
  105. // MARK: - Call Object API
  106. /// Configure the transport to communicate with the server.
  107. /// - Parameter configurator: A callback to invoke in order to configure this transport.
  108. /// - Important: This *must* to be called from the `eventLoop`.
  109. internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  110. self.eventLoop.assertInEventLoop()
  111. self.act(on: self.state.configureTransport(with: configurator))
  112. }
  113. /// Send a request part – via the interceptor pipeline – to the server.
  114. /// - Parameters:
  115. /// - part: The part to send.
  116. /// - promise: A promise which will be completed when the request part has been handled.
  117. /// - Important: This *must* to be called from the `eventLoop`.
  118. @inlinable
  119. internal func send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  120. self.eventLoop.assertInEventLoop()
  121. if let pipeline = self._pipeline {
  122. pipeline.send(part, promise: promise)
  123. } else {
  124. promise?.fail(GRPCError.AlreadyComplete())
  125. }
  126. }
  127. /// Attempt to cancel the RPC notifying any interceptors.
  128. /// - Parameter promise: A promise which will be completed when the cancellation attempt has
  129. /// been handled.
  130. internal func cancel(promise: EventLoopPromise<Void>?) {
  131. self.eventLoop.assertInEventLoop()
  132. if let pipeline = self._pipeline {
  133. pipeline.cancel(promise: promise)
  134. } else {
  135. promise?.fail(GRPCError.AlreadyComplete())
  136. }
  137. }
  138. /// A request for the underlying `Channel`.
  139. internal func channel() -> EventLoopFuture<Channel> {
  140. self.eventLoop.assertInEventLoop()
  141. // Do we already have a promise?
  142. if let promise = self.channelPromise {
  143. return promise.futureResult
  144. } else {
  145. // Make and store the promise.
  146. let promise = self.eventLoop.makePromise(of: Channel.self)
  147. self.channelPromise = promise
  148. // Ask the state machine if we can have it.
  149. self.act(on: self.state.getChannel())
  150. return promise.futureResult
  151. }
  152. }
  153. }
  154. // MARK: - Pipeline API
  155. extension ClientTransport {
  156. /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
  157. /// - Parameters:
  158. /// - part: The request part to send.
  159. /// - promise: A promise which will be completed when the part has been handled.
  160. /// - Important: This *must* to be called from the `eventLoop`.
  161. private func sendFromPipeline(
  162. _ part: ClientRequestPart<Request>,
  163. promise: EventLoopPromise<Void>?
  164. ) {
  165. self.eventLoop.assertInEventLoop()
  166. self.act(on: self.state.send(part, promise: promise))
  167. }
  168. /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
  169. /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
  170. /// - Important: This *must* to be called from the `eventLoop`.
  171. private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
  172. self.eventLoop.assertInEventLoop()
  173. self.act(on: self.state.cancel(promise: promise))
  174. }
  175. }
  176. // MARK: - ChannelHandler API
  177. extension ClientTransport: ChannelInboundHandler {
  178. @usableFromInline
  179. typealias InboundIn = _GRPCClientResponsePart<Response>
  180. @usableFromInline
  181. typealias OutboundOut = _GRPCClientRequestPart<Request>
  182. @usableFromInline
  183. internal func handlerRemoved(context: ChannelHandlerContext) {
  184. self.eventLoop.assertInEventLoop()
  185. // Break the reference cycle.
  186. self._pipeline = nil
  187. }
  188. internal func channelError(_ error: Error) {
  189. self.eventLoop.assertInEventLoop()
  190. self.act(on: self.state.channelError(error))
  191. }
  192. @usableFromInline
  193. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  194. self.channelError(error)
  195. }
  196. @usableFromInline
  197. internal func channelActive(context: ChannelHandlerContext) {
  198. self.eventLoop.assertInEventLoop()
  199. self.logger.debug("activated stream channel", source: "GRPC")
  200. self.act(on: self.state.channelActive(context: context))
  201. }
  202. @usableFromInline
  203. internal func channelInactive(context: ChannelHandlerContext) {
  204. self.eventLoop.assertInEventLoop()
  205. self.act(on: self.state.channelInactive(context: context))
  206. }
  207. @usableFromInline
  208. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  209. self.eventLoop.assertInEventLoop()
  210. let part = self.unwrapInboundIn(data)
  211. self.act(on: self.state.channelRead(part))
  212. // (We're the end of the channel. No need to forward anything.)
  213. }
  214. }
  215. // MARK: - State Handling
  216. extension ClientTransport {
  217. fileprivate enum State {
  218. /// Idle. We're waiting for the RPC to be configured.
  219. ///
  220. /// Valid transitions:
  221. /// - `awaitingTransport` (the transport is being configured)
  222. /// - `closed` (the RPC cancels)
  223. case idle
  224. /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
  225. /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
  226. /// transport in this state is an error.
  227. ///
  228. /// Valid transitions:
  229. /// - `activatingTransport` (the channel becomes active)
  230. /// - `closing` (the RPC cancels)
  231. /// - `closed` (the channel fails to become active)
  232. case awaitingTransport
  233. /// The transport is active but we're unbuffering any requests to write on that transport.
  234. /// We'll continue buffering in this state. Receiving messages from the transport in this state
  235. /// is okay.
  236. ///
  237. /// Valid transitions:
  238. /// - `active` (we finish unbuffering)
  239. /// - `closing` (the RPC cancels, the channel encounters an error)
  240. /// - `closed` (the channel becomes inactive)
  241. case activatingTransport(Channel)
  242. /// Fully active. An RPC is in progress and is communicating over an active transport.
  243. ///
  244. /// Valid transitions:
  245. /// - `closing` (the RPC cancels, the channel encounters an error)
  246. /// - `closed` (the channel becomes inactive)
  247. case active(Channel)
  248. /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
  249. /// become inactive yet.
  250. ///
  251. /// Valid transitions:
  252. /// - `closed` (the channel becomes inactive)
  253. case closing
  254. /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
  255. /// be ignored.
  256. ///
  257. /// Valid transitions:
  258. /// - none: this state is terminal.
  259. case closed
  260. /// Whether writes may be unbuffered in this state.
  261. internal var isUnbuffering: Bool {
  262. switch self {
  263. case .activatingTransport:
  264. return true
  265. case .idle, .awaitingTransport, .active, .closing, .closed:
  266. return false
  267. }
  268. }
  269. /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
  270. internal var mayBuffer: Bool {
  271. switch self {
  272. case .idle, .activatingTransport, .awaitingTransport:
  273. return true
  274. case .active, .closing, .closed:
  275. return false
  276. }
  277. }
  278. }
  279. }
  280. extension ClientTransport.State {
  281. /// Actions which should be performed as a result telling the state machine something changed.
  282. fileprivate enum Action {
  283. /// Do nothing.
  284. case none
  285. /// Configure a `Channel` with the configurator.
  286. case configure(with: (ChannelHandler) -> EventLoopFuture<Void>)
  287. /// Append the request part and promise to the write buffer.
  288. case buffer(ClientRequestPart<Request>, EventLoopPromise<Void>?)
  289. /// Write - and flush if necessary – any request parts in the buffer to the `Channel`.
  290. case unbufferToChannel(Channel)
  291. /// Fail any buffered writes with the error.
  292. case failBufferedWrites(with: Error)
  293. /// Write the given operation to the channel.
  294. case writeToChannel(Channel, ClientRequestPart<Request>, EventLoopPromise<Void>?)
  295. /// Write the response part to the RPC.
  296. case forwardToInterceptors(_GRPCClientResponsePart<Response>)
  297. /// Fail the RPC with the given error. This includes failing any outstanding writes.
  298. case forwardErrorToInterceptors(Error)
  299. /// Close the given channel.
  300. case close(Channel)
  301. /// Fail the given promise with the error provided.
  302. case completePromise(EventLoopPromise<Void>?, with: Result<Void, Error>)
  303. /// Complete the lazy channel promise with this result.
  304. case completeChannelPromise(with: Result<Channel, Error>)
  305. /// Perform multiple actions.
  306. indirect case multiple([Action])
  307. }
  308. }
  309. extension ClientTransport.State {
  310. /// The caller would like to configure the transport.
  311. mutating func configureTransport(
  312. with configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>
  313. ) -> Action {
  314. switch self {
  315. // We're idle until we configure. Anything else is just a repeat request to configure.
  316. case .idle:
  317. self = .awaitingTransport
  318. return .configure(with: configurator)
  319. case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
  320. return .none
  321. }
  322. }
  323. /// The pipeline would like to send a request part to the transport.
  324. mutating func send(
  325. _ part: ClientRequestPart<Request>,
  326. promise: EventLoopPromise<Void>?
  327. ) -> Action {
  328. switch self {
  329. // We don't have any transport yet, just buffer the part.
  330. case .idle, .awaitingTransport, .activatingTransport:
  331. return .buffer(part, promise)
  332. // We have a `Channel`, we can pipe the write straight through.
  333. case let .active(channel):
  334. return .writeToChannel(channel, part, promise)
  335. // The transport is going or has gone away. Fail the promise.
  336. case .closing, .closed:
  337. return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
  338. }
  339. }
  340. /// We finished dealing with the buffered writes.
  341. mutating func unbuffered() -> Action {
  342. switch self {
  343. // These can't happen since we only begin unbuffering when we transition to
  344. // '.activatingTransport', which must come after these two states..
  345. case .idle, .awaitingTransport:
  346. preconditionFailure("Requests can't be unbuffered before the transport is activated")
  347. // We dealt with any buffered writes. We can become active now. This is the only way to become
  348. // active.
  349. case let .activatingTransport(channel):
  350. self = .active(channel)
  351. return .completeChannelPromise(with: .success(channel))
  352. case .active:
  353. preconditionFailure("Unbuffering completed but the transport is already active")
  354. // Something caused us to close while unbuffering, that's okay, we won't take any further
  355. // action.
  356. case .closing, .closed:
  357. return .none
  358. }
  359. }
  360. /// Cancel the RPC and associated `Channel`, if possible.
  361. mutating func cancel(promise: EventLoopPromise<Void>?) -> Action {
  362. switch self {
  363. case .idle:
  364. // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
  365. // we're done, fail any writes, and then deal with the cancellation promise.
  366. self = .closed
  367. let error = GRPCError.RPCCancelledByClient().captureContext()
  368. return .multiple([
  369. .forwardErrorToInterceptors(error),
  370. .failBufferedWrites(with: error.error),
  371. .completePromise(promise, with: .success(())),
  372. .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete())),
  373. ])
  374. case .awaitingTransport:
  375. // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
  376. // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
  377. // the `Channel` becoming active (see `channelActive(context:)`).
  378. self = .closing
  379. let error = GRPCError.RPCCancelledByClient().captureContext()
  380. return .multiple([
  381. .forwardErrorToInterceptors(error),
  382. .failBufferedWrites(with: error.error),
  383. .completePromise(promise, with: .success(())),
  384. ])
  385. case let .activatingTransport(channel):
  386. // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
  387. // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
  388. // any buffered writes and then complete the cancellatiion promise.
  389. self = .closing
  390. let error = GRPCError.RPCCancelledByClient().captureContext()
  391. return .multiple([
  392. .forwardErrorToInterceptors(error),
  393. .close(channel),
  394. .failBufferedWrites(with: error.error),
  395. .completePromise(promise, with: .success(())),
  396. ])
  397. case let .active(channel):
  398. // The RPC and channel are up and running. We'll fail the RPC and close the channel.
  399. self = .closing
  400. let error = GRPCError.RPCCancelledByClient().captureContext()
  401. return .multiple([
  402. .forwardErrorToInterceptors(error),
  403. .close(channel),
  404. .completePromise(promise, with: .success(())),
  405. ])
  406. case .closing, .closed:
  407. // We're already closing or closing. The cancellation is too late.
  408. return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
  409. }
  410. }
  411. /// `channelActive` was invoked on the transport by the `Channel`.
  412. mutating func channelActive(context: ChannelHandlerContext) -> Action {
  413. // The channel has become active: what now?
  414. switch self {
  415. case .idle:
  416. preconditionFailure("Can't activate an idle transport")
  417. case .awaitingTransport:
  418. self = .activatingTransport(context.channel)
  419. return .unbufferToChannel(context.channel)
  420. case .activatingTransport, .active:
  421. preconditionFailure("Invalid state: stream is already active")
  422. case .closing:
  423. // We remain in closing: we only transition to closed on 'channelInactive'.
  424. return .close(context.channel)
  425. case .closed:
  426. preconditionFailure("Invalid state: stream is already inactive")
  427. }
  428. }
  429. /// `channelInactive` was invoked on the transport by the `Channel`.
  430. mutating func channelInactive(context: ChannelHandlerContext) -> Action {
  431. switch self {
  432. case .idle:
  433. // We can't become inactive before we've requested a `Channel`.
  434. preconditionFailure("Can't deactivate an idle transport")
  435. case .awaitingTransport, .activatingTransport, .active:
  436. // We're activating the transport - i.e. offloading any buffered requests - and the channel
  437. // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
  438. // synthesize an error status to fail the RPC with.
  439. self = .closed
  440. let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
  441. return .multiple([
  442. .forwardErrorToInterceptors(status),
  443. .failBufferedWrites(with: status),
  444. .completeChannelPromise(with: .failure(status)),
  445. ])
  446. case .closing:
  447. // We were already closing, now we're fully closed.
  448. self = .closed
  449. return .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete()))
  450. case .closed:
  451. // We're already closed.
  452. return .none
  453. }
  454. }
  455. /// `channelRead` was invoked on the transport by the `Channel`.
  456. mutating func channelRead(_ part: _GRPCClientResponsePart<Response>) -> Action {
  457. switch self {
  458. case .idle, .awaitingTransport:
  459. // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
  460. preconditionFailure("Can't receive response part on idle transport")
  461. case .activatingTransport, .active:
  462. // We have an active `Channel`, we can forward the request part but we may need to start
  463. // closing if we see the status, since it indicates the call is terminating.
  464. switch part {
  465. case .initialMetadata, .message, .trailingMetadata:
  466. ()
  467. case .status:
  468. // The status is the final part of the RPC. We will become inactive soon.
  469. self = .closing
  470. }
  471. return .forwardToInterceptors(part)
  472. case .closing, .closed:
  473. // We closed early, ignore are reads.
  474. return .none
  475. }
  476. }
  477. /// We received an error from the `Channel`.
  478. mutating func channelError(_ error: Error) -> Action {
  479. switch self {
  480. case .idle:
  481. // The `Channel` can't error if it doesn't exist.
  482. preconditionFailure("Can't catch error on idle transport")
  483. case .awaitingTransport:
  484. // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
  485. // buffered writes along the way.
  486. self = .closing
  487. return .multiple([
  488. .forwardErrorToInterceptors(error),
  489. .failBufferedWrites(with: error),
  490. ])
  491. case let .activatingTransport(channel),
  492. let .active(channel):
  493. // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
  494. self = .closing
  495. return .multiple([
  496. .forwardErrorToInterceptors(error),
  497. .failBufferedWrites(with: error),
  498. .close(channel),
  499. ])
  500. case .closing, .closed:
  501. // We're already closing/closed, we can ignore this.
  502. return .none
  503. }
  504. }
  505. /// The caller has asked for the underlying `Channel`.
  506. mutating func getChannel() -> Action {
  507. switch self {
  508. case .idle, .awaitingTransport, .activatingTransport:
  509. // Do nothing, we'll complete the promise when we become active or closed.
  510. return .none
  511. case let .active(channel):
  512. // We're already active, so there was no promise to succeed when we made this transition. We
  513. // can complete it now.
  514. return .completeChannelPromise(with: .success(channel))
  515. case .closing:
  516. // We'll complete the promise when we transition to closed.
  517. return .none
  518. case .closed:
  519. // We're already closed; there was no promise to fail when we made this transition. We can go
  520. // ahead and fail it now though.
  521. return .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete()))
  522. }
  523. }
  524. }
  525. // MARK: - State Actions
  526. extension ClientTransport {
  527. /// Act on the action which resulted from prodding the state machine.
  528. /// - Parameter action: The action to act on.
  529. private func act(on action: State.Action) {
  530. switch action {
  531. case .none:
  532. ()
  533. case let .configure(configurator):
  534. self.configure(using: configurator)
  535. case let .buffer(part, promise):
  536. self.buffer(part, promise: promise)
  537. case let .unbufferToChannel(channel):
  538. self.unbuffer(to: channel)
  539. case let .failBufferedWrites(with: error):
  540. self.failBufferedWrites(with: error)
  541. case let .writeToChannel(channel, part, promise):
  542. self.write(part, to: channel, promise: promise, flush: self.shouldFlush(after: part))
  543. case let .forwardToInterceptors(response: part):
  544. self.forwardToInterceptors(part)
  545. case let .forwardErrorToInterceptors(error: error):
  546. self.forwardErrorToInterceptors(error)
  547. case let .completePromise(promise, result):
  548. promise?.completeWith(result)
  549. case let .completeChannelPromise(result):
  550. self.channelPromise?.completeWith(result)
  551. case let .close(channel):
  552. channel.close(mode: .all, promise: nil)
  553. case let .multiple(actions):
  554. for action in actions {
  555. self.act(on: action)
  556. }
  557. }
  558. }
  559. /// Configures this transport with the `configurator`.
  560. private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
  561. configurator(self).whenFailure { error in
  562. if error is GRPCStatus || error is GRPCStatusTransformable {
  563. self.channelError(error)
  564. } else {
  565. // Fallback to something which will mark the RPC as 'unavailable'.
  566. self.channelError(ConnectionFailure(reason: error))
  567. }
  568. }
  569. }
  570. /// Append a request part to the write buffer.
  571. /// - Parameters:
  572. /// - part: The request part to buffer.
  573. /// - promise: A promise to complete when the request part has been sent.
  574. private func buffer(
  575. _ part: ClientRequestPart<Request>,
  576. promise: EventLoopPromise<Void>?
  577. ) {
  578. self.logger.debug("buffering request part", metadata: [
  579. "request_part": "\(part.name)",
  580. "call_state": self.stateForLogging,
  581. ], source: "GRPC")
  582. self.writeBuffer.append(.init(request: part, promise: promise))
  583. }
  584. /// Writes any buffered request parts to the `Channel`.
  585. /// - Parameter channel: The `Channel` to write any buffered request parts to.
  586. private func unbuffer(to channel: Channel) {
  587. // Save any flushing until we're done writing.
  588. var shouldFlush = false
  589. self.logger.debug("unbuffering request parts", metadata: [
  590. "request_parts": "\(self.writeBuffer.count)",
  591. ], source: "GRPC")
  592. // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
  593. // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
  594. // may miss more buffered writes.
  595. while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
  596. // Pull out as many writes as possible.
  597. while let write = self.writeBuffer.popFirst() {
  598. self.logger.debug("unbuffering request part", metadata: [
  599. "request_part": "\(write.request.name)",
  600. ], source: "GRPC")
  601. if !shouldFlush {
  602. shouldFlush = self.shouldFlush(after: write.request)
  603. }
  604. self.write(write.request, to: channel, promise: write.promise, flush: false)
  605. }
  606. // Okay, flush now.
  607. if shouldFlush {
  608. shouldFlush = false
  609. channel.flush()
  610. }
  611. }
  612. if self.writeBuffer.isEmpty {
  613. self.logger.debug("request buffer drained", source: "GRPC")
  614. } else {
  615. self.logger.notice(
  616. "unbuffering aborted",
  617. metadata: ["call_state": self.stateForLogging],
  618. source: "GRPC"
  619. )
  620. }
  621. // We're unbuffered. What now?
  622. self.act(on: self.state.unbuffered())
  623. }
  624. /// Fails any promises that come with buffered writes with `error`.
  625. /// - Parameter error: The `Error` to fail promises with.
  626. private func failBufferedWrites(with error: Error) {
  627. self.logger.debug("failing buffered writes", metadata: [
  628. "call_state": self.stateForLogging,
  629. ], source: "GRPC")
  630. while let write = self.writeBuffer.popFirst() {
  631. write.promise?.fail(error)
  632. }
  633. }
  634. /// Write a request part to the `Channel`.
  635. /// - Parameters:
  636. /// - part: The request part to write.
  637. /// - channel: The `Channel` to write `part` in to.
  638. /// - promise: A promise to complete once the write has been completed.
  639. /// - flush: Whether to flush the `Channel` after writing.
  640. private func write(
  641. _ part: ClientRequestPart<Request>,
  642. to channel: Channel,
  643. promise: EventLoopPromise<Void>?,
  644. flush: Bool
  645. ) {
  646. switch part {
  647. case let .metadata(headers):
  648. let head = self.makeRequestHead(with: headers)
  649. channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
  650. case let .message(request, metadata):
  651. let message = _MessageContext<Request>(request, compressed: metadata.compress)
  652. channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
  653. case .end:
  654. channel.write(self.wrapOutboundOut(.end), promise: promise)
  655. }
  656. if flush {
  657. channel.flush()
  658. }
  659. }
  660. /// Forward the response part to the interceptor pipeline.
  661. /// - Parameter part: The response part to forward.
  662. private func forwardToInterceptors(_ part: _GRPCClientResponsePart<Response>) {
  663. switch part {
  664. case let .initialMetadata(metadata):
  665. self._pipeline?.receive(.metadata(metadata))
  666. case let .message(context):
  667. self._pipeline?.receive(.message(context.message))
  668. case let .trailingMetadata(trailers):
  669. // The `Channel` delivers trailers and `GRPCStatus`, we want to emit them together in the
  670. // interceptor pipeline.
  671. self.trailers = trailers
  672. case let .status(status):
  673. let trailers = self.trailers ?? [:]
  674. self.trailers = nil
  675. self._pipeline?.receive(.end(status, trailers))
  676. }
  677. }
  678. /// Forward the error to the interceptor pipeline.
  679. /// - Parameter error: The error to forward.
  680. private func forwardErrorToInterceptors(_ error: Error) {
  681. self._pipeline?.receive(.error(error))
  682. }
  683. }
  684. // MARK: - Helpers
  685. extension ClientTransport {
  686. /// Returns whether the `Channel` should be flushed after writing the given part to it.
  687. private func shouldFlush(after part: ClientRequestPart<Request>) -> Bool {
  688. switch part {
  689. case .metadata:
  690. // If we're not streaming requests then we hold off on the flush until we see end.
  691. return self.isStreamingRequests
  692. case let .message(_, metadata):
  693. // Message flushing is determined by caller preference.
  694. return metadata.flush
  695. case .end:
  696. // Always flush at the end of the request stream.
  697. return true
  698. }
  699. }
  700. /// Make a `_GRPCRequestHead` with the provided metadata.
  701. private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
  702. return _GRPCRequestHead(
  703. method: self.callDetails.options.cacheable ? "GET" : "POST",
  704. scheme: self.callDetails.scheme,
  705. path: self.callDetails.path,
  706. host: self.callDetails.authority,
  707. deadline: self.callDetails.options.timeLimit.makeDeadline(),
  708. customMetadata: metadata,
  709. encoding: self.callDetails.options.messageEncoding
  710. )
  711. }
  712. }
  713. extension ClientRequestPart {
  714. /// The name of the request part, used for logging.
  715. fileprivate var name: String {
  716. switch self {
  717. case .metadata:
  718. return "metadata"
  719. case .message:
  720. return "message"
  721. case .end:
  722. return "end"
  723. }
  724. }
  725. }