2
0

ClientTransport.swift 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// The glue between a `NIO.Channel` and the `ClientInterceptorPipeline`.
  ///
  /// This object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  /// little API to use on this class: they may configure the transport by adding it to a
  /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)`,
  /// attempt to cancel the RPC with `cancel(promise:)` and request the underlying `Channel` with
  /// `channel()`. Response parts – after traversing the interceptor pipeline – are emitted to the
  /// `onResponsePart` callback supplied to the initializer.
  ///
  /// In most instances the glue code is simple: transformations are applied to the request and
  /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  /// when these change. This includes buffering request parts from the interceptor pipeline until
  /// the `NIO.Channel` becomes active.
  ///
  /// ### Thread Safety
  ///
  /// This class is not thread safe. All methods **must** be executed on the transport's `eventLoop`.
  @usableFromInline
  internal final class ClientTransport<Request, Response> {
    /// The `EventLoop` this transport is running on.
    @usableFromInline
    internal let eventLoop: EventLoop

    /// The current state of the transport.
    private var state: ClientTransportState = .idle

    /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
    /// and fail it when we transition to `closed`.
    private var channelPromise: EventLoopPromise<Channel>?

    // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
    // have 3 parts.
    /// A buffer to store request parts and promises in before the channel has become active.
    private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)

    /// The request serializer.
    private let serializer: AnySerializer<Request>

    /// The response deserializer.
    private let deserializer: AnyDeserializer<Response>

    /// A request part and the promise (if any) to complete once that part has been handled.
    private struct RequestAndPromise {
      var request: GRPCClientRequestPart<Request>
      var promise: EventLoopPromise<Void>?
    }

    /// Details about the call.
    internal let callDetails: CallDetails

    /// A logger, taken from the call options.
    internal var logger: Logger {
      return self.callDetails.options.logger
    }

    /// Is the call streaming requests?
    private var isStreamingRequests: Bool {
      switch self.callDetails.type {
      case .unary, .serverStreaming:
        return false
      case .clientStreaming, .bidirectionalStreaming:
        return true
      }
    }

    // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
    // convenient to have both at the same time when intercepting response parts. We'll hold on to
    // the trailers here and only forward them when we receive the status.
    private var trailers: HPACKHeaders?

    /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
    /// from the `ChannelPipeline` in order to break reference cycles.
    @usableFromInline
    internal var _pipeline: ClientInterceptorPipeline<Request, Response>?

    /// The `ChannelHandlerContext`, set while this handler is in a `ChannelPipeline`.
    private var context: ChannelHandlerContext?

    /// Our current state as logging metadata.
    private var stateForLogging: Logger.MetadataValue {
      if self.state.mayBuffer {
        return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
      } else {
        return "\(self.state)"
      }
    }

    /// Creates a transport and the interceptor pipeline it owns.
    ///
    /// - Parameters:
    ///   - details: Details about the call; also passed to the interceptor pipeline.
    ///   - eventLoop: The `EventLoop` the transport (and its pipeline) runs on.
    ///   - interceptors: The interceptors to create the pipeline with.
    ///   - serializer: The serializer used for outbound request messages.
    ///   - deserializer: The deserializer used for inbound response messages.
    ///   - errorDelegate: An optional error delegate handed to the interceptor pipeline.
    ///   - onError: The error callback handed to the interceptor pipeline.
    ///   - onResponsePart: The response part callback handed to the interceptor pipeline.
    internal init(
      details: CallDetails,
      eventLoop: EventLoop,
      interceptors: [ClientInterceptor<Request, Response>],
      serializer: AnySerializer<Request>,
      deserializer: AnyDeserializer<Response>,
      errorDelegate: ClientErrorDelegate?,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) {
      self.eventLoop = eventLoop
      self.callDetails = details
      self.serializer = serializer
      self.deserializer = deserializer
      // The pipeline holds references back to this transport via the method references below; the
      // cycle is broken by clearing `_pipeline` in `handlerRemoved(context:)`.
      self._pipeline = ClientInterceptorPipeline(
        eventLoop: eventLoop,
        details: details,
        interceptors: interceptors,
        errorDelegate: errorDelegate,
        onError: onError,
        onCancel: self.cancelFromPipeline(promise:),
        onRequestPart: self.sendFromPipeline(_:promise:),
        onResponsePart: onResponsePart
      )
    }

    // MARK: - Call Object API

    /// Configure the transport to communicate with the server.
    ///
    /// Only the first call has any effect; the state machine rejects repeat requests.
    ///
    /// - Parameter configurator: A callback to invoke in order to configure this transport.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
      self.eventLoop.assertInEventLoop()
      if self.state.configureTransport() {
        self.configure(using: configurator)
      }
    }

    /// Send a request part – via the interceptor pipeline – to the server.
    ///
    /// - Parameters:
    ///   - part: The part to send.
    ///   - promise: A promise which will be completed when the request part has been handled. It
    ///     is failed with `GRPCError.AlreadyComplete` if the pipeline has already been torn down.
    /// - Important: This *must* be called from the `eventLoop`.
    @inlinable
    internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()
      if let pipeline = self._pipeline {
        pipeline.send(part, promise: promise)
      } else {
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Attempt to cancel the RPC notifying any interceptors.
    ///
    /// - Parameter promise: A promise which will be completed when the cancellation attempt has
    ///   been handled. It is failed with `GRPCError.AlreadyComplete` if the pipeline has already
    ///   been torn down.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func cancel(promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()
      if let pipeline = self._pipeline {
        pipeline.cancel(promise: promise)
      } else {
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// A request for the underlying `Channel`.
    ///
    /// - Returns: A future for the `Channel`. Repeated calls return the same future.
    /// - Important: This *must* be called from the `eventLoop`.
    internal func channel() -> EventLoopFuture<Channel> {
      self.eventLoop.assertInEventLoop()

      // Do we already have a promise? If so everyone shares its future.
      if let existing = self.channelPromise {
        return existing.futureResult
      }

      // Make and store the promise.
      let promise = self.eventLoop.makePromise(of: Channel.self)
      self.channelPromise = promise

      // Ask the state machine if we can have it.
      switch self.state.getChannel() {
      case .succeed:
        if let channel = self.context?.channel {
          promise.succeed(channel)
        } else {
          // The state machine believes we are active but this handler is no longer in a pipeline,
          // so there is no `Channel` to hand out. Fail the promise rather than leaving the caller
          // waiting on a future which would never complete.
          promise.fail(GRPCError.AlreadyComplete())
        }
      case .fail:
        promise.fail(GRPCError.AlreadyComplete())
      case .doNothing:
        // The promise will be completed by a later state transition.
        ()
      }

      return promise.futureResult
    }
  }
  177. // MARK: - Pipeline API
  extension ClientTransport {
    /// Entry point for request parts leaving the interceptor pipeline. Depending on the transport
    /// state the part is buffered, written to the `Channel`, or rejected.
    ///
    /// - Parameters:
    ///   - part: The request part to send.
    ///   - promise: A promise which will be completed when the part has been handled.
    /// - Important: This *must* be called from the `eventLoop`.
    private func sendFromPipeline(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?
    ) {
      self.eventLoop.assertInEventLoop()

      let action = self.state.send()
      switch action {
      case .alreadyComplete:
        promise?.fail(GRPCError.AlreadyComplete())

      case .writeToChannel:
        let flushAfterWrite = self.shouldFlush(after: part)
        self.write(part, promise: promise, flush: flushAfterWrite)

      case .writeToBuffer:
        self.buffer(part, promise: promise)
      }
    }

    /// Entry point for cancellation leaving the interceptor pipeline.
    ///
    /// - Parameter promise: A promise which is succeeded once the cancellation has been handled, or
    ///   failed with `GRPCError.AlreadyComplete` if the state machine rejects the cancellation.
    /// - Important: This *must* be called from the `eventLoop`.
    private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()

      guard self.state.cancel() else {
        // Too late: we're already closing or closed.
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }

      let cancelled = GRPCError.RPCCancelledByClient()
      self.forwardErrorToInterceptors(cancelled)
      self.failBufferedWrites(with: cancelled)
      self.context?.channel.close(mode: .all, promise: nil)
      self.channelPromise?.fail(cancelled)
      promise?.succeed(())
    }
  }
  215. // MARK: - ChannelHandler API
  extension ClientTransport: ChannelInboundHandler {
    @usableFromInline
    typealias InboundIn = _RawGRPCClientResponsePart

    @usableFromInline
    typealias OutboundOut = _RawGRPCClientRequestPart

    /// Stores the handler context once this handler is added to a pipeline.
    @usableFromInline
    func handlerAdded(context: ChannelHandlerContext) {
      self.context = context
    }

    /// Drops the handler context and the interceptor pipeline when removed from a pipeline.
    @usableFromInline
    internal func handlerRemoved(context: ChannelHandlerContext) {
      self.eventLoop.assertInEventLoop()
      self.context = nil
      // Letting go of the pipeline breaks the reference cycle between it and this transport.
      self._pipeline = nil
    }

    /// Handles an error relating to the `Channel`: depending on the state it is ignored, or
    /// forwarded to the interceptors (failing buffered writes), optionally closing the context.
    internal func channelError(_ error: Error) {
      self.eventLoop.assertInEventLoop()

      let closeAfterwards: Bool
      switch self.state.channelError() {
      case .doNothing:
        return
      case .propagateError:
        closeAfterwards = false
      case .propagateErrorAndClose:
        closeAfterwards = true
      }

      self.forwardErrorToInterceptors(error)
      self.failBufferedWrites(with: error)
      if closeAfterwards {
        self.context?.close(mode: .all, promise: nil)
      }
    }

    @usableFromInline
    internal func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.channelError(error)
    }

    /// The `Channel` became active: start draining buffered writes, or close it straight away if
    /// the state machine says it is no longer wanted.
    @usableFromInline
    internal func channelActive(context: ChannelHandlerContext) {
      self.eventLoop.assertInEventLoop()
      self.logger.debug("activated stream channel", source: "GRPC")

      guard self.state.channelActive() else {
        context.close(mode: .all, promise: nil)
        return
      }

      self.unbuffer(to: context.channel)
    }

    /// The `Channel` became inactive: tear down or fail the `Channel` promise as directed by the
    /// state machine.
    @usableFromInline
    internal func channelInactive(context: ChannelHandlerContext) {
      self.eventLoop.assertInEventLoop()

      switch self.state.channelInactive() {
      case .tearDown:
        let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
        self.forwardErrorToInterceptors(status)
        self.failBufferedWrites(with: status)
        self.channelPromise?.fail(status)

      case .failChannelPromise:
        self.channelPromise?.fail(GRPCError.AlreadyComplete())

      case .doNothing:
        ()
      }
    }

    /// Converts a raw response part from the `Channel` into a part for the interceptor pipeline.
    /// As the last handler in the channel nothing is forwarded further down the `ChannelPipeline`.
    @usableFromInline
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      self.eventLoop.assertInEventLoop()

      switch self.unwrapInboundIn(data) {
      case let .initialMetadata(headers):
        guard self.state.channelRead(isEnd: false) else {
          return
        }
        self.forwardToInterceptors(.metadata(headers))

      case let .message(messageContext):
        do {
          // Deserialize before consulting the state machine; a failure is treated as a channel
          // error.
          let message = try self.deserializer.deserialize(byteBuffer: messageContext.message)
          guard self.state.channelRead(isEnd: false) else {
            return
          }
          self.forwardToInterceptors(.message(message))
        } catch {
          self.channelError(error)
        }

      case let .trailingMetadata(trailers):
        // Trailers and status arrive separately but are emitted together: park the trailers until
        // the status shows up.
        self.trailers = trailers

      case let .status(status):
        guard self.state.channelRead(isEnd: true) else {
          return
        }
        let trailers = self.trailers ?? [:]
        self.forwardToInterceptors(.end(status, trailers))
        self.trailers = nil
      }
    }
  }
  306. // MARK: - State Handling
  private enum ClientTransportState {
    /// Nothing has happened yet: the RPC has not been configured.
    ///
    /// Valid transitions:
    /// - `awaitingTransport` (the transport is being configured)
    /// - `closed` (the RPC cancels)
    case idle

    /// Transport has been requested but is not active yet. Outbound messages are buffered in this
    /// state; receiving messages from the transport in this state is an error.
    ///
    /// Valid transitions:
    /// - `activatingTransport` (the channel becomes active)
    /// - `closing` (the RPC cancels)
    /// - `closed` (the channel fails to become active)
    case awaitingTransport

    /// The transport is active and buffered requests are being written to it. Buffering continues
    /// in this state; receiving messages from the transport in this state is okay.
    ///
    /// Valid transitions:
    /// - `active` (we finish unbuffering)
    /// - `closing` (the RPC cancels, the channel encounters an error)
    /// - `closed` (the channel becomes inactive)
    case activatingTransport

    /// Fully active: an RPC is in progress and is communicating over an active transport.
    ///
    /// Valid transitions:
    /// - `closing` (the RPC cancels, the channel encounters an error)
    /// - `closed` (the channel becomes inactive)
    case active

    /// Closing: either the RPC was cancelled or any `Channel` associated with the transport hasn't
    /// become inactive yet.
    ///
    /// Valid transitions:
    /// - `closed` (the channel becomes inactive)
    case closing

    /// Closed: writes from the RPC will be failed and responses from the transport ignored.
    ///
    /// Valid transitions:
    /// - none: this state is terminal.
    case closed

    /// Whether writes may be unbuffered in this state.
    internal var isUnbuffering: Bool {
      switch self {
      case .idle, .awaitingTransport, .active, .closing, .closed:
        return false
      case .activatingTransport:
        return true
      }
    }

    /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
    internal var mayBuffer: Bool {
      switch self {
      case .active, .closing, .closed:
        return false
      case .idle, .awaitingTransport, .activatingTransport:
        return true
      }
    }
  }
  extension ClientTransportState {
    /// The caller would like to configure the transport.
    ///
    /// - Returns: `true` if the transport should be configured (only when `idle`, in which case
    ///   the state becomes `awaitingTransport`); `false` for repeat requests.
    mutating func configureTransport() -> Bool {
      guard case .idle = self else {
        // Anything other than idle means configuration was already requested.
        return false
      }
      self = .awaitingTransport
      return true
    }

    enum SendAction {
      /// Write the request into the buffer.
      case writeToBuffer
      /// Write the request into the channel.
      case writeToChannel
      /// The RPC has already completed, fail any promise associated with the write.
      case alreadyComplete
    }

    /// The pipeline would like to send a request part to the transport. The state is not changed.
    mutating func send() -> SendAction {
      switch self {
      case .active:
        // There is a `Channel`: the write can go straight through.
        return .writeToChannel

      case .closing, .closed:
        // The transport is going or has gone away.
        return .alreadyComplete

      case .idle, .awaitingTransport, .activatingTransport:
        // No usable transport yet (or we're still draining the buffer): buffer the part.
        return .writeToBuffer
      }
    }

    enum UnbufferedAction {
      /// Nothing needs to be done.
      case doNothing
      /// Succeed the channel promise associated with the transport.
      case succeedChannelPromise
    }

    /// We finished dealing with the buffered writes.
    mutating func unbuffered() -> UnbufferedAction {
      switch self {
      case .activatingTransport:
        // The buffer has been dealt with: this is the only route to becoming active.
        self = .active
        return .succeedChannelPromise

      case .closing, .closed:
        // We closed while unbuffering; that's fine, there's nothing more to do.
        return .doNothing

      case .idle, .awaitingTransport:
        // Unbuffering only begins on the transition to `activatingTransport`, which comes after
        // both of these states.
        preconditionFailure("Requests can't be unbuffered before the transport is activated")

      case .active:
        preconditionFailure("Unbuffering completed but the transport is already active")
      }
    }

    /// Cancel the RPC and associated `Channel`, if possible.
    ///
    /// - Returns: Whether cancellation can go ahead (and also whether the channel should be torn
    ///   down).
    mutating func cancel() -> Bool {
      switch self {
      case .idle:
        // No RPC was started and there is no `Channel`, so we can go straight to closed.
        self = .closed
        return true

      case .awaitingTransport, .activatingTransport, .active:
        // An RPC has started. Whether the `Channel` is pending, draining the buffer or fully
        // active, we become closing and wait for the `Channel` to become inactive. (A `Channel`
        // which activates while we're closing is rejected by `channelActive()`.)
        self = .closing
        return true

      case .closing, .closed:
        // Already closing or closed: the cancellation is too late.
        return false
      }
    }

    /// `channelActive` was invoked on the transport by the `Channel`.
    ///
    /// - Returns: `true` if buffered writes should now be unbuffered; `false` if we are already
    ///   closing.
    mutating func channelActive() -> Bool {
      switch self {
      case .awaitingTransport:
        self = .activatingTransport
        return true

      case .closing:
        // Stay in closing: only 'channelInactive' moves us to closed.
        return false

      case .idle:
        preconditionFailure("Can't activate an idle transport")

      case .activatingTransport, .active:
        preconditionFailure("Invalid state: stream is already active")

      case .closed:
        preconditionFailure("Invalid state: stream is already inactive")
      }
    }

    enum ChannelInactiveAction {
      /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
      case tearDown
      /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
      case failChannelPromise
      /// Do nothing.
      case doNothing
    }

    /// `channelInactive` was invoked on the transport by the `Channel`.
    mutating func channelInactive() -> ChannelInactiveAction {
      switch self {
      case .awaitingTransport, .activatingTransport, .active:
        // The channel went away without us having seen an error (otherwise we'd be `closing`), so
        // the caller needs to synthesize an error status to fail the RPC with.
        self = .closed
        return .tearDown

      case .closing:
        // Closing has completed.
        self = .closed
        return .failChannelPromise

      case .closed:
        // Nothing left to do.
        return .doNothing

      case .idle:
        // A `Channel` must have been requested before it can become inactive.
        preconditionFailure("Can't deactivate an idle transport")
      }
    }

    /// `channelRead` was invoked on the transport by the `Channel`.
    ///
    /// - Parameter isEnd: Whether the part read marks the end of the response stream.
    /// - Returns: Whether the part that was read should be forwarded to the interceptor pipeline.
    mutating func channelRead(isEnd: Bool) -> Bool {
      switch self {
      case .activatingTransport, .active:
        // The `Channel` is active so the part is forwarded. Seeing the end of the stream means
        // the call is terminating, so start closing.
        if isEnd {
          self = .closing
        }
        return true

      case .closing, .closed:
        // We closed early: reads are dropped.
        return false

      case .idle, .awaitingTransport:
        // Without an active `Channel` nothing can be read.
        preconditionFailure("Can't receive response part on idle transport")
      }
    }

    enum ChannelErrorAction {
      /// Propagate the error to the interceptor pipeline and fail any buffered writes.
      case propagateError
      /// As above, but close the 'Channel' as well.
      case propagateErrorAndClose
      /// No action is required.
      case doNothing
    }

    /// We received an error from the `Channel`.
    mutating func channelError() -> ChannelErrorAction {
      switch self {
      case .awaitingTransport:
        // The `Channel` never became active; fail the RPC and any buffered writes.
        self = .closing
        return .propagateError

      case .activatingTransport, .active:
        // Unbuffering or fully active: fail the RPC and writes, and close the `Channel` too.
        self = .closing
        return .propagateErrorAndClose

      case .closing, .closed:
        // Already on the way out; the error can be ignored.
        return .doNothing

      case .idle:
        // No `Channel` exists yet, so it cannot have produced an error.
        preconditionFailure("Can't catch error on idle transport")
      }
    }

    enum GetChannelAction {
      /// No action is required.
      case doNothing
      /// Succeed the Channel promise.
      case succeed
      /// Fail the 'Channel' promise, the RPC is already complete.
      case fail
    }

    /// The caller has asked for the underlying `Channel`. The state is not changed.
    mutating func getChannel() -> GetChannelAction {
      switch self {
      case .active:
        // No promise existed when we became active, so it can be completed right away.
        return .succeed

      case .closed:
        // No promise existed when we closed, so it can be failed right away.
        return .fail

      case .idle, .awaitingTransport, .activatingTransport, .closing:
        // The promise will be completed by a later transition (to active or to closed).
        return .doNothing
      }
    }
  }
  583. // MARK: - State Actions
  extension ClientTransport {
    /// Configures this transport with the `configurator`.
    ///
    /// If configuration fails the error is routed through `channelError(_:)`. Errors which are not
    /// a `GRPCStatus` or `GRPCStatusTransformable` are wrapped in a `ConnectionFailure` so that the
    /// RPC is marked as 'unavailable'.
    ///
    /// - Parameter configurator: A callback which is handed this transport as a `ChannelHandler`.
    private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
      configurator(self).whenFailure { error in
        if error is GRPCStatus || error is GRPCStatusTransformable {
          self.channelError(error)
        } else {
          // Fallback to something which will mark the RPC as 'unavailable'.
          self.channelError(ConnectionFailure(reason: error))
        }
      }
    }

    /// Append a request part to the write buffer.
    ///
    /// - Parameters:
    ///   - part: The request part to buffer.
    ///   - promise: A promise to complete when the request part has been sent.
    private func buffer(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?
    ) {
      self.logger.debug("buffering request part", metadata: [
        "request_part": "\(part.name)",
        "call_state": self.stateForLogging,
      ], source: "GRPC")
      self.writeBuffer.append(.init(request: part, promise: promise))
    }

    /// Writes any buffered request parts to the `Channel`.
    ///
    /// - Parameter channel: The `Channel` to write any buffered request parts to.
    private func unbuffer(to channel: Channel) {
      // Save any flushing until we're done writing.
      var shouldFlush = false

      self.logger.debug("unbuffering request parts", metadata: [
        "request_parts": "\(self.writeBuffer.count)",
      ], source: "GRPC")

      // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
      // or cause us to change state (i.e. we may have to close). If we didn't loop around then we
      // may miss more buffered writes.
      while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
        // Pull out as many writes as possible.
        while let write = self.writeBuffer.popFirst() {
          self.logger.debug("unbuffering request part", metadata: [
            "request_part": "\(write.request.name)",
          ], source: "GRPC")

          if !shouldFlush {
            shouldFlush = self.shouldFlush(after: write.request)
          }

          self.write(write.request, promise: write.promise, flush: false)
        }

        // Okay, flush now.
        if shouldFlush {
          shouldFlush = false
          channel.flush()
        }
      }

      if self.writeBuffer.isEmpty {
        self.logger.debug("request buffer drained", source: "GRPC")
      } else {
        self.logger.notice(
          "unbuffering aborted",
          metadata: ["call_state": self.stateForLogging],
          source: "GRPC"
        )
      }

      // We're unbuffered. What now?
      switch self.state.unbuffered() {
      case .doNothing:
        ()
      case .succeedChannelPromise:
        self.channelPromise?.succeed(channel)
      }
    }

    /// Fails any promises that come with buffered writes with `error`, emptying the buffer.
    ///
    /// - Parameter error: The `Error` to fail promises with.
    private func failBufferedWrites(with error: Error) {
      self.logger.debug("failing buffered writes", metadata: [
        "call_state": self.stateForLogging,
      ], source: "GRPC")

      while let write = self.writeBuffer.popFirst() {
        write.promise?.fail(error)
      }
    }

    /// Write a request part to the `Channel`.
    ///
    /// If this handler is not currently in a pipeline the promise is failed with
    /// `GRPCError.AlreadyComplete`. If a message cannot be serialized the error is treated as a
    /// channel error and the promise is failed with that error.
    ///
    /// - Parameters:
    ///   - part: The request part to write.
    ///   - promise: A promise to complete once the write has been completed.
    ///   - flush: Whether to flush the `Channel` after writing.
    private func write(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?,
      flush: Bool
    ) {
      guard let context = self.context else {
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }

      switch part {
      case let .metadata(headers):
        let head = self.makeRequestHead(with: headers)
        context.channel.write(self.wrapOutboundOut(.head(head)), promise: promise)

      case let .message(request, metadata):
        do {
          let bytes = try self.serializer.serialize(request, allocator: context.channel.allocator)
          let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
          context.channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
        } catch {
          // Tear down first so that anything observing the promise sees the post-error state.
          self.channelError(error)
          // The part was never handed to the `Channel` and is no longer in the write buffer, so
          // nothing else will complete this promise: fail it here rather than leaking it.
          promise?.fail(error)
        }

      case .end:
        context.channel.write(self.wrapOutboundOut(.end), promise: promise)
      }

      if flush {
        context.channel.flush()
      }
    }

    /// Forward the response part to the interceptor pipeline, if it still exists.
    ///
    /// - Parameter part: The response part to forward.
    private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
      self._pipeline?.receive(part)
    }

    /// Forward the error to the interceptor pipeline, if it still exists.
    ///
    /// - Parameter error: The error to forward.
    private func forwardErrorToInterceptors(_ error: Error) {
      self._pipeline?.errorCaught(error)
    }
  }
  710. // MARK: - Helpers
  extension ClientTransport {
    /// Decides whether the `Channel` should be flushed once `part` has been written to it.
    private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
      switch part {
      case .end:
        // The end of the request stream is always flushed.
        return true

      case let .message(_, messageMetadata):
        // The caller chooses whether each message is flushed.
        return messageMetadata.flush

      case .metadata:
        // Non-streaming calls hold the flush back until the end is seen.
        return self.isStreamingRequests
      }
    }

    /// Builds the `_GRPCRequestHead` for this call using the call details and the provided
    /// metadata.
    private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
      let details = self.callDetails
      let httpMethod = details.options.cacheable ? "GET" : "POST"

      return _GRPCRequestHead(
        method: httpMethod,
        scheme: details.scheme,
        path: details.path,
        host: details.authority,
        deadline: details.options.timeLimit.makeDeadline(),
        customMetadata: metadata,
        encoding: details.options.messageEncoding
      )
    }
  }
  extension GRPCClientRequestPart {
    /// A short label for this request part; only used in log metadata.
    fileprivate var name: String {
      switch self {
      case .end:
        return "end"
      case .message:
        return "message"
      case .metadata:
        return "metadata"
      }
    }
  }
  // Wraps a connection error so that the underlying error is preserved while still being
  // convertible to a 'GRPCStatus' with code '.unavailable'.
  internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
    /// The underlying error which caused the connection to fail.
    var reason: Error

    init(reason: Error) {
      self.reason = reason
    }

    /// A description of the underlying error.
    var description: String {
      return String(describing: self.reason)
    }

    /// Makes an 'unavailable' status whose message describes the underlying error.
    func makeGRPCStatus() -> GRPCStatus {
      let message = String(describing: self.reason)
      return GRPCStatus(code: .unavailable, message: message)
    }
  }