ClientTransport.swift 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
  21. /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  22. /// little API to use on this class: they may configure the transport by adding it to a
  23. /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  24. /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  25. /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  26. ///
  27. /// In most instances the glue code is simple: transformations are applied to the request and
  28. /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  29. /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  30. /// when these change. This includes buffering request parts from the interceptor pipeline until
  31. /// the `NIO.Channel` becomes active.
  32. ///
  33. /// ### Thread Safety
  34. ///
  35. /// This class is not thread safe. All methods **must** be executed on the transport's `eventLoop`.
  36. @usableFromInline
  37. internal final class ClientTransport<Request, Response> {
  38. /// The `EventLoop` this transport is running on.
  39. @usableFromInline
  40. internal let eventLoop: EventLoop
  41. /// The current state of the transport.
  42. private var state: State = .idle
  43. /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
  44. /// and fail it when we transition to `closed`.
  45. private var channelPromise: EventLoopPromise<Channel>?
  46. // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
  47. // have 3 parts.
  48. /// A buffer to store request parts and promises in before the channel has become active.
  49. private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
  50. /// A request part and a promise.
  51. private struct RequestAndPromise {
  52. var request: GRPCClientRequestPart<Request>
  53. var promise: EventLoopPromise<Void>?
  54. }
  55. /// Details about the call.
  56. internal let callDetails: CallDetails
  57. /// A logger.
  58. internal var logger: Logger {
  59. return self.callDetails.options.logger
  60. }
  61. /// Is the call streaming requests?
  62. private var isStreamingRequests: Bool {
  63. switch self.callDetails.type {
  64. case .unary, .serverStreaming:
  65. return false
  66. case .clientStreaming, .bidirectionalStreaming:
  67. return true
  68. }
  69. }
  70. // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
  71. // convenient to have both at the same time when intercepting response parts. We'll hold on to the
  72. // trailers here and only forward them when we receive the status.
  73. private var trailers: HPACKHeaders?
  74. /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
  75. /// from the `ChannelPipeline` in order to break reference cycles.
  76. @usableFromInline
  77. internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
  78. /// Our current state as logging metadata.
  79. private var stateForLogging: Logger.MetadataValue {
  80. if self.state.mayBuffer {
  81. return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
  82. } else {
  83. return "\(self.state)"
  84. }
  85. }
  86. internal init(
  87. details: CallDetails,
  88. eventLoop: EventLoop,
  89. interceptors: [ClientInterceptor<Request, Response>],
  90. errorDelegate: ClientErrorDelegate?,
  91. onError: @escaping (Error) -> Void,
  92. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  93. ) {
  94. self.eventLoop = eventLoop
  95. self.callDetails = details
  96. self._pipeline = ClientInterceptorPipeline(
  97. eventLoop: eventLoop,
  98. details: details,
  99. interceptors: interceptors,
  100. errorDelegate: errorDelegate,
  101. onError: onError,
  102. onCancel: self.cancelFromPipeline(promise:),
  103. onRequestPart: self.sendFromPipeline(_:promise:),
  104. onResponsePart: onResponsePart
  105. )
  106. }
  107. // MARK: - Call Object API
  108. /// Configure the transport to communicate with the server.
  109. /// - Parameter configurator: A callback to invoke in order to configure this transport.
  110. /// - Important: This *must* to be called from the `eventLoop`.
  111. internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  112. self.eventLoop.assertInEventLoop()
  113. self.act(on: self.state.configureTransport(with: configurator))
  114. }
  115. /// Send a request part – via the interceptor pipeline – to the server.
  116. /// - Parameters:
  117. /// - part: The part to send.
  118. /// - promise: A promise which will be completed when the request part has been handled.
  119. /// - Important: This *must* to be called from the `eventLoop`.
  120. @inlinable
  121. internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  122. self.eventLoop.assertInEventLoop()
  123. if let pipeline = self._pipeline {
  124. pipeline.send(part, promise: promise)
  125. } else {
  126. promise?.fail(GRPCError.AlreadyComplete())
  127. }
  128. }
  129. /// Attempt to cancel the RPC notifying any interceptors.
  130. /// - Parameter promise: A promise which will be completed when the cancellation attempt has
  131. /// been handled.
  132. internal func cancel(promise: EventLoopPromise<Void>?) {
  133. self.eventLoop.assertInEventLoop()
  134. if let pipeline = self._pipeline {
  135. pipeline.cancel(promise: promise)
  136. } else {
  137. promise?.fail(GRPCError.AlreadyComplete())
  138. }
  139. }
  140. /// A request for the underlying `Channel`.
  141. internal func channel() -> EventLoopFuture<Channel> {
  142. self.eventLoop.assertInEventLoop()
  143. // Do we already have a promise?
  144. if let promise = self.channelPromise {
  145. return promise.futureResult
  146. } else {
  147. // Make and store the promise.
  148. let promise = self.eventLoop.makePromise(of: Channel.self)
  149. self.channelPromise = promise
  150. // Ask the state machine if we can have it.
  151. self.act(on: self.state.getChannel())
  152. return promise.futureResult
  153. }
  154. }
  155. }
  156. // MARK: - Pipeline API
  157. extension ClientTransport {
  158. /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
  159. /// - Parameters:
  160. /// - part: The request part to send.
  161. /// - promise: A promise which will be completed when the part has been handled.
  162. /// - Important: This *must* to be called from the `eventLoop`.
  163. private func sendFromPipeline(
  164. _ part: GRPCClientRequestPart<Request>,
  165. promise: EventLoopPromise<Void>?
  166. ) {
  167. self.eventLoop.assertInEventLoop()
  168. self.act(on: self.state.send(part, promise: promise))
  169. }
  170. /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
  171. /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
  172. /// - Important: This *must* to be called from the `eventLoop`.
  173. private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
  174. self.eventLoop.assertInEventLoop()
  175. self.act(on: self.state.cancel(promise: promise))
  176. }
  177. }
  178. // MARK: - ChannelHandler API
  179. extension ClientTransport: ChannelInboundHandler {
  180. @usableFromInline
  181. typealias InboundIn = _GRPCClientResponsePart<Response>
  182. @usableFromInline
  183. typealias OutboundOut = _GRPCClientRequestPart<Request>
  184. @usableFromInline
  185. internal func handlerRemoved(context: ChannelHandlerContext) {
  186. self.eventLoop.assertInEventLoop()
  187. // Break the reference cycle.
  188. self._pipeline = nil
  189. }
  190. internal func channelError(_ error: Error) {
  191. self.eventLoop.assertInEventLoop()
  192. self.act(on: self.state.channelError(error))
  193. }
  194. @usableFromInline
  195. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  196. self.channelError(error)
  197. }
  198. @usableFromInline
  199. internal func channelActive(context: ChannelHandlerContext) {
  200. self.eventLoop.assertInEventLoop()
  201. self.logger.debug("activated stream channel", source: "GRPC")
  202. self.act(on: self.state.channelActive(context: context))
  203. }
  204. @usableFromInline
  205. internal func channelInactive(context: ChannelHandlerContext) {
  206. self.eventLoop.assertInEventLoop()
  207. self.act(on: self.state.channelInactive(context: context))
  208. }
  209. @usableFromInline
  210. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  211. self.eventLoop.assertInEventLoop()
  212. let part = self.unwrapInboundIn(data)
  213. self.act(on: self.state.channelRead(part))
  214. // (We're the end of the channel. No need to forward anything.)
  215. }
  216. }
  217. // MARK: - State Handling
  218. extension ClientTransport {
  219. fileprivate enum State {
  220. /// Idle. We're waiting for the RPC to be configured.
  221. ///
  222. /// Valid transitions:
  223. /// - `awaitingTransport` (the transport is being configured)
  224. /// - `closed` (the RPC cancels)
  225. case idle
  226. /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
  227. /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
  228. /// transport in this state is an error.
  229. ///
  230. /// Valid transitions:
  231. /// - `activatingTransport` (the channel becomes active)
  232. /// - `closing` (the RPC cancels)
  233. /// - `closed` (the channel fails to become active)
  234. case awaitingTransport
  235. /// The transport is active but we're unbuffering any requests to write on that transport.
  236. /// We'll continue buffering in this state. Receiving messages from the transport in this state
  237. /// is okay.
  238. ///
  239. /// Valid transitions:
  240. /// - `active` (we finish unbuffering)
  241. /// - `closing` (the RPC cancels, the channel encounters an error)
  242. /// - `closed` (the channel becomes inactive)
  243. case activatingTransport(Channel)
  244. /// Fully active. An RPC is in progress and is communicating over an active transport.
  245. ///
  246. /// Valid transitions:
  247. /// - `closing` (the RPC cancels, the channel encounters an error)
  248. /// - `closed` (the channel becomes inactive)
  249. case active(Channel)
  250. /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
  251. /// become inactive yet.
  252. ///
  253. /// Valid transitions:
  254. /// - `closed` (the channel becomes inactive)
  255. case closing
  256. /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
  257. /// be ignored.
  258. ///
  259. /// Valid transitions:
  260. /// - none: this state is terminal.
  261. case closed
  262. /// Whether writes may be unbuffered in this state.
  263. internal var isUnbuffering: Bool {
  264. switch self {
  265. case .activatingTransport:
  266. return true
  267. case .idle, .awaitingTransport, .active, .closing, .closed:
  268. return false
  269. }
  270. }
  271. /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
  272. internal var mayBuffer: Bool {
  273. switch self {
  274. case .idle, .activatingTransport, .awaitingTransport:
  275. return true
  276. case .active, .closing, .closed:
  277. return false
  278. }
  279. }
  280. }
  281. }
  282. extension ClientTransport.State {
  283. /// Actions which should be performed as a result telling the state machine something changed.
  284. fileprivate enum Action {
  285. /// Do nothing.
  286. case none
  287. /// Configure a `Channel` with the configurator.
  288. case configure(with: (ChannelHandler) -> EventLoopFuture<Void>)
  289. /// Append the request part and promise to the write buffer.
  290. case buffer(GRPCClientRequestPart<Request>, EventLoopPromise<Void>?)
  291. /// Write - and flush if necessary – any request parts in the buffer to the `Channel`.
  292. case unbufferToChannel(Channel)
  293. /// Fail any buffered writes with the error.
  294. case failBufferedWrites(with: Error)
  295. /// Write the given operation to the channel.
  296. case writeToChannel(Channel, GRPCClientRequestPart<Request>, EventLoopPromise<Void>?)
  297. /// Write the response part to the RPC.
  298. case forwardToInterceptors(_GRPCClientResponsePart<Response>)
  299. /// Fail the RPC with the given error. This includes failing any outstanding writes.
  300. case forwardErrorToInterceptors(Error)
  301. /// Close the given channel.
  302. case close(Channel)
  303. /// Fail the given promise with the error provided.
  304. case completePromise(EventLoopPromise<Void>?, with: Result<Void, Error>)
  305. /// Complete the lazy channel promise with this result.
  306. case completeChannelPromise(with: Result<Channel, Error>)
  307. /// Perform multiple actions.
  308. indirect case multiple([Action])
  309. }
  310. }
  311. extension ClientTransport.State {
  312. /// The caller would like to configure the transport.
  313. mutating func configureTransport(
  314. with configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>
  315. ) -> Action {
  316. switch self {
  317. // We're idle until we configure. Anything else is just a repeat request to configure.
  318. case .idle:
  319. self = .awaitingTransport
  320. return .configure(with: configurator)
  321. case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
  322. return .none
  323. }
  324. }
  325. /// The pipeline would like to send a request part to the transport.
  326. mutating func send(
  327. _ part: GRPCClientRequestPart<Request>,
  328. promise: EventLoopPromise<Void>?
  329. ) -> Action {
  330. switch self {
  331. // We don't have any transport yet, just buffer the part.
  332. case .idle, .awaitingTransport, .activatingTransport:
  333. return .buffer(part, promise)
  334. // We have a `Channel`, we can pipe the write straight through.
  335. case let .active(channel):
  336. return .writeToChannel(channel, part, promise)
  337. // The transport is going or has gone away. Fail the promise.
  338. case .closing, .closed:
  339. return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
  340. }
  341. }
  342. /// We finished dealing with the buffered writes.
  343. mutating func unbuffered() -> Action {
  344. switch self {
  345. // These can't happen since we only begin unbuffering when we transition to
  346. // '.activatingTransport', which must come after these two states..
  347. case .idle, .awaitingTransport:
  348. preconditionFailure("Requests can't be unbuffered before the transport is activated")
  349. // We dealt with any buffered writes. We can become active now. This is the only way to become
  350. // active.
  351. case let .activatingTransport(channel):
  352. self = .active(channel)
  353. return .completeChannelPromise(with: .success(channel))
  354. case .active:
  355. preconditionFailure("Unbuffering completed but the transport is already active")
  356. // Something caused us to close while unbuffering, that's okay, we won't take any further
  357. // action.
  358. case .closing, .closed:
  359. return .none
  360. }
  361. }
  362. /// Cancel the RPC and associated `Channel`, if possible.
  363. mutating func cancel(promise: EventLoopPromise<Void>?) -> Action {
  364. switch self {
  365. case .idle:
  366. // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
  367. // we're done, fail any writes, and then deal with the cancellation promise.
  368. self = .closed
  369. let error = GRPCError.RPCCancelledByClient().captureContext()
  370. return .multiple([
  371. .forwardErrorToInterceptors(error),
  372. .failBufferedWrites(with: error.error),
  373. .completePromise(promise, with: .success(())),
  374. .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete())),
  375. ])
  376. case .awaitingTransport:
  377. // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
  378. // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
  379. // the `Channel` becoming active (see `channelActive(context:)`).
  380. self = .closing
  381. let error = GRPCError.RPCCancelledByClient().captureContext()
  382. return .multiple([
  383. .forwardErrorToInterceptors(error),
  384. .failBufferedWrites(with: error.error),
  385. .completePromise(promise, with: .success(())),
  386. ])
  387. case let .activatingTransport(channel):
  388. // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
  389. // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
  390. // any buffered writes and then complete the cancellatiion promise.
  391. self = .closing
  392. let error = GRPCError.RPCCancelledByClient().captureContext()
  393. return .multiple([
  394. .forwardErrorToInterceptors(error),
  395. .close(channel),
  396. .failBufferedWrites(with: error.error),
  397. .completePromise(promise, with: .success(())),
  398. ])
  399. case let .active(channel):
  400. // The RPC and channel are up and running. We'll fail the RPC and close the channel.
  401. self = .closing
  402. let error = GRPCError.RPCCancelledByClient().captureContext()
  403. return .multiple([
  404. .forwardErrorToInterceptors(error),
  405. .close(channel),
  406. .completePromise(promise, with: .success(())),
  407. ])
  408. case .closing, .closed:
  409. // We're already closing or closing. The cancellation is too late.
  410. return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
  411. }
  412. }
  413. /// `channelActive` was invoked on the transport by the `Channel`.
  414. mutating func channelActive(context: ChannelHandlerContext) -> Action {
  415. // The channel has become active: what now?
  416. switch self {
  417. case .idle:
  418. preconditionFailure("Can't activate an idle transport")
  419. case .awaitingTransport:
  420. self = .activatingTransport(context.channel)
  421. return .unbufferToChannel(context.channel)
  422. case .activatingTransport, .active:
  423. preconditionFailure("Invalid state: stream is already active")
  424. case .closing:
  425. // We remain in closing: we only transition to closed on 'channelInactive'.
  426. return .close(context.channel)
  427. case .closed:
  428. preconditionFailure("Invalid state: stream is already inactive")
  429. }
  430. }
  431. /// `channelInactive` was invoked on the transport by the `Channel`.
  432. mutating func channelInactive(context: ChannelHandlerContext) -> Action {
  433. switch self {
  434. case .idle:
  435. // We can't become inactive before we've requested a `Channel`.
  436. preconditionFailure("Can't deactivate an idle transport")
  437. case .awaitingTransport, .activatingTransport, .active:
  438. // We're activating the transport - i.e. offloading any buffered requests - and the channel
  439. // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
  440. // synthesize an error status to fail the RPC with.
  441. self = .closed
  442. let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
  443. return .multiple([
  444. .forwardErrorToInterceptors(status),
  445. .failBufferedWrites(with: status),
  446. .completeChannelPromise(with: .failure(status)),
  447. ])
  448. case .closing:
  449. // We were already closing, now we're fully closed.
  450. self = .closed
  451. return .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete()))
  452. case .closed:
  453. // We're already closed.
  454. return .none
  455. }
  456. }
  457. /// `channelRead` was invoked on the transport by the `Channel`.
  458. mutating func channelRead(_ part: _GRPCClientResponsePart<Response>) -> Action {
  459. switch self {
  460. case .idle, .awaitingTransport:
  461. // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
  462. preconditionFailure("Can't receive response part on idle transport")
  463. case .activatingTransport, .active:
  464. // We have an active `Channel`, we can forward the request part but we may need to start
  465. // closing if we see the status, since it indicates the call is terminating.
  466. switch part {
  467. case .initialMetadata, .message, .trailingMetadata:
  468. ()
  469. case .status:
  470. // The status is the final part of the RPC. We will become inactive soon.
  471. self = .closing
  472. }
  473. return .forwardToInterceptors(part)
  474. case .closing, .closed:
  475. // We closed early, ignore are reads.
  476. return .none
  477. }
  478. }
  479. /// We received an error from the `Channel`.
  480. mutating func channelError(_ error: Error) -> Action {
  481. switch self {
  482. case .idle:
  483. // The `Channel` can't error if it doesn't exist.
  484. preconditionFailure("Can't catch error on idle transport")
  485. case .awaitingTransport:
  486. // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
  487. // buffered writes along the way.
  488. self = .closing
  489. return .multiple([
  490. .forwardErrorToInterceptors(error),
  491. .failBufferedWrites(with: error),
  492. ])
  493. case let .activatingTransport(channel),
  494. let .active(channel):
  495. // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
  496. self = .closing
  497. return .multiple([
  498. .forwardErrorToInterceptors(error),
  499. .failBufferedWrites(with: error),
  500. .close(channel),
  501. ])
  502. case .closing, .closed:
  503. // We're already closing/closed, we can ignore this.
  504. return .none
  505. }
  506. }
  507. /// The caller has asked for the underlying `Channel`.
  508. mutating func getChannel() -> Action {
  509. switch self {
  510. case .idle, .awaitingTransport, .activatingTransport:
  511. // Do nothing, we'll complete the promise when we become active or closed.
  512. return .none
  513. case let .active(channel):
  514. // We're already active, so there was no promise to succeed when we made this transition. We
  515. // can complete it now.
  516. return .completeChannelPromise(with: .success(channel))
  517. case .closing:
  518. // We'll complete the promise when we transition to closed.
  519. return .none
  520. case .closed:
  521. // We're already closed; there was no promise to fail when we made this transition. We can go
  522. // ahead and fail it now though.
  523. return .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete()))
  524. }
  525. }
  526. }
  527. // MARK: - State Actions
  528. extension ClientTransport {
  529. /// Act on the action which resulted from prodding the state machine.
  530. /// - Parameter action: The action to act on.
  531. private func act(on action: State.Action) {
  532. switch action {
  533. case .none:
  534. ()
  535. case let .configure(configurator):
  536. self.configure(using: configurator)
  537. case let .buffer(part, promise):
  538. self.buffer(part, promise: promise)
  539. case let .unbufferToChannel(channel):
  540. self.unbuffer(to: channel)
  541. case let .failBufferedWrites(with: error):
  542. self.failBufferedWrites(with: error)
  543. case let .writeToChannel(channel, part, promise):
  544. self.write(part, to: channel, promise: promise, flush: self.shouldFlush(after: part))
  545. case let .forwardToInterceptors(response: part):
  546. self.forwardToInterceptors(part)
  547. case let .forwardErrorToInterceptors(error: error):
  548. self.forwardErrorToInterceptors(error)
  549. case let .completePromise(promise, result):
  550. promise?.completeWith(result)
  551. case let .completeChannelPromise(result):
  552. self.channelPromise?.completeWith(result)
  553. case let .close(channel):
  554. channel.close(mode: .all, promise: nil)
  555. case let .multiple(actions):
  556. for action in actions {
  557. self.act(on: action)
  558. }
  559. }
  560. }
  561. /// Configures this transport with the `configurator`.
  562. private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
  563. configurator(self).whenFailure { error in
  564. if error is GRPCStatus || error is GRPCStatusTransformable {
  565. self.channelError(error)
  566. } else {
  567. // Fallback to something which will mark the RPC as 'unavailable'.
  568. self.channelError(ConnectionFailure(reason: error))
  569. }
  570. }
  571. }
  572. /// Append a request part to the write buffer.
  573. /// - Parameters:
  574. /// - part: The request part to buffer.
  575. /// - promise: A promise to complete when the request part has been sent.
  576. private func buffer(
  577. _ part: GRPCClientRequestPart<Request>,
  578. promise: EventLoopPromise<Void>?
  579. ) {
  580. self.logger.debug("buffering request part", metadata: [
  581. "request_part": "\(part.name)",
  582. "call_state": self.stateForLogging,
  583. ], source: "GRPC")
  584. self.writeBuffer.append(.init(request: part, promise: promise))
  585. }
  586. /// Writes any buffered request parts to the `Channel`.
  587. /// - Parameter channel: The `Channel` to write any buffered request parts to.
  588. private func unbuffer(to channel: Channel) {
  589. // Save any flushing until we're done writing.
  590. var shouldFlush = false
  591. self.logger.debug("unbuffering request parts", metadata: [
  592. "request_parts": "\(self.writeBuffer.count)",
  593. ], source: "GRPC")
  594. // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
  595. // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
  596. // may miss more buffered writes.
  597. while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
  598. // Pull out as many writes as possible.
  599. while let write = self.writeBuffer.popFirst() {
  600. self.logger.debug("unbuffering request part", metadata: [
  601. "request_part": "\(write.request.name)",
  602. ], source: "GRPC")
  603. if !shouldFlush {
  604. shouldFlush = self.shouldFlush(after: write.request)
  605. }
  606. self.write(write.request, to: channel, promise: write.promise, flush: false)
  607. }
  608. // Okay, flush now.
  609. if shouldFlush {
  610. shouldFlush = false
  611. channel.flush()
  612. }
  613. }
  614. if self.writeBuffer.isEmpty {
  615. self.logger.debug("request buffer drained", source: "GRPC")
  616. } else {
  617. self.logger.notice(
  618. "unbuffering aborted",
  619. metadata: ["call_state": self.stateForLogging],
  620. source: "GRPC"
  621. )
  622. }
  623. // We're unbuffered. What now?
  624. self.act(on: self.state.unbuffered())
  625. }
  626. /// Fails any promises that come with buffered writes with `error`.
  627. /// - Parameter error: The `Error` to fail promises with.
  628. private func failBufferedWrites(with error: Error) {
  629. self.logger.debug("failing buffered writes", metadata: [
  630. "call_state": self.stateForLogging,
  631. ], source: "GRPC")
  632. while let write = self.writeBuffer.popFirst() {
  633. write.promise?.fail(error)
  634. }
  635. }
  636. /// Write a request part to the `Channel`.
  637. /// - Parameters:
  638. /// - part: The request part to write.
  639. /// - channel: The `Channel` to write `part` in to.
  640. /// - promise: A promise to complete once the write has been completed.
  641. /// - flush: Whether to flush the `Channel` after writing.
  642. private func write(
  643. _ part: GRPCClientRequestPart<Request>,
  644. to channel: Channel,
  645. promise: EventLoopPromise<Void>?,
  646. flush: Bool
  647. ) {
  648. switch part {
  649. case let .metadata(headers):
  650. let head = self.makeRequestHead(with: headers)
  651. channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
  652. case let .message(request, metadata):
  653. let message = _MessageContext<Request>(request, compressed: metadata.compress)
  654. channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
  655. case .end:
  656. channel.write(self.wrapOutboundOut(.end), promise: promise)
  657. }
  658. if flush {
  659. channel.flush()
  660. }
  661. }
  662. /// Forward the response part to the interceptor pipeline.
  663. /// - Parameter part: The response part to forward.
  664. private func forwardToInterceptors(_ part: _GRPCClientResponsePart<Response>) {
  665. switch part {
  666. case let .initialMetadata(metadata):
  667. self._pipeline?.receive(.metadata(metadata))
  668. case let .message(context):
  669. self._pipeline?.receive(.message(context.message))
  670. case let .trailingMetadata(trailers):
  671. // The `Channel` delivers trailers and `GRPCStatus`, we want to emit them together in the
  672. // interceptor pipeline.
  673. self.trailers = trailers
  674. case let .status(status):
  675. let trailers = self.trailers ?? [:]
  676. self.trailers = nil
  677. self._pipeline?.receive(.end(status, trailers))
  678. }
  679. }
  680. /// Forward the error to the interceptor pipeline.
  681. /// - Parameter error: The error to forward.
  682. private func forwardErrorToInterceptors(_ error: Error) {
  683. self._pipeline?.errorCaught(error)
  684. }
  685. }
  686. // MARK: - Helpers
  687. extension ClientTransport {
  688. /// Returns whether the `Channel` should be flushed after writing the given part to it.
  689. private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
  690. switch part {
  691. case .metadata:
  692. // If we're not streaming requests then we hold off on the flush until we see end.
  693. return self.isStreamingRequests
  694. case let .message(_, metadata):
  695. // Message flushing is determined by caller preference.
  696. return metadata.flush
  697. case .end:
  698. // Always flush at the end of the request stream.
  699. return true
  700. }
  701. }
  702. /// Make a `_GRPCRequestHead` with the provided metadata.
  703. private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
  704. return _GRPCRequestHead(
  705. method: self.callDetails.options.cacheable ? "GET" : "POST",
  706. scheme: self.callDetails.scheme,
  707. path: self.callDetails.path,
  708. host: self.callDetails.authority,
  709. deadline: self.callDetails.options.timeLimit.makeDeadline(),
  710. customMetadata: metadata,
  711. encoding: self.callDetails.options.messageEncoding
  712. )
  713. }
  714. }
  715. extension GRPCClientRequestPart {
  716. /// The name of the request part, used for logging.
  717. fileprivate var name: String {
  718. switch self {
  719. case .metadata:
  720. return "metadata"
  721. case .message:
  722. return "message"
  723. case .end:
  724. return "end"
  725. }
  726. }
  727. }
  728. // A wrapper for connection errors: we need to be able to preserve the underlying error as
  729. // well as extract a 'GRPCStatus' with code '.unavailable'.
  730. internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
  731. /// The reason the connection failed.
  732. var reason: Error
  733. init(reason: Error) {
  734. self.reason = reason
  735. }
  736. var description: String {
  737. return String(describing: self.reason)
  738. }
  739. func makeGRPCStatus() -> GRPCStatus {
  740. return GRPCStatus(code: .unavailable, message: String(describing: self.reason))
  741. }
  742. }