ClientTransport.swift 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
  21. /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  22. /// little API to use on this class: they may configure the transport by adding it to a
  23. /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  24. /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  25. /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  26. ///
  27. /// In most instances the glue code is simple: transformations are applied to the request and
  28. /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  29. /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  30. /// when these change. This includes buffering request parts from the interceptor pipeline until
  31. /// the `NIO.Channel` becomes active.
  32. ///
  33. /// ### Thread Safety
  34. ///
  35. /// This class is not thread safe. All methods **must** be executed on the transport's `eventLoop`.
  36. @usableFromInline
  37. internal final class ClientTransport<Request, Response> {
  38. /// The `EventLoop` this transport is running on.
  39. @usableFromInline
  40. internal let eventLoop: EventLoop
  41. /// The current state of the transport.
  42. private var state: State = .idle
  43. // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
  44. // have 3 parts.
  45. /// A buffer to store request parts and promises in before the channel has become active.
  46. private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
  47. /// A request part and a promise.
  48. private struct RequestAndPromise {
  49. var request: ClientRequestPart<Request>
  50. var promise: EventLoopPromise<Void>?
  51. }
  52. /// Details about the call.
  53. internal let callDetails: CallDetails
  54. /// A logger.
  55. internal var logger: Logger {
  56. return self.callDetails.options.logger
  57. }
  58. /// Is the call streaming requests?
  59. private var isStreamingRequests: Bool {
  60. switch self.callDetails.type {
  61. case .unary, .serverStreaming:
  62. return false
  63. case .clientStreaming, .bidirectionalStreaming:
  64. return true
  65. }
  66. }
  67. // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
  68. // convenient to have both at the same time when intercepting response parts. We'll hold on to the
  69. // trailers here and only forward them when we receive the status.
  70. private var trailers: HPACKHeaders?
  71. /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
  72. /// from the `ChannelPipeline` in order to break reference cycles.
  73. @usableFromInline
  74. internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
  75. /// Our current state as logging metadata.
  76. private var stateForLogging: Logger.MetadataValue {
  77. if self.state.mayBuffer {
  78. return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
  79. } else {
  80. return "\(self.state)"
  81. }
  82. }
  83. internal init(
  84. details: CallDetails,
  85. eventLoop: EventLoop,
  86. interceptors: [ClientInterceptor<Request, Response>],
  87. errorDelegate: ClientErrorDelegate?,
  88. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  89. ) {
  90. self.eventLoop = eventLoop
  91. self.callDetails = details
  92. self._pipeline = ClientInterceptorPipeline(
  93. eventLoop: eventLoop,
  94. details: details,
  95. interceptors: interceptors,
  96. errorDelegate: errorDelegate,
  97. onCancel: self.cancelFromPipeline(promise:),
  98. onRequestPart: self.sendFromPipeline(_:promise:),
  99. onResponsePart: onResponsePart
  100. )
  101. }
  102. // MARK: - Call Object API
  103. /// Configure the transport to communicate with the server.
  104. /// - Parameter configurator: A callback to invoke in order to configure this transport.
  105. /// - Important: This *must* to be called from the `eventLoop`.
  106. internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  107. self.eventLoop.assertInEventLoop()
  108. self.act(on: self.state.configureTransport(with: configurator))
  109. }
  110. /// Send a request part – via the interceptor pipeline – to the server.
  111. /// - Parameters:
  112. /// - part: The part to send.
  113. /// - promise: A promise which will be completed when the request part has been handled.
  114. /// - Important: This *must* to be called from the `eventLoop`.
  115. @inlinable
  116. internal func send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  117. self.eventLoop.assertInEventLoop()
  118. if let pipeline = self._pipeline {
  119. pipeline.write(part, promise: promise)
  120. } else {
  121. promise?.fail(GRPCError.AlreadyComplete())
  122. }
  123. }
  124. /// Attempt to cancel the RPC notifying any interceptors.
  125. /// - Parameter promise: A promise which will be completed when the cancellation attempt has
  126. /// been handled.
  127. internal func cancel(promise: EventLoopPromise<Void>?) {
  128. self.eventLoop.assertInEventLoop()
  129. if let pipeline = self._pipeline {
  130. pipeline.cancel(promise: promise)
  131. } else {
  132. promise?.fail(GRPCError.AlreadyComplete())
  133. }
  134. }
  135. }
  136. // MARK: - Pipeline API
  137. extension ClientTransport {
  138. /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
  139. /// - Parameters:
  140. /// - part: The request part to send.
  141. /// - promise: A promise which will be completed when the part has been handled.
  142. /// - Important: This *must* to be called from the `eventLoop`.
  143. private func sendFromPipeline(
  144. _ part: ClientRequestPart<Request>,
  145. promise: EventLoopPromise<Void>?
  146. ) {
  147. self.eventLoop.assertInEventLoop()
  148. self.act(on: self.state.send(part, promise: promise))
  149. }
  150. /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
  151. /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
  152. /// - Important: This *must* to be called from the `eventLoop`.
  153. private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
  154. self.eventLoop.assertInEventLoop()
  155. self.act(on: self.state.cancel(promise: promise))
  156. }
  157. }
  158. // MARK: - ChannelHandler API
  159. extension ClientTransport: ChannelInboundHandler {
  160. @usableFromInline
  161. typealias InboundIn = _GRPCClientResponsePart<Response>
  162. @usableFromInline
  163. typealias OutboundOut = _GRPCClientRequestPart<Request>
  164. @usableFromInline
  165. internal func handlerRemoved(context: ChannelHandlerContext) {
  166. self.eventLoop.assertInEventLoop()
  167. // Break the reference cycle.
  168. self._pipeline = nil
  169. }
  170. internal func channelError(_ error: Error) {
  171. self.eventLoop.assertInEventLoop()
  172. self.act(on: self.state.channelError(error))
  173. }
  174. @usableFromInline
  175. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  176. self.channelError(error)
  177. }
  178. @usableFromInline
  179. internal func channelActive(context: ChannelHandlerContext) {
  180. self.eventLoop.assertInEventLoop()
  181. self.logger.debug("activated stream channel", source: "GRPC")
  182. self.act(on: self.state.channelActive(context: context))
  183. }
  184. @usableFromInline
  185. internal func channelInactive(context: ChannelHandlerContext) {
  186. self.eventLoop.assertInEventLoop()
  187. self.act(on: self.state.channelInactive(context: context))
  188. }
  189. @usableFromInline
  190. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  191. self.eventLoop.assertInEventLoop()
  192. let part = self.unwrapInboundIn(data)
  193. self.act(on: self.state.channelRead(part))
  194. // (We're the end of the channel. No need to forward anything.)
  195. }
  196. }
  197. // MARK: - State Handling
  198. extension ClientTransport {
  199. fileprivate enum State {
  200. /// Idle. We're waiting for the RPC to be configured.
  201. ///
  202. /// Valid transitions:
  203. /// - `awaitingTransport` (the transport is being configured)
  204. /// - `closed` (the RPC cancels)
  205. case idle
  206. /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
  207. /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
  208. /// transport in this state is an error.
  209. ///
  210. /// Valid transitions:
  211. /// - `activatingTransport` (the channel becomes active)
  212. /// - `closing` (the RPC cancels)
  213. /// - `closed` (the channel fails to become active)
  214. case awaitingTransport
  215. /// The transport is active but we're unbuffering any requests to write on that transport.
  216. /// We'll continue buffering in this state. Receiving messages from the transport in this state
  217. /// is okay.
  218. ///
  219. /// Valid transitions:
  220. /// - `active` (we finish unbuffering)
  221. /// - `closing` (the RPC cancels, the channel encounters an error)
  222. /// - `closed` (the channel becomes inactive)
  223. case activatingTransport(Channel)
  224. /// Fully active. An RPC is in progress and is communicating over an active transport.
  225. ///
  226. /// Valid transitions:
  227. /// - `closing` (the RPC cancels, the channel encounters an error)
  228. /// - `closed` (the channel becomes inactive)
  229. case active(Channel)
  230. /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
  231. /// become inactive yet.
  232. ///
  233. /// Valid transitions:
  234. /// - `closed` (the channel becomes inactive)
  235. case closing
  236. /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
  237. /// be ignored.
  238. ///
  239. /// Valid transitions:
  240. /// - none: this state is terminal.
  241. case closed
  242. /// Whether writes may be unbuffered in this state.
  243. internal var isUnbuffering: Bool {
  244. switch self {
  245. case .activatingTransport:
  246. return true
  247. case .idle, .awaitingTransport, .active, .closing, .closed:
  248. return false
  249. }
  250. }
  251. /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
  252. internal var mayBuffer: Bool {
  253. switch self {
  254. case .idle, .activatingTransport, .awaitingTransport:
  255. return true
  256. case .active, .closing, .closed:
  257. return false
  258. }
  259. }
  260. }
  261. }
  262. extension ClientTransport.State {
  263. /// Actions which should be performed as a result telling the state machine something changed.
  264. fileprivate enum Action {
  265. /// Do nothing.
  266. case none
  267. /// Configure a `Channel` with the configurator.
  268. case configure(with: (ChannelHandler) -> EventLoopFuture<Void>)
  269. /// Append the request part and promise to the write buffer.
  270. case buffer(ClientRequestPart<Request>, EventLoopPromise<Void>?)
  271. /// Write - and flush if necessary – any request parts in the buffer to the `Channel`.
  272. case unbufferToChannel(Channel)
  273. /// Fail any buffered writes with the error.
  274. case failBufferedWrites(with: Error)
  275. /// Write the given operation to the channel.
  276. case writeToChannel(Channel, ClientRequestPart<Request>, EventLoopPromise<Void>?)
  277. /// Write the response part to the RPC.
  278. case forwardToInterceptors(_GRPCClientResponsePart<Response>)
  279. /// Fail the RPC with the given error. This includes failing any outstanding writes.
  280. case forwardErrorToInterceptors(Error)
  281. /// Close the given channel.
  282. case close(Channel)
  283. /// Fail the given promise with the error provided.
  284. case completePromise(EventLoopPromise<Void>?, with: Result<Void, Error>)
  285. /// Perform multiple actions.
  286. indirect case multiple([Action])
  287. }
  288. }
  289. extension ClientTransport.State {
  290. /// The caller would like to configure the transport.
  291. mutating func configureTransport(
  292. with configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>
  293. ) -> Action {
  294. switch self {
  295. // We're idle until we configure. Anything else is just a repeat request to configure.
  296. case .idle:
  297. self = .awaitingTransport
  298. return .configure(with: configurator)
  299. case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
  300. return .none
  301. }
  302. }
  303. /// The pipeline would like to send a request part to the transport.
  304. mutating func send(
  305. _ part: ClientRequestPart<Request>,
  306. promise: EventLoopPromise<Void>?
  307. ) -> Action {
  308. switch self {
  309. // We don't have any transport yet, just buffer the part.
  310. case .idle, .awaitingTransport, .activatingTransport:
  311. return .buffer(part, promise)
  312. // We have a `Channel`, we can pipe the write straight through.
  313. case let .active(channel):
  314. return .writeToChannel(channel, part, promise)
  315. // The transport is going or has gone away. Fail the promise.
  316. case .closing, .closed:
  317. return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
  318. }
  319. }
  320. /// We finished dealing with the buffered writes.
  321. mutating func unbuffered() -> Action {
  322. switch self {
  323. // These can't happen since we only begin unbuffering when we transition to
  324. // '.activatingTransport', which must come after these two states..
  325. case .idle, .awaitingTransport:
  326. preconditionFailure("Requests can't be unbuffered before the transport is activated")
  327. // We dealt with any buffered writes. We can become active now. This is the only way to become
  328. // active.
  329. case let .activatingTransport(channel):
  330. self = .active(channel)
  331. return .none
  332. case .active:
  333. preconditionFailure("Unbuffering completed but the transport is already active")
  334. // Something caused us to close while unbuffering, that's okay, we won't take any further
  335. // action.
  336. case .closing, .closed:
  337. return .none
  338. }
  339. }
  340. /// Cancel the RPC and associated `Channel`, if possible.
  341. mutating func cancel(promise: EventLoopPromise<Void>?) -> Action {
  342. switch self {
  343. case .idle:
  344. // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
  345. // we're done, fail any writes, and then deal with the cancellation promise.
  346. self = .closed
  347. let error = GRPCError.RPCCancelledByClient().captureContext()
  348. return .multiple([
  349. .forwardErrorToInterceptors(error),
  350. .failBufferedWrites(with: error.error),
  351. .completePromise(promise, with: .success(())),
  352. ])
  353. case .awaitingTransport:
  354. // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
  355. // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
  356. // the `Channel` becoming active (see `channelActive(context:)`).
  357. self = .closing
  358. let error = GRPCError.RPCCancelledByClient().captureContext()
  359. return .multiple([
  360. .forwardErrorToInterceptors(error),
  361. .failBufferedWrites(with: error.error),
  362. .completePromise(promise, with: .success(())),
  363. ])
  364. case let .activatingTransport(channel):
  365. // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
  366. // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
  367. // any buffered writes and then complete the cancellatiion promise.
  368. self = .closing
  369. let error = GRPCError.RPCCancelledByClient().captureContext()
  370. return .multiple([
  371. .forwardErrorToInterceptors(error),
  372. .close(channel),
  373. .failBufferedWrites(with: error.error),
  374. .completePromise(promise, with: .success(())),
  375. ])
  376. case let .active(channel):
  377. // The RPC and channel are up and running. We'll fail the RPC and close the channel.
  378. self = .closing
  379. let error = GRPCError.RPCCancelledByClient().captureContext()
  380. return .multiple([
  381. .forwardErrorToInterceptors(error),
  382. .close(channel),
  383. .completePromise(promise, with: .success(())),
  384. ])
  385. case .closing, .closed:
  386. // We're already closing or closing. The cancellation is too late.
  387. return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
  388. }
  389. }
  390. /// `channelActive` was invoked on the transport by the `Channel`.
  391. mutating func channelActive(context: ChannelHandlerContext) -> Action {
  392. // The channel has become active: what now?
  393. switch self {
  394. case .idle:
  395. preconditionFailure("Can't activate an idle transport")
  396. case .awaitingTransport:
  397. self = .activatingTransport(context.channel)
  398. return .unbufferToChannel(context.channel)
  399. case .activatingTransport, .active:
  400. preconditionFailure("Invalid state: stream is already active")
  401. case .closing:
  402. // We remain in closing: we only transition to closed on 'channelInactive'.
  403. return .close(context.channel)
  404. case .closed:
  405. preconditionFailure("Invalid state: stream is already inactive")
  406. }
  407. }
  408. /// `channelInactive` was invoked on the transport by the `Channel`.
  409. mutating func channelInactive(context: ChannelHandlerContext) -> Action {
  410. switch self {
  411. case .idle:
  412. // We can't become inactive before we've requested a `Channel`.
  413. preconditionFailure("Can't deactivate an idle transport")
  414. case .awaitingTransport, .activatingTransport, .active:
  415. // We're activating the transport - i.e. offloading any buffered requests - and the channel
  416. // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
  417. // synthesize an error status to fail the RPC with.
  418. self = .closed
  419. let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
  420. return .multiple([
  421. .forwardErrorToInterceptors(status),
  422. .failBufferedWrites(with: status),
  423. ])
  424. case .closing:
  425. // We were already closing, now we're fully closed.
  426. self = .closed
  427. return .none
  428. case .closed:
  429. // We're already closed.
  430. return .none
  431. }
  432. }
  433. /// `channelRead` was invoked on the transport by the `Channel`.
  434. mutating func channelRead(_ part: _GRPCClientResponsePart<Response>) -> Action {
  435. switch self {
  436. case .idle, .awaitingTransport:
  437. // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
  438. preconditionFailure("Can't receive response part on idle transport")
  439. case .activatingTransport, .active:
  440. // We have an active `Channel`, we can forward the request part but we may need to start
  441. // closing if we see the status, since it indicates the call is terminating.
  442. switch part {
  443. case .initialMetadata, .message, .trailingMetadata:
  444. ()
  445. case .status:
  446. // The status is the final part of the RPC. We will become inactive soon.
  447. self = .closing
  448. }
  449. return .forwardToInterceptors(part)
  450. case .closing, .closed:
  451. // We closed early, ignore are reads.
  452. return .none
  453. }
  454. }
  455. /// We received an error from the `Channel`.
  456. mutating func channelError(_ error: Error) -> Action {
  457. switch self {
  458. case .idle:
  459. // The `Channel` can't error if it doesn't exist.
  460. preconditionFailure("Can't catch error on idle transport")
  461. case .awaitingTransport:
  462. // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
  463. // buffered writes along the way.
  464. self = .closing
  465. return .multiple([
  466. .forwardErrorToInterceptors(error),
  467. .failBufferedWrites(with: error),
  468. ])
  469. case let .activatingTransport(channel),
  470. let .active(channel):
  471. // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
  472. self = .closing
  473. return .multiple([
  474. .forwardErrorToInterceptors(error),
  475. .failBufferedWrites(with: error),
  476. .close(channel),
  477. ])
  478. case .closing, .closed:
  479. // We're already closing/closed, we can ignore this.
  480. return .none
  481. }
  482. }
  483. }
  484. // MARK: - State Actions
  485. extension ClientTransport {
  486. /// Act on the action which resulted from prodding the state machine.
  487. /// - Parameter action: The action to act on.
  488. private func act(on action: State.Action) {
  489. switch action {
  490. case .none:
  491. ()
  492. case let .configure(configurator):
  493. self.configure(using: configurator)
  494. case let .buffer(part, promise):
  495. self.buffer(part, promise: promise)
  496. case let .unbufferToChannel(channel):
  497. self.unbuffer(to: channel)
  498. case let .failBufferedWrites(with: error):
  499. self.failBufferedWrites(with: error)
  500. case let .writeToChannel(channel, part, promise):
  501. self.write(part, to: channel, promise: promise, flush: self.shouldFlush(after: part))
  502. case let .forwardToInterceptors(response: part):
  503. self.forwardToInterceptors(part)
  504. case let .forwardErrorToInterceptors(error: error):
  505. self.forwardErrorToInterceptors(error)
  506. case let .completePromise(promise, result):
  507. promise?.completeWith(result)
  508. case let .close(channel):
  509. channel.close(mode: .all, promise: nil)
  510. case let .multiple(actions):
  511. for action in actions {
  512. self.act(on: action)
  513. }
  514. }
  515. }
  516. /// Configures this transport with the `configurator`.
  517. private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
  518. configurator(self).whenFailure { error in
  519. // We failed to configure; we need to handle this.
  520. self.channelError(error)
  521. }
  522. }
  523. /// Append a request part to the write buffer.
  524. /// - Parameters:
  525. /// - part: The request part to buffer.
  526. /// - promise: A promise to complete when the request part has been sent.
  527. private func buffer(
  528. _ part: ClientRequestPart<Request>,
  529. promise: EventLoopPromise<Void>?
  530. ) {
  531. self.logger.debug("buffering request part", metadata: [
  532. "request_part": "\(part.name)",
  533. "call_state": self.stateForLogging,
  534. ], source: "GRPC")
  535. self.writeBuffer.append(.init(request: part, promise: promise))
  536. }
  537. /// Writes any buffered request parts to the `Channel`.
  538. /// - Parameter channel: The `Channel` to write any buffered request parts to.
  539. private func unbuffer(to channel: Channel) {
  540. // Save any flushing until we're done writing.
  541. var shouldFlush = false
  542. self.logger.debug("unbuffering request parts", metadata: [
  543. "request_parts": "\(self.writeBuffer.count)",
  544. ], source: "GRPC")
  545. // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
  546. // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
  547. // may miss more buffered writes.
  548. while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
  549. // Pull out as many writes as possible.
  550. while let write = self.writeBuffer.popFirst() {
  551. self.logger.debug("unbuffering request part", metadata: [
  552. "request_part": "\(write.request.name)",
  553. ], source: "GRPC")
  554. if !shouldFlush {
  555. shouldFlush = self.shouldFlush(after: write.request)
  556. }
  557. self.write(write.request, to: channel, promise: write.promise, flush: false)
  558. }
  559. // Okay, flush now.
  560. if shouldFlush {
  561. shouldFlush = false
  562. channel.flush()
  563. }
  564. }
  565. if self.writeBuffer.isEmpty {
  566. self.logger.debug("request buffer drained", source: "GRPC")
  567. } else {
  568. self.logger.notice(
  569. "unbuffering aborted",
  570. metadata: ["call_state": self.stateForLogging],
  571. source: "GRPC"
  572. )
  573. }
  574. // We're unbuffered. What now?
  575. self.act(on: self.state.unbuffered())
  576. }
  577. /// Fails any promises that come with buffered writes with `error`.
  578. /// - Parameter error: The `Error` to fail promises with.
  579. private func failBufferedWrites(with error: Error) {
  580. self.logger.debug("failing buffered writes", metadata: [
  581. "call_state": self.stateForLogging,
  582. ], source: "GRPC")
  583. while let write = self.writeBuffer.popFirst() {
  584. write.promise?.fail(error)
  585. }
  586. }
  587. /// Write a request part to the `Channel`.
  588. /// - Parameters:
  589. /// - part: The request part to write.
  590. /// - channel: The `Channel` to write `part` in to.
  591. /// - promise: A promise to complete once the write has been completed.
  592. /// - flush: Whether to flush the `Channel` after writing.
  593. private func write(
  594. _ part: ClientRequestPart<Request>,
  595. to channel: Channel,
  596. promise: EventLoopPromise<Void>?,
  597. flush: Bool
  598. ) {
  599. switch part {
  600. case let .metadata(headers):
  601. let head = self.makeRequestHead(with: headers)
  602. channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
  603. case let .message(request, metadata):
  604. let message = _MessageContext<Request>(request, compressed: metadata.compress)
  605. channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
  606. case .end:
  607. channel.write(self.wrapOutboundOut(.end), promise: promise)
  608. }
  609. if flush {
  610. channel.flush()
  611. }
  612. }
  613. /// Forward the response part to the interceptor pipeline.
  614. /// - Parameter part: The response part to forward.
  615. private func forwardToInterceptors(_ part: _GRPCClientResponsePart<Response>) {
  616. switch part {
  617. case let .initialMetadata(metadata):
  618. self._pipeline?.read(.metadata(metadata))
  619. case let .message(context):
  620. self._pipeline?.read(.message(context.message))
  621. case let .trailingMetadata(trailers):
  622. // The `Channel` delivers trailers and `GRPCStatus`, we want to emit them together in the
  623. // interceptor pipeline.
  624. self.trailers = trailers
  625. case let .status(status):
  626. let trailers = self.trailers ?? [:]
  627. self.trailers = nil
  628. self._pipeline?.read(.end(status, trailers))
  629. }
  630. }
  631. /// Forward the error to the interceptor pipeline.
  632. /// - Parameter error: The error to forward.
  633. private func forwardErrorToInterceptors(_ error: Error) {
  634. self._pipeline?.read(.error(error))
  635. }
  636. }
  637. // MARK: - Helpers
  638. extension ClientTransport {
  639. /// Returns whether the `Channel` should be flushed after writing the given part to it.
  640. private func shouldFlush(after part: ClientRequestPart<Request>) -> Bool {
  641. switch part {
  642. case .metadata:
  643. // If we're not streaming requests then we hold off on the flush until we see end.
  644. return self.isStreamingRequests
  645. case let .message(_, metadata):
  646. // Message flushing is determined by caller preference.
  647. return metadata.flush
  648. case .end:
  649. // Always flush at the end of the request stream.
  650. return true
  651. }
  652. }
  653. /// Make a `_GRPCRequestHead` with the provided metadata.
  654. private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
  655. return _GRPCRequestHead(
  656. method: self.callDetails.options.cacheable ? "GET" : "POST",
  657. scheme: self.callDetails.scheme,
  658. path: self.callDetails.path,
  659. host: self.callDetails.authority,
  660. deadline: self.callDetails.options.timeLimit.makeDeadline(),
  661. customMetadata: metadata,
  662. encoding: self.callDetails.options.messageEncoding
  663. )
  664. }
  665. }
  666. extension ClientRequestPart {
  667. /// The name of the request part, used for logging.
  668. fileprivate var name: String {
  669. switch self {
  670. case .metadata:
  671. return "metadata"
  672. case .message:
  673. return "message"
  674. case .end:
  675. return "end"
  676. }
  677. }
  678. }