ClientTransport.swift 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
  21. /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  22. /// little API to use on this class: they may configure the transport by adding it to a
  23. /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  24. /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  25. /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  26. ///
  27. /// In most instances the glue code is simple: transformations are applied to the request and
  28. /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  29. /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  30. /// when these change. This includes buffering request parts from the interceptor pipeline until
  31. /// the `NIO.Channel` becomes active.
  32. ///
  33. /// ### Thread Safety
  34. ///
  35. /// This class is not thread safe. All methods **must** be executed on the transport's `callEventLoop`.
  36. @usableFromInline
  37. internal final class ClientTransport<Request, Response> {
  38. /// The `EventLoop` the call is running on. State must be accessed from this event loop.
  39. @usableFromInline
  40. internal let callEventLoop: EventLoop
  41. /// The current state of the transport.
  42. private var state: ClientTransportState = .idle
  43. /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
  44. /// and fail it when we transition to `closed`.
  45. private var channelPromise: EventLoopPromise<Channel>?
  46. // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
  47. // have 3 parts.
  48. /// A buffer to store request parts and promises in before the channel has become active.
  49. private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
  50. /// The request serializer.
  51. private let serializer: AnySerializer<Request>
  52. /// The response deserializer.
  53. private let deserializer: AnyDeserializer<Response>
  54. /// A request part and a promise.
  55. private struct RequestAndPromise {
  56. var request: GRPCClientRequestPart<Request>
  57. var promise: EventLoopPromise<Void>?
  58. }
  59. /// Details about the call.
  60. internal let callDetails: CallDetails
  61. /// A logger.
  62. internal var logger: Logger {
  63. return self.callDetails.options.logger
  64. }
  65. /// Is the call streaming requests?
  66. private var isStreamingRequests: Bool {
  67. switch self.callDetails.type {
  68. case .unary, .serverStreaming:
  69. return false
  70. case .clientStreaming, .bidirectionalStreaming:
  71. return true
  72. }
  73. }
  74. // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
  75. // convenient to have both at the same time when intercepting response parts. We'll hold on to the
  76. // trailers here and only forward them when we receive the status.
  77. private var trailers: HPACKHeaders?
  78. /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
  79. /// from the `ChannelPipeline` in order to break reference cycles.
  80. @usableFromInline
  81. internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
  82. /// The `NIO.Channel` used by the transport, if it is available.
  83. private var channel: Channel?
  84. /// Our current state as logging metadata.
  85. private var stateForLogging: Logger.MetadataValue {
  86. if self.state.mayBuffer {
  87. return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
  88. } else {
  89. return "\(self.state)"
  90. }
  91. }
  92. internal init(
  93. details: CallDetails,
  94. eventLoop: EventLoop,
  95. interceptors: [ClientInterceptor<Request, Response>],
  96. serializer: AnySerializer<Request>,
  97. deserializer: AnyDeserializer<Response>,
  98. errorDelegate: ClientErrorDelegate?,
  99. onError: @escaping (Error) -> Void,
  100. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  101. ) {
  102. self.callEventLoop = eventLoop
  103. self.callDetails = details
  104. self.serializer = serializer
  105. self.deserializer = deserializer
  106. self._pipeline = ClientInterceptorPipeline(
  107. eventLoop: eventLoop,
  108. details: details,
  109. interceptors: interceptors,
  110. errorDelegate: errorDelegate,
  111. onError: onError,
  112. onCancel: self.cancelFromPipeline(promise:),
  113. onRequestPart: self.sendFromPipeline(_:promise:),
  114. onResponsePart: onResponsePart
  115. )
  116. }
  117. // MARK: - Call Object API
  118. /// Configure the transport to communicate with the server.
  119. /// - Parameter configurator: A callback to invoke in order to configure this transport.
  120. /// - Important: This *must* to be called from the `callEventLoop`.
  121. internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  122. self.callEventLoop.assertInEventLoop()
  123. if self.state.configureTransport() {
  124. self.configure(using: configurator)
  125. }
  126. }
  127. /// Send a request part – via the interceptor pipeline – to the server.
  128. /// - Parameters:
  129. /// - part: The part to send.
  130. /// - promise: A promise which will be completed when the request part has been handled.
  131. /// - Important: This *must* to be called from the `callEventLoop`.
  132. @inlinable
  133. internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  134. self.callEventLoop.assertInEventLoop()
  135. if let pipeline = self._pipeline {
  136. pipeline.send(part, promise: promise)
  137. } else {
  138. promise?.fail(GRPCError.AlreadyComplete())
  139. }
  140. }
  141. /// Attempt to cancel the RPC notifying any interceptors.
  142. /// - Parameter promise: A promise which will be completed when the cancellation attempt has
  143. /// been handled.
  144. internal func cancel(promise: EventLoopPromise<Void>?) {
  145. self.callEventLoop.assertInEventLoop()
  146. if let pipeline = self._pipeline {
  147. pipeline.cancel(promise: promise)
  148. } else {
  149. promise?.fail(GRPCError.AlreadyComplete())
  150. }
  151. }
  152. /// A request for the underlying `Channel`.
  153. internal func getChannel() -> EventLoopFuture<Channel> {
  154. self.callEventLoop.assertInEventLoop()
  155. // Do we already have a promise?
  156. if let promise = self.channelPromise {
  157. return promise.futureResult
  158. } else {
  159. // Make and store the promise.
  160. let promise = self.callEventLoop.makePromise(of: Channel.self)
  161. self.channelPromise = promise
  162. // Ask the state machine if we can have it.
  163. switch self.state.getChannel() {
  164. case .succeed:
  165. if let channel = self.channel {
  166. promise.succeed(channel)
  167. }
  168. case .fail:
  169. promise.fail(GRPCError.AlreadyComplete())
  170. case .doNothing:
  171. ()
  172. }
  173. return promise.futureResult
  174. }
  175. }
  176. }
  177. // MARK: - Pipeline API
  178. extension ClientTransport {
  179. /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
  180. /// - Parameters:
  181. /// - part: The request part to send.
  182. /// - promise: A promise which will be completed when the part has been handled.
  183. /// - Important: This *must* to be called from the `callEventLoop`.
  184. private func sendFromPipeline(
  185. _ part: GRPCClientRequestPart<Request>,
  186. promise: EventLoopPromise<Void>?
  187. ) {
  188. self.callEventLoop.assertInEventLoop()
  189. switch self.state.send() {
  190. case .writeToBuffer:
  191. self.buffer(part, promise: promise)
  192. case .writeToChannel:
  193. // Banging the channel is okay here: we'll only be told to 'writeToChannel' if we're in the
  194. // correct state, the requirements of that state are having an active `Channel`.
  195. self.writeToChannel(
  196. self.channel!,
  197. part: part,
  198. promise: promise,
  199. flush: self.shouldFlush(after: part)
  200. )
  201. case .alreadyComplete:
  202. promise?.fail(GRPCError.AlreadyComplete())
  203. }
  204. }
  205. /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
  206. /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
  207. /// - Important: This *must* to be called from the `callEventLoop`.
  208. private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
  209. self.callEventLoop.assertInEventLoop()
  210. if self.state.cancel() {
  211. let error = GRPCError.RPCCancelledByClient()
  212. self.forwardErrorToInterceptors(error)
  213. self.failBufferedWrites(with: error)
  214. self.channel?.close(mode: .all, promise: nil)
  215. self.channelPromise?.fail(error)
  216. promise?.succeed(())
  217. } else {
  218. promise?.fail(GRPCError.AlreadyComplete())
  219. }
  220. }
  221. }
  222. // MARK: - ChannelHandler API
  223. extension ClientTransport: ChannelInboundHandler {
  224. @usableFromInline
  225. typealias InboundIn = _RawGRPCClientResponsePart
  226. @usableFromInline
  227. typealias OutboundOut = _RawGRPCClientRequestPart
  228. @usableFromInline
  229. internal func handlerRemoved(context: ChannelHandlerContext) {
  230. self.dropReferences()
  231. }
  232. @usableFromInline
  233. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  234. self.handleError(error)
  235. }
  236. @usableFromInline
  237. internal func channelActive(context: ChannelHandlerContext) {
  238. self.transportActivated(channel: context.channel)
  239. }
  240. @usableFromInline
  241. internal func channelInactive(context: ChannelHandlerContext) {
  242. self.transportDeactivated()
  243. }
  244. @usableFromInline
  245. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  246. switch self.unwrapInboundIn(data) {
  247. case let .initialMetadata(headers):
  248. self.receiveFromChannel(initialMetadata: headers)
  249. case let .message(box):
  250. self.receiveFromChannel(message: box.message)
  251. case let .trailingMetadata(trailers):
  252. self.receiveFromChannel(trailingMetadata: trailers)
  253. case let .status(status):
  254. self.receiveFromChannel(status: status)
  255. }
  256. // (We're the end of the channel. No need to forward anything.)
  257. }
  258. }
  259. extension ClientTransport {
  260. /// The `Channel` became active. Send out any buffered requests.
  261. private func transportActivated(channel: Channel) {
  262. if self.callEventLoop.inEventLoop {
  263. self._transportActivated(channel: channel)
  264. } else {
  265. self.callEventLoop.execute {
  266. self._transportActivated(channel: channel)
  267. }
  268. }
  269. }
  270. /// On-loop implementation of `transportActivated(channel:)`.
  271. private func _transportActivated(channel: Channel) {
  272. self.callEventLoop.assertInEventLoop()
  273. self.logger.debug("activated stream channel", source: "GRPC")
  274. if self.state.activate() {
  275. self.channel = channel
  276. self.unbuffer()
  277. } else {
  278. channel.close(mode: .all, promise: nil)
  279. }
  280. }
  281. /// The `Channel` became inactive. Fail any buffered writes and forward an error to the
  282. /// interceptor pipeline if necessary.
  283. private func transportDeactivated() {
  284. if self.callEventLoop.inEventLoop {
  285. self._transportDeactivated()
  286. } else {
  287. self.callEventLoop.execute {
  288. self._transportDeactivated()
  289. }
  290. }
  291. }
  292. /// On-loop implementation of `transportDeactivated()`.
  293. private func _transportDeactivated() {
  294. self.callEventLoop.assertInEventLoop()
  295. switch self.state.deactivate() {
  296. case .doNothing:
  297. ()
  298. case .tearDown:
  299. let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
  300. self.forwardErrorToInterceptors(status)
  301. self.failBufferedWrites(with: status)
  302. self.channelPromise?.fail(status)
  303. case .failChannelPromise:
  304. self.channelPromise?.fail(GRPCError.AlreadyComplete())
  305. }
  306. }
  307. /// Drops any references to the `Channel` and interceptor pipeline.
  308. private func dropReferences() {
  309. if self.callEventLoop.inEventLoop {
  310. self.channel = nil
  311. self._pipeline = nil
  312. } else {
  313. self.callEventLoop.execute {
  314. self.channel = nil
  315. self._pipeline = nil
  316. }
  317. }
  318. }
  319. /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the
  320. /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` will
  321. /// also be closed.
  322. internal func handleError(_ error: Error) {
  323. if self.callEventLoop.inEventLoop {
  324. self._handleError(error)
  325. } else {
  326. self.callEventLoop.execute {
  327. self._handleError(error)
  328. }
  329. }
  330. }
  331. /// On-loop implementation of `handleError(_:)`.
  332. private func _handleError(_ error: Error) {
  333. self.callEventLoop.assertInEventLoop()
  334. switch self.state.handleError() {
  335. case .doNothing:
  336. ()
  337. case .propagateError:
  338. self.forwardErrorToInterceptors(error)
  339. self.failBufferedWrites(with: error)
  340. case .propagateErrorAndClose:
  341. self.forwardErrorToInterceptors(error)
  342. self.failBufferedWrites(with: error)
  343. self.channel?.close(mode: .all, promise: nil)
  344. }
  345. }
  346. /// Receive initial metadata from the `Channel`.
  347. private func receiveFromChannel(initialMetadata headers: HPACKHeaders) {
  348. if self.callEventLoop.inEventLoop {
  349. self._receiveFromChannel(initialMetadata: headers)
  350. } else {
  351. self.callEventLoop.execute {
  352. self._receiveFromChannel(initialMetadata: headers)
  353. }
  354. }
  355. }
  356. /// On-loop implementation of `receiveFromChannel(initialMetadata:)`.
  357. private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) {
  358. self.callEventLoop.assertInEventLoop()
  359. if self.state.channelRead(isEnd: false) {
  360. self.forwardToInterceptors(.metadata(headers))
  361. }
  362. }
  363. /// Receive response message bytes from the `Channel`.
  364. private func receiveFromChannel(message buffer: ByteBuffer) {
  365. if self.callEventLoop.inEventLoop {
  366. self._receiveFromChannel(message: buffer)
  367. } else {
  368. self.callEventLoop.execute {
  369. self._receiveFromChannel(message: buffer)
  370. }
  371. }
  372. }
  373. /// On-loop implementation of `receiveFromChannel(message:)`.
  374. private func _receiveFromChannel(message buffer: ByteBuffer) {
  375. self.callEventLoop.assertInEventLoop()
  376. do {
  377. let message = try self.deserializer.deserialize(byteBuffer: buffer)
  378. if self.state.channelRead(isEnd: false) {
  379. self.forwardToInterceptors(.message(message))
  380. }
  381. } catch {
  382. self.handleError(error)
  383. }
  384. }
  385. /// Receive trailing metadata from the `Channel`.
  386. private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) {
  387. // The `Channel` delivers trailers and `GRPCStatus` separately, we want to emit them together
  388. // in the interceptor pipeline.
  389. if self.callEventLoop.inEventLoop {
  390. self.trailers = trailers
  391. } else {
  392. self.callEventLoop.execute {
  393. self.trailers = trailers
  394. }
  395. }
  396. }
  397. /// Receive the final status from the `Channel`.
  398. private func receiveFromChannel(status: GRPCStatus) {
  399. if self.callEventLoop.inEventLoop {
  400. self._receiveFromChannel(status: status)
  401. } else {
  402. self.callEventLoop.execute {
  403. self._receiveFromChannel(status: status)
  404. }
  405. }
  406. }
  407. /// On-loop implementation of `receiveFromChannel(status:)`.
  408. private func _receiveFromChannel(status: GRPCStatus) {
  409. self.callEventLoop.assertInEventLoop()
  410. if self.state.channelRead(isEnd: true) {
  411. self.forwardToInterceptors(.end(status, self.trailers ?? [:]))
  412. self.trailers = nil
  413. }
  414. }
  415. }
  416. // MARK: - State Handling
  417. private enum ClientTransportState {
  418. /// Idle. We're waiting for the RPC to be configured.
  419. ///
  420. /// Valid transitions:
  421. /// - `awaitingTransport` (the transport is being configured)
  422. /// - `closed` (the RPC cancels)
  423. case idle
  424. /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
  425. /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
  426. /// transport in this state is an error.
  427. ///
  428. /// Valid transitions:
  429. /// - `activatingTransport` (the channel becomes active)
  430. /// - `closing` (the RPC cancels)
  431. /// - `closed` (the channel fails to become active)
  432. case awaitingTransport
  433. /// The transport is active but we're unbuffering any requests to write on that transport.
  434. /// We'll continue buffering in this state. Receiving messages from the transport in this state
  435. /// is okay.
  436. ///
  437. /// Valid transitions:
  438. /// - `active` (we finish unbuffering)
  439. /// - `closing` (the RPC cancels, the channel encounters an error)
  440. /// - `closed` (the channel becomes inactive)
  441. case activatingTransport
  442. /// Fully active. An RPC is in progress and is communicating over an active transport.
  443. ///
  444. /// Valid transitions:
  445. /// - `closing` (the RPC cancels, the channel encounters an error)
  446. /// - `closed` (the channel becomes inactive)
  447. case active
  448. /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
  449. /// become inactive yet.
  450. ///
  451. /// Valid transitions:
  452. /// - `closed` (the channel becomes inactive)
  453. case closing
  454. /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
  455. /// be ignored.
  456. ///
  457. /// Valid transitions:
  458. /// - none: this state is terminal.
  459. case closed
  460. /// Whether writes may be unbuffered in this state.
  461. internal var isUnbuffering: Bool {
  462. switch self {
  463. case .activatingTransport:
  464. return true
  465. case .idle, .awaitingTransport, .active, .closing, .closed:
  466. return false
  467. }
  468. }
  469. /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
  470. internal var mayBuffer: Bool {
  471. switch self {
  472. case .idle, .activatingTransport, .awaitingTransport:
  473. return true
  474. case .active, .closing, .closed:
  475. return false
  476. }
  477. }
  478. }
  479. extension ClientTransportState {
  480. /// The caller would like to configure the transport. Returns a boolean indicating whether we
  481. /// should configure it or not.
  482. mutating func configureTransport() -> Bool {
  483. switch self {
  484. // We're idle until we configure. Anything else is just a repeat request to configure.
  485. case .idle:
  486. self = .awaitingTransport
  487. return true
  488. case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
  489. return false
  490. }
  491. }
  492. enum SendAction {
  493. /// Write the request into the buffer.
  494. case writeToBuffer
  495. /// Write the request into the channel.
  496. case writeToChannel
  497. /// The RPC has already completed, fail any promise associated with the write.
  498. case alreadyComplete
  499. }
  500. /// The pipeline would like to send a request part to the transport.
  501. mutating func send() -> SendAction {
  502. switch self {
  503. // We don't have any transport yet, just buffer the part.
  504. case .idle, .awaitingTransport, .activatingTransport:
  505. return .writeToBuffer
  506. // We have a `Channel`, we can pipe the write straight through.
  507. case .active:
  508. return .writeToChannel
  509. // The transport is going or has gone away. Fail the promise.
  510. case .closing, .closed:
  511. return .alreadyComplete
  512. }
  513. }
  514. enum UnbufferedAction {
  515. /// Nothing needs to be done.
  516. case doNothing
  517. /// Succeed the channel promise associated with the transport.
  518. case succeedChannelPromise
  519. }
  520. /// We finished dealing with the buffered writes.
  521. mutating func unbuffered() -> UnbufferedAction {
  522. switch self {
  523. // These can't happen since we only begin unbuffering when we transition to
  524. // '.activatingTransport', which must come after these two states..
  525. case .idle, .awaitingTransport:
  526. preconditionFailure("Requests can't be unbuffered before the transport is activated")
  527. // We dealt with any buffered writes. We can become active now. This is the only way to become
  528. // active.
  529. case .activatingTransport:
  530. self = .active
  531. return .succeedChannelPromise
  532. case .active:
  533. preconditionFailure("Unbuffering completed but the transport is already active")
  534. // Something caused us to close while unbuffering, that's okay, we won't take any further
  535. // action.
  536. case .closing, .closed:
  537. return .doNothing
  538. }
  539. }
  540. /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether
  541. /// cancellation can go ahead (and also whether the channel should be torn down).
  542. mutating func cancel() -> Bool {
  543. switch self {
  544. case .idle:
  545. // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
  546. // we're done, fail any writes, and then deal with the cancellation promise.
  547. self = .closed
  548. return true
  549. case .awaitingTransport:
  550. // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
  551. // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
  552. // the `Channel` becoming active (see `channelActive(context:)`).
  553. self = .closing
  554. return true
  555. case .activatingTransport:
  556. // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
  557. // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
  558. // any buffered writes and then complete the cancellation promise.
  559. self = .closing
  560. return true
  561. case .active:
  562. // The RPC and channel are up and running. We'll fail the RPC and close the channel.
  563. self = .closing
  564. return true
  565. case .closing, .closed:
  566. // We're already closing or closing. The cancellation is too late.
  567. return false
  568. }
  569. }
  570. /// `channelActive` was invoked on the transport by the `Channel`.
  571. mutating func activate() -> Bool {
  572. // The channel has become active: what now?
  573. switch self {
  574. case .idle:
  575. preconditionFailure("Can't activate an idle transport")
  576. case .awaitingTransport:
  577. self = .activatingTransport
  578. return true
  579. case .activatingTransport, .active:
  580. preconditionFailure("Invalid state: stream is already active")
  581. case .closing:
  582. // We remain in closing: we only transition to closed on 'channelInactive'.
  583. return false
  584. case .closed:
  585. preconditionFailure("Invalid state: stream is already inactive")
  586. }
  587. }
  588. enum ChannelInactiveAction {
  589. /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
  590. case tearDown
  591. /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
  592. case failChannelPromise
  593. /// Do nothing.
  594. case doNothing
  595. }
  596. /// `channelInactive` was invoked on the transport by the `Channel`.
  597. mutating func deactivate() -> ChannelInactiveAction {
  598. switch self {
  599. case .idle:
  600. // We can't become inactive before we've requested a `Channel`.
  601. preconditionFailure("Can't deactivate an idle transport")
  602. case .awaitingTransport, .activatingTransport, .active:
  603. // We're activating the transport - i.e. offloading any buffered requests - and the channel
  604. // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
  605. // synthesize an error status to fail the RPC with.
  606. self = .closed
  607. return .tearDown
  608. case .closing:
  609. // We were already closing, now we're fully closed.
  610. self = .closed
  611. return .failChannelPromise
  612. case .closed:
  613. // We're already closed.
  614. return .doNothing
  615. }
  616. }
  617. /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value
  618. /// indicating whether the part that was read should be forwarded to the interceptor pipeline.
  619. mutating func channelRead(isEnd: Bool) -> Bool {
  620. switch self {
  621. case .idle, .awaitingTransport:
  622. // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
  623. preconditionFailure("Can't receive response part on idle transport")
  624. case .activatingTransport, .active:
  625. // We have an active `Channel`, we can forward the request part but we may need to start
  626. // closing if we see the status, since it indicates the call is terminating.
  627. if isEnd {
  628. self = .closing
  629. }
  630. return true
  631. case .closing, .closed:
  632. // We closed early, ignore any reads.
  633. return false
  634. }
  635. }
  636. enum HandleErrorAction {
  637. /// Propagate the error to the interceptor pipeline and fail any buffered writes.
  638. case propagateError
  639. /// As above, but close the 'Channel' as well.
  640. case propagateErrorAndClose
  641. /// No action is required.
  642. case doNothing
  643. }
  644. /// An error was caught.
  645. mutating func handleError() -> HandleErrorAction {
  646. switch self {
  647. case .idle:
  648. // The `Channel` can't error if it doesn't exist.
  649. preconditionFailure("Can't catch error on idle transport")
  650. case .awaitingTransport:
  651. // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
  652. // buffered writes along the way.
  653. self = .closing
  654. return .propagateError
  655. case .activatingTransport,
  656. .active:
  657. // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
  658. self = .closing
  659. return .propagateErrorAndClose
  660. case .closing, .closed:
  661. // We're already closing/closed, we can ignore this.
  662. return .doNothing
  663. }
  664. }
  665. enum GetChannelAction {
  666. /// No action is required.
  667. case doNothing
  668. /// Succeed the Channel promise.
  669. case succeed
  670. /// Fail the 'Channel' promise, the RPC is already complete.
  671. case fail
  672. }
  673. /// The caller has asked for the underlying `Channel`.
  674. mutating func getChannel() -> GetChannelAction {
  675. switch self {
  676. case .idle, .awaitingTransport, .activatingTransport:
  677. // Do nothing, we'll complete the promise when we become active or closed.
  678. return .doNothing
  679. case .active:
  680. // We're already active, so there was no promise to succeed when we made this transition. We
  681. // can complete it now.
  682. return .succeed
  683. case .closing:
  684. // We'll complete the promise when we transition to closed.
  685. return .doNothing
  686. case .closed:
  687. // We're already closed; there was no promise to fail when we made this transition. We can go
  688. // ahead and fail it now though.
  689. return .fail
  690. }
  691. }
  692. }
  693. // MARK: - State Actions
  694. extension ClientTransport {
  695. /// Configures this transport with the `configurator`.
  696. private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
  697. configurator(self).whenFailure { error in
  698. // We might be on a different EL, but `handleError` will sort that out for us, so no need to
  699. // hop.
  700. if error is GRPCStatus || error is GRPCStatusTransformable {
  701. self.handleError(error)
  702. } else {
  703. // Fallback to something which will mark the RPC as 'unavailable'.
  704. self.handleError(ConnectionFailure(reason: error))
  705. }
  706. }
  707. }
  708. /// Append a request part to the write buffer.
  709. /// - Parameters:
  710. /// - part: The request part to buffer.
  711. /// - promise: A promise to complete when the request part has been sent.
  712. private func buffer(
  713. _ part: GRPCClientRequestPart<Request>,
  714. promise: EventLoopPromise<Void>?
  715. ) {
  716. self.callEventLoop.assertInEventLoop()
  717. self.logger.debug("buffering request part", metadata: [
  718. "request_part": "\(part.name)",
  719. "call_state": self.stateForLogging,
  720. ], source: "GRPC")
  721. self.writeBuffer.append(.init(request: part, promise: promise))
  722. }
  723. /// Writes any buffered request parts to the `Channel`.
  724. private func unbuffer() {
  725. self.callEventLoop.assertInEventLoop()
  726. guard let channel = self.channel else {
  727. return
  728. }
  729. // Save any flushing until we're done writing.
  730. var shouldFlush = false
  731. self.logger.debug("unbuffering request parts", metadata: [
  732. "request_parts": "\(self.writeBuffer.count)",
  733. ], source: "GRPC")
  734. // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
  735. // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
  736. // may miss more buffered writes.
  737. while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
  738. // Pull out as many writes as possible.
  739. while let write = self.writeBuffer.popFirst() {
  740. self.logger.debug("unbuffering request part", metadata: [
  741. "request_part": "\(write.request.name)",
  742. ], source: "GRPC")
  743. if !shouldFlush {
  744. shouldFlush = self.shouldFlush(after: write.request)
  745. }
  746. self.writeToChannel(channel, part: write.request, promise: write.promise, flush: false)
  747. }
  748. // Okay, flush now.
  749. if shouldFlush {
  750. shouldFlush = false
  751. channel.flush()
  752. }
  753. }
  754. if self.writeBuffer.isEmpty {
  755. self.logger.debug("request buffer drained", source: "GRPC")
  756. } else {
  757. self.logger.notice(
  758. "unbuffering aborted",
  759. metadata: ["call_state": self.stateForLogging],
  760. source: "GRPC"
  761. )
  762. }
  763. // We're unbuffered. What now?
  764. switch self.state.unbuffered() {
  765. case .doNothing:
  766. ()
  767. case .succeedChannelPromise:
  768. self.channelPromise?.succeed(channel)
  769. }
  770. }
  771. /// Fails any promises that come with buffered writes with `error`.
  772. /// - Parameter error: The `Error` to fail promises with.
  773. private func failBufferedWrites(with error: Error) {
  774. self.logger.debug("failing buffered writes", metadata: [
  775. "call_state": self.stateForLogging,
  776. ], source: "GRPC")
  777. while let write = self.writeBuffer.popFirst() {
  778. write.promise?.fail(error)
  779. }
  780. }
  781. /// Write a request part to the `Channel`.
  782. /// - Parameters:
  783. /// - channel: The `Channel` to write `part` to.
  784. /// - part: The request part to write.
  785. /// - promise: A promise to complete once the write has been completed.
  786. /// - flush: Whether to flush the `Channel` after writing.
  787. private func writeToChannel(
  788. _ channel: Channel,
  789. part: GRPCClientRequestPart<Request>,
  790. promise: EventLoopPromise<Void>?,
  791. flush: Bool
  792. ) {
  793. switch part {
  794. case let .metadata(headers):
  795. let head = self.makeRequestHead(with: headers)
  796. channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
  797. case let .message(request, metadata):
  798. do {
  799. let bytes = try self.serializer.serialize(request, allocator: channel.allocator)
  800. let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
  801. channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
  802. } catch {
  803. self.handleError(error)
  804. }
  805. case .end:
  806. channel.write(self.wrapOutboundOut(.end), promise: promise)
  807. }
  808. if flush {
  809. channel.flush()
  810. }
  811. }
  812. /// Forward the response part to the interceptor pipeline.
  813. /// - Parameter part: The response part to forward.
  814. private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
  815. self.callEventLoop.assertInEventLoop()
  816. self._pipeline?.receive(part)
  817. }
  818. /// Forward the error to the interceptor pipeline.
  819. /// - Parameter error: The error to forward.
  820. private func forwardErrorToInterceptors(_ error: Error) {
  821. self.callEventLoop.assertInEventLoop()
  822. self._pipeline?.errorCaught(error)
  823. }
  824. }
  825. // MARK: - Helpers
  826. extension ClientTransport {
  827. /// Returns whether the `Channel` should be flushed after writing the given part to it.
  828. private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
  829. switch part {
  830. case .metadata:
  831. // If we're not streaming requests then we hold off on the flush until we see end.
  832. return self.isStreamingRequests
  833. case let .message(_, metadata):
  834. // Message flushing is determined by caller preference.
  835. return metadata.flush
  836. case .end:
  837. // Always flush at the end of the request stream.
  838. return true
  839. }
  840. }
  841. /// Make a `_GRPCRequestHead` with the provided metadata.
  842. private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
  843. return _GRPCRequestHead(
  844. method: self.callDetails.options.cacheable ? "GET" : "POST",
  845. scheme: self.callDetails.scheme,
  846. path: self.callDetails.path,
  847. host: self.callDetails.authority,
  848. deadline: self.callDetails.options.timeLimit.makeDeadline(),
  849. customMetadata: metadata,
  850. encoding: self.callDetails.options.messageEncoding
  851. )
  852. }
  853. }
  854. extension GRPCClientRequestPart {
  855. /// The name of the request part, used for logging.
  856. fileprivate var name: String {
  857. switch self {
  858. case .metadata:
  859. return "metadata"
  860. case .message:
  861. return "message"
  862. case .end:
  863. return "end"
  864. }
  865. }
  866. }
  867. // A wrapper for connection errors: we need to be able to preserve the underlying error as
  868. // well as extract a 'GRPCStatus' with code '.unavailable'.
  869. internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
  870. /// The reason the connection failed.
  871. var reason: Error
  872. init(reason: Error) {
  873. self.reason = reason
  874. }
  875. var description: String {
  876. return String(describing: self.reason)
  877. }
  878. func makeGRPCStatus() -> GRPCStatus {
  879. return GRPCStatus(code: .unavailable, message: String(describing: self.reason))
  880. }
  881. }