ClientTransport.swift 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// The glue between a `NIO.Channel` and the `ClientInterceptorPipeline`.
  ///
  /// This object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  /// little API to use on this class: they may configure the transport by adding it to a
  /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  ///
  /// In most instances the glue code is simple: transformations are applied to the request and
  /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  /// when these change. This includes buffering request parts from the interceptor pipeline until
  /// the `NIO.Channel` becomes active.
  ///
  /// ### Thread Safety
  ///
  /// This class is not thread safe. All methods **must** be executed on the transport's
  /// `callEventLoop`.
  @usableFromInline
  internal final class ClientTransport<Request, Response> {
    /// The `EventLoop` the call is running on. State must be accessed from this event loop.
    @usableFromInline
    internal let callEventLoop: EventLoop

    /// The current state of the transport.
    private var state: ClientTransportState = .idle

    /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
    /// and fail it when we transition to `closed`.
    private var channelPromise: EventLoopPromise<Channel>?

    // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
    // have 3 parts.
    /// A buffer to store request parts and promises in before the channel has become active.
    private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)

    /// The request serializer.
    private let serializer: AnySerializer<Request>

    /// The response deserializer.
    private let deserializer: AnyDeserializer<Response>

    /// A request part and the (optional) promise to complete once that part has been handled.
    private struct RequestAndPromise {
      var request: GRPCClientRequestPart<Request>
      var promise: EventLoopPromise<Void>?
    }

    /// Details about the call.
    internal let callDetails: CallDetails

    /// A logger.
    internal var logger: GRPCLogger

    /// Is the call streaming requests?
    private var isStreamingRequests: Bool {
      switch self.callDetails.type {
      case .unary, .serverStreaming:
        return false
      case .clientStreaming, .bidirectionalStreaming:
        return true
      }
    }

    // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
    // convenient to have both at the same time when intercepting response parts. We'll hold on to
    // the trailers here and only forward them when we receive the status.
    private var trailers: HPACKHeaders?

    /// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
    /// from the `ChannelPipeline` in order to break reference cycles.
    @usableFromInline
    internal var _pipeline: ClientInterceptorPipeline<Request, Response>?

    /// The `NIO.Channel` used by the transport, if it is available.
    private var channel: Channel?

    /// Our current state as logging metadata. States which may buffer writes also report the
    /// number of parts currently held in the write buffer.
    private var stateForLogging: Logger.MetadataValue {
      if self.state.mayBuffer {
        return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
      } else {
        return "\(self.state)"
      }
    }

    /// Creates a transport and the interceptor pipeline it owns.
    ///
    /// - Parameters:
    ///   - details: Details about the call; also the source of the logger (`details.options.logger`).
    ///   - eventLoop: The `EventLoop` the call runs on; stored as `callEventLoop`.
    ///   - interceptors: Interceptors handed to the interceptor pipeline.
    ///   - serializer: The request serializer.
    ///   - deserializer: The response deserializer.
    ///   - errorDelegate: An optional error delegate handed to the interceptor pipeline.
    ///   - onError: Error callback handed to the interceptor pipeline.
    ///   - onResponsePart: Response part callback handed to the interceptor pipeline.
    internal init(
      details: CallDetails,
      eventLoop: EventLoop,
      interceptors: [ClientInterceptor<Request, Response>],
      serializer: AnySerializer<Request>,
      deserializer: AnyDeserializer<Response>,
      errorDelegate: ClientErrorDelegate?,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) {
      self.callEventLoop = eventLoop
      self.callDetails = details
      let logger = GRPCLogger(wrapping: details.options.logger)
      self.logger = logger
      self.serializer = serializer
      self.deserializer = deserializer
      // The pipeline is given methods bound to `self`; `_pipeline` must later be set to `nil` to
      // break the resulting reference cycle (see `_pipeline`).
      self._pipeline = ClientInterceptorPipeline(
        eventLoop: eventLoop,
        details: details,
        logger: logger,
        interceptors: interceptors,
        errorDelegate: errorDelegate,
        onError: onError,
        onCancel: self.cancelFromPipeline(promise:),
        onRequestPart: self.sendFromPipeline(_:promise:),
        onResponsePart: onResponsePart
      )
    }

    // MARK: - Call Object API

    /// Configure the transport to communicate with the server.
    ///
    /// Repeat requests are ignored: the `configurator` is only invoked when the state machine
    /// permits configuration.
    ///
    /// - Parameter configurator: A callback to invoke in order to configure this transport.
    /// - Important: This *must* be called from the `callEventLoop`.
    internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
      self.callEventLoop.assertInEventLoop()
      if self.state.configureTransport() {
        self.configure(using: configurator)
      }
    }

    /// Send a request part – via the interceptor pipeline – to the server.
    ///
    /// If the interceptor pipeline has already been dropped the promise is failed with
    /// `GRPCError.AlreadyComplete`.
    ///
    /// - Parameters:
    ///   - part: The part to send.
    ///   - promise: A promise which will be completed when the request part has been handled.
    /// - Important: This *must* be called from the `callEventLoop`.
    @inlinable
    internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
      self.callEventLoop.assertInEventLoop()
      if let pipeline = self._pipeline {
        pipeline.send(part, promise: promise)
      } else {
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Attempt to cancel the RPC notifying any interceptors.
    ///
    /// If the interceptor pipeline has already been dropped the promise is failed with
    /// `GRPCError.AlreadyComplete`.
    ///
    /// - Parameter promise: A promise which will be completed when the cancellation attempt has
    ///   been handled.
    /// - Important: This *must* be called from the `callEventLoop`.
    internal func cancel(promise: EventLoopPromise<Void>?) {
      self.callEventLoop.assertInEventLoop()
      if let pipeline = self._pipeline {
        pipeline.cancel(promise: promise)
      } else {
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// A request for the underlying `Channel`.
    ///
    /// The same promise is shared by every caller: it is created on the first request and its
    /// future is returned on subsequent requests.
    ///
    /// - Returns: A future for the `Channel` used by this transport.
    /// - Important: This *must* be called from the `callEventLoop`.
    internal func getChannel() -> EventLoopFuture<Channel> {
      self.callEventLoop.assertInEventLoop()

      // Do we already have a promise?
      if let promise = self.channelPromise {
        return promise.futureResult
      } else {
        // Make and store the promise.
        let promise = self.callEventLoop.makePromise(of: Channel.self)
        self.channelPromise = promise

        // Ask the state machine if we can have it.
        switch self.state.getChannel() {
        case .succeed:
          if let channel = self.channel {
            promise.succeed(channel)
          } else {
            // The state machine believes we're active but the `Channel` reference has already
            // been dropped. Nothing else will succeed this promise, so fail it now rather than
            // leaving the caller's future pending.
            promise.fail(GRPCError.AlreadyComplete())
          }

        case .fail:
          promise.fail(GRPCError.AlreadyComplete())

        case .doNothing:
          // The promise will be completed on a later state transition.
          ()
        }

        return promise.futureResult
      }
    }
  }
  178. // MARK: - Pipeline API
  extension ClientTransport {
    /// Hands a request part from the interceptor pipeline to the transport. Depending on the
    /// current state the part is buffered, written to the `Channel`, or its promise is failed
    /// because the RPC has already completed.
    ///
    /// - Parameters:
    ///   - part: The request part to send.
    ///   - promise: A promise which will be completed when the part has been handled.
    /// - Important: This *must* be called from the `callEventLoop`.
    private func sendFromPipeline(
      _ part: GRPCClientRequestPart<Request>,
      promise: EventLoopPromise<Void>?
    ) {
      self.callEventLoop.assertInEventLoop()

      let action = self.state.send()
      switch action {
      case .alreadyComplete:
        promise?.fail(GRPCError.AlreadyComplete())

      case .writeToBuffer:
        self.buffer(part, promise: promise)

      case .writeToChannel:
        // Force unwrapping is fine: the state machine only asks us to write to the channel from
        // the state which requires an active `Channel`.
        let activeChannel = self.channel!
        let flushAfterWrite = self.shouldFlush(after: part)
        self.writeToChannel(
          activeChannel,
          part: part,
          promise: promise,
          flush: flushAfterWrite
        )
      }
    }

    /// Handles a cancellation request coming from the interceptor pipeline.
    ///
    /// - Parameter promise: A promise which will be completed when the cancellation has been
    ///   handled.
    /// - Important: This *must* be called from the `callEventLoop`.
    private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
      self.callEventLoop.assertInEventLoop()

      guard self.state.cancel() else {
        // Too late to cancel: we're already closing or closed.
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }

      let cancelled = GRPCError.RPCCancelledByClient()
      self.forwardErrorToInterceptors(cancelled)
      self.failBufferedWrites(with: cancelled)
      self.channel?.close(mode: .all, promise: nil)
      self.channelPromise?.fail(cancelled)
      promise?.succeed(())
    }
  }
  223. // MARK: - ChannelHandler API
  extension ClientTransport: ChannelInboundHandler {
    @usableFromInline
    typealias InboundIn = _RawGRPCClientResponsePart

    @usableFromInline
    typealias OutboundOut = _RawGRPCClientRequestPart

    /// The handler left the `ChannelPipeline`: release the `Channel` and interceptor pipeline.
    @usableFromInline
    internal func handlerRemoved(context: ChannelHandlerContext) {
      self.dropReferences()
    }

    /// An error was caught in the `ChannelPipeline`: hand it to `handleError(_:)`.
    @usableFromInline
    internal func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.handleError(error)
    }

    /// The `Channel` became active.
    @usableFromInline
    internal func channelActive(context: ChannelHandlerContext) {
      let activeChannel = context.channel
      self.transportActivated(channel: activeChannel)
    }

    /// The `Channel` became inactive.
    @usableFromInline
    internal func channelInactive(context: ChannelHandlerContext) {
      self.transportDeactivated()
    }

    /// Dispatches each inbound response part to the matching `receiveFromChannel` overload.
    /// Nothing is forwarded down the `ChannelPipeline`: this handler is the end of the channel.
    @usableFromInline
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let responsePart = self.unwrapInboundIn(data)

      switch responsePart {
      case .initialMetadata(let metadata):
        self.receiveFromChannel(initialMetadata: metadata)

      case .message(let messageBox):
        self.receiveFromChannel(message: messageBox.message)

      case .trailingMetadata(let metadata):
        self.receiveFromChannel(trailingMetadata: metadata)

      case .status(let finalStatus):
        self.receiveFromChannel(status: finalStatus)
      }
    }
  }
  extension ClientTransport {
    /// The `Channel` became active. Hops to the `callEventLoop` if required before activating.
    private func transportActivated(channel: Channel) {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self._transportActivated(channel: channel)
        }
        return
      }
      self._transportActivated(channel: channel)
    }

    /// On-loop implementation of `transportActivated(channel:)`. If the state machine accepts the
    /// activation the `Channel` is stored and buffered requests are written out; otherwise the
    /// `Channel` is closed.
    private func _transportActivated(channel: Channel) {
      self.callEventLoop.assertInEventLoop()

      guard self.state.activate() else {
        channel.close(mode: .all, promise: nil)
        return
      }

      self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
      self._pipeline?.logger = self.logger
      self.logger.debug("activated stream channel")
      self.channel = channel
      self.unbuffer()
    }

    /// The `Channel` became inactive. Hops to the `callEventLoop` if required before deactivating.
    private func transportDeactivated() {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self._transportDeactivated()
        }
        return
      }
      self._transportDeactivated()
    }

    /// On-loop implementation of `transportDeactivated()`. Fails any buffered writes and forwards
    /// an error to the interceptor pipeline if the state machine asks for a tear down.
    private func _transportDeactivated() {
      self.callEventLoop.assertInEventLoop()

      let action = self.state.deactivate()
      switch action {
      case .tearDown:
        let unavailable = GRPCStatus(code: .unavailable, message: "Transport became inactive")
        self.forwardErrorToInterceptors(unavailable)
        self.failBufferedWrites(with: unavailable)
        self.channelPromise?.fail(unavailable)

      case .failChannelPromise:
        self.channelPromise?.fail(GRPCError.AlreadyComplete())

      case .doNothing:
        break
      }
    }

    /// Drops any references to the `Channel` and interceptor pipeline, hopping to the
    /// `callEventLoop` if required.
    private func dropReferences() {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self.channel = nil
          self._pipeline = nil
        }
        return
      }
      self.channel = nil
      self._pipeline = nil
    }

    /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the
    /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` may
    /// also be closed. Safe to call from any event loop.
    internal func handleError(_ error: Error) {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self._handleError(error)
        }
        return
      }
      self._handleError(error)
    }

    /// On-loop implementation of `handleError(_:)`.
    private func _handleError(_ error: Error) {
      self.callEventLoop.assertInEventLoop()

      let action = self.state.handleError()
      switch action {
      case .doNothing:
        return

      case .propagateError, .propagateErrorAndClose:
        self.forwardErrorToInterceptors(error)
        self.failBufferedWrites(with: error)

        // Only one of the two actions additionally closes the `Channel`.
        if case .propagateErrorAndClose = action {
          self.channel?.close(mode: .all, promise: nil)
        }
      }
    }

    /// Receive initial metadata from the `Channel`, hopping to the `callEventLoop` if required.
    private func receiveFromChannel(initialMetadata headers: HPACKHeaders) {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self._receiveFromChannel(initialMetadata: headers)
        }
        return
      }
      self._receiveFromChannel(initialMetadata: headers)
    }

    /// On-loop implementation of `receiveFromChannel(initialMetadata:)`.
    private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) {
      self.callEventLoop.assertInEventLoop()

      guard self.state.channelRead(isEnd: false) else {
        return
      }
      self.forwardToInterceptors(.metadata(headers))
    }

    /// Receive response message bytes from the `Channel`, hopping to the `callEventLoop` if
    /// required.
    private func receiveFromChannel(message buffer: ByteBuffer) {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self._receiveFromChannel(message: buffer)
        }
        return
      }
      self._receiveFromChannel(message: buffer)
    }

    /// On-loop implementation of `receiveFromChannel(message:)`. The bytes are deserialized before
    /// the state machine is consulted; a deserialization failure is routed to `handleError(_:)`.
    private func _receiveFromChannel(message buffer: ByteBuffer) {
      self.callEventLoop.assertInEventLoop()

      do {
        let deserialized = try self.deserializer.deserialize(byteBuffer: buffer)
        guard self.state.channelRead(isEnd: false) else {
          return
        }
        self.forwardToInterceptors(.message(deserialized))
      } catch {
        self.handleError(error)
      }
    }

    /// Receive trailing metadata from the `Channel`.
    ///
    /// The `Channel` delivers trailers and `GRPCStatus` separately but the interceptor pipeline
    /// wants them together, so the trailers are only stored here.
    private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self.trailers = trailers
        }
        return
      }
      self.trailers = trailers
    }

    /// Receive the final status from the `Channel`, hopping to the `callEventLoop` if required.
    private func receiveFromChannel(status: GRPCStatus) {
      guard self.callEventLoop.inEventLoop else {
        self.callEventLoop.execute {
          self._receiveFromChannel(status: status)
        }
        return
      }
      self._receiveFromChannel(status: status)
    }

    /// On-loop implementation of `receiveFromChannel(status:)`. Emits the status together with
    /// any stored trailers (or empty trailers if none were received) and clears the stored value.
    private func _receiveFromChannel(status: GRPCStatus) {
      self.callEventLoop.assertInEventLoop()

      guard self.state.channelRead(isEnd: true) else {
        return
      }
      self.forwardToInterceptors(.end(status, self.trailers ?? [:]))
      self.trailers = nil
    }
  }
  419. // MARK: - State Handling
  /// The states a `ClientTransport` moves through over the lifetime of an RPC.
  private enum ClientTransportState {
    /// Idle. We're waiting for the RPC to be configured.
    ///
    /// Valid transitions:
    /// - `awaitingTransport` (the transport is being configured)
    /// - `closed` (the RPC cancels)
    case idle

    /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
    /// activate. Outbound messages are buffered in this state. Receiving messages from the
    /// transport in this state is an error.
    ///
    /// Valid transitions:
    /// - `activatingTransport` (the channel becomes active)
    /// - `closing` (the RPC cancels)
    /// - `closed` (the channel fails to become active)
    case awaitingTransport

    /// The transport is active but we're still unbuffering requests to write on that transport.
    /// Buffering continues in this state. Receiving messages from the transport in this state is
    /// okay.
    ///
    /// Valid transitions:
    /// - `active` (we finish unbuffering)
    /// - `closing` (the RPC cancels, the channel encounters an error)
    /// - `closed` (the channel becomes inactive)
    case activatingTransport

    /// Fully active. An RPC is in progress and is communicating over an active transport.
    ///
    /// Valid transitions:
    /// - `closing` (the RPC cancels, the channel encounters an error)
    /// - `closed` (the channel becomes inactive)
    case active

    /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
    /// become inactive yet.
    ///
    /// Valid transitions:
    /// - `closed` (the channel becomes inactive)
    case closing

    /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
    /// be ignored.
    ///
    /// Valid transitions:
    /// - none: this state is terminal.
    case closed

    /// Whether writes may be unbuffered in this state. Only true while activating the transport.
    internal var isUnbuffering: Bool {
      switch self {
      case .idle, .awaitingTransport, .active, .closing, .closed:
        return false
      case .activatingTransport:
        return true
      }
    }

    /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
    internal var mayBuffer: Bool {
      switch self {
      case .active, .closing, .closed:
        return false
      case .idle, .awaitingTransport, .activatingTransport:
        return true
      }
    }
  }
  extension ClientTransportState {
    /// The caller would like to configure the transport.
    ///
    /// - Returns: `true` if the transport should be configured, `false` for a repeat request.
    mutating func configureTransport() -> Bool {
      switch self {
      case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
        // Configuration was already requested (or is no longer relevant).
        return false

      case .idle:
        // We stay idle until the first request to configure.
        self = .awaitingTransport
        return true
      }
    }

    enum SendAction {
      /// Write the request into the buffer.
      case writeToBuffer
      /// Write the request into the channel.
      case writeToChannel
      /// The RPC has already completed, fail any promise associated with the write.
      case alreadyComplete
    }

    /// The pipeline would like to send a request part to the transport.
    mutating func send() -> SendAction {
      switch self {
      case .closing, .closed:
        // The transport is going or has gone away: the write promise should be failed.
        return .alreadyComplete

      case .active:
        // We have a `Channel`: the write can be piped straight through.
        return .writeToChannel

      case .idle, .awaitingTransport, .activatingTransport:
        // No usable transport yet: the part should be buffered.
        return .writeToBuffer
      }
    }

    enum UnbufferedAction {
      /// Nothing needs to be done.
      case doNothing
      /// Succeed the channel promise associated with the transport.
      case succeedChannelPromise
    }

    /// We finished dealing with the buffered writes.
    mutating func unbuffered() -> UnbufferedAction {
      switch self {
      case .activatingTransport:
        // All buffered writes have been dealt with so we can become active. This is the only way
        // to become active.
        self = .active
        return .succeedChannelPromise

      case .closing, .closed:
        // Something caused us to close while unbuffering; that's okay, no further action is
        // required.
        return .doNothing

      case .idle, .awaitingTransport:
        // Unreachable: unbuffering only begins on the transition to '.activatingTransport', which
        // must come after these two states.
        preconditionFailure("Requests can't be unbuffered before the transport is activated")

      case .active:
        preconditionFailure("Unbuffering completed but the transport is already active")
      }
    }

    /// Cancel the RPC and associated `Channel`, if possible.
    ///
    /// - Returns: A boolean indicating whether cancellation can go ahead (and also whether the
    ///   channel should be torn down).
    mutating func cancel() -> Bool {
      switch self {
      case .closing, .closed:
        // We're already closing or closed. The cancellation is too late.
        return false

      case .idle:
        // No RPC has been started and there is no `Channel`. The caller needs to tell the
        // interceptors we're done, fail any writes, and then deal with the cancellation promise.
        self = .closed
        return true

      case .awaitingTransport, .activatingTransport, .active:
        // - Awaiting transport: the `Channel` doesn't need to be closed explicitly, that happens
        //   as a result of the `Channel` becoming active (see `channelActive(context:)`).
        // - Activating transport: the `Channel` is active and the write buffer is being emptied.
        // - Active: the RPC and channel are up and running.
        // In each case we become closing; the caller fails the RPC and any buffered writes.
        self = .closing
        return true
      }
    }

    /// `channelActive` was invoked on the transport by the `Channel`.
    ///
    /// - Returns: `true` if the transport should be activated, `false` if it is closing.
    mutating func activate() -> Bool {
      switch self {
      case .awaitingTransport:
        self = .activatingTransport
        return true

      case .closing:
        // Remain in closing: the transition to closed only happens on 'channelInactive'.
        return false

      case .idle:
        preconditionFailure("Can't activate an idle transport")

      case .activatingTransport, .active:
        preconditionFailure("Invalid state: stream is already active")

      case .closed:
        preconditionFailure("Invalid state: stream is already inactive")
      }
    }

    enum ChannelInactiveAction {
      /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
      case tearDown
      /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
      case failChannelPromise
      /// Do nothing.
      case doNothing
    }

    /// `channelInactive` was invoked on the transport by the `Channel`.
    mutating func deactivate() -> ChannelInactiveAction {
      switch self {
      case .closed:
        // Nothing to do: we're already closed.
        return .doNothing

      case .closing:
        // We were already closing, now we're fully closed.
        self = .closed
        return .failChannelPromise

      case .awaitingTransport, .activatingTransport, .active:
        // The channel became inactive without an error having been received first (otherwise we
        // would be `closing`), so the caller should synthesize an error status to fail the RPC.
        self = .closed
        return .tearDown

      case .idle:
        // We can't become inactive before we've requested a `Channel`.
        preconditionFailure("Can't deactivate an idle transport")
      }
    }

    /// `channelRead` was invoked on the transport by the `Channel`.
    ///
    /// - Parameter isEnd: Whether the part which was read is the final status.
    /// - Returns: A boolean value indicating whether the part that was read should be forwarded to
    ///   the interceptor pipeline.
    mutating func channelRead(isEnd: Bool) -> Bool {
      switch self {
      case .closing, .closed:
        // We closed early, ignore any reads.
        return false

      case .activatingTransport, .active:
        // We have an active `Channel` so the response part can be forwarded. Seeing the status
        // means the call is terminating, so start closing.
        if isEnd {
          self = .closing
        }
        return true

      case .idle, .awaitingTransport:
        // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
        preconditionFailure("Can't receive response part on idle transport")
      }
    }

    enum HandleErrorAction {
      /// Propagate the error to the interceptor pipeline and fail any buffered writes.
      case propagateError
      /// As above, but close the 'Channel' as well.
      case propagateErrorAndClose
      /// No action is required.
      case doNothing
    }

    /// An error was caught.
    mutating func handleError() -> HandleErrorAction {
      switch self {
      case .closing, .closed:
        // We're already closing/closed, we can ignore this.
        return .doNothing

      case .awaitingTransport:
        // We were waiting for the `Channel` to become active; become closing and have the caller
        // fail any buffered writes along the way.
        self = .closing
        return .propagateError

      case .activatingTransport, .active:
        // We're either fully active or unbuffering. Forward an error, fail any writes and then
        // close.
        self = .closing
        return .propagateErrorAndClose

      case .idle:
        // The `Channel` can't error if it doesn't exist.
        preconditionFailure("Can't catch error on idle transport")
      }
    }

    enum GetChannelAction {
      /// No action is required.
      case doNothing
      /// Succeed the Channel promise.
      case succeed
      /// Fail the 'Channel' promise, the RPC is already complete.
      case fail
    }

    /// The caller has asked for the underlying `Channel`.
    mutating func getChannel() -> GetChannelAction {
      switch self {
      case .active:
        // We're already active, so there was no promise to succeed when that transition was made.
        // It can be completed now.
        return .succeed

      case .closed:
        // We're already closed; there was no promise to fail when that transition was made. It can
        // be failed now.
        return .fail

      case .idle, .awaitingTransport, .activatingTransport, .closing:
        // Do nothing: the promise is completed when we become active or closed.
        return .doNothing
      }
    }
  }
  696. // MARK: - State Actions
  697. extension ClientTransport {
  /// Configures this transport by handing it to `configurator`.
  ///
  /// If the returned future fails, the failure is routed to `handleError`. Errors which are not
  /// already a `GRPCStatus` or `GRPCStatusTransformable` are wrapped in a `ConnectionFailure`.
  ///
  /// - Parameter configurator: A closure which configures this handler and returns a future
  ///   for the outcome.
  private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
    let configured = configurator(self)

    configured.whenFailure { error in
      // Errors that already carry a status pass through untouched; anything else falls back to
      // a wrapper which will mark the RPC as 'unavailable'.
      let reported: Error
      if error is GRPCStatus || error is GRPCStatusTransformable {
        reported = error
      } else {
        reported = ConnectionFailure(reason: error)
      }

      // We might be on a different EL here, but `handleError` takes care of that, so there is
      // no need to hop.
      self.handleError(reported)
    }
  }
  /// Stores a request part, together with its promise, at the end of the write buffer.
  ///
  /// Must be called on the call's event loop.
  ///
  /// - Parameters:
  ///   - part: The request part to buffer.
  ///   - promise: A promise to complete when the request part has been sent.
  private func buffer(
    _ part: GRPCClientRequestPart<Request>,
    promise: EventLoopPromise<Void>?
  ) {
    self.callEventLoop.assertInEventLoop()

    self.logger.debug(
      "buffering request part",
      metadata: [
        "request_part": "\(part.name)",
        "call_state": self.stateForLogging,
      ]
    )

    self.writeBuffer.append(.init(request: part, promise: promise))
  }
  /// Drains the write buffer into the `Channel`, flushing where required.
  ///
  /// Must be called on the call's event loop. Does nothing if there is no `Channel`.
  private func unbuffer() {
    self.callEventLoop.assertInEventLoop()

    guard let channel = self.channel else { return }

    self.logger.debug("unbuffering request parts", metadata: [
      "request_parts": "\(self.writeBuffer.count)",
    ])

    // Flushing is deferred until a whole batch of writes has been issued.
    var flushPending = false

    // The outer loop exists because completing a promise as a result of the flush may enqueue
    // more writes, or may change our state (we may have to close, for instance). Without going
    // around again those extra buffered writes could be missed.
    while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
      // Issue every write currently in the buffer.
      while let pending = self.writeBuffer.popFirst() {
        self.logger.debug("unbuffering request part", metadata: [
          "request_part": "\(pending.request.name)",
        ])

        // Once a flush has been requested there is no need to ask again for this batch.
        flushPending = flushPending || self.shouldFlush(after: pending.request)

        self.writeToChannel(
          channel,
          part: pending.request,
          promise: pending.promise,
          flush: false
        )
      }

      // The batch is written; flush it if any part asked for that.
      if flushPending {
        flushPending = false
        channel.flush()
      }
    }

    if !self.writeBuffer.isEmpty {
      self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
    } else {
      self.logger.debug("request buffer drained")
    }

    // Unbuffering is over; let the state decide what happens next.
    switch self.state.unbuffered() {
    case .succeedChannelPromise:
      self.channelPromise?.succeed(channel)
    case .doNothing:
      break
    }
  }
  /// Empties the write buffer, failing the promise of every buffered write that has one.
  ///
  /// - Parameter error: The `Error` to fail promises with.
  private func failBufferedWrites(with error: Error) {
    self.logger.debug("failing buffered writes", metadata: ["call_state": self.stateForLogging])

    while let pending = self.writeBuffer.popFirst() {
      if let promise = pending.promise {
        promise.fail(error)
      }
    }
  }
  /// Write a request part to the `Channel`.
  ///
  /// Messages are serialized before being written. If serialization throws, the error is passed
  /// to `handleError` and `promise` is failed with the same error, since the part is never
  /// written to the `Channel`.
  ///
  /// - Parameters:
  ///   - channel: The `Channel` to write `part` to.
  ///   - part: The request part to write.
  ///   - promise: A promise to complete once the write has been completed.
  ///   - flush: Whether to flush the `Channel` after writing.
  private func writeToChannel(
    _ channel: Channel,
    part: GRPCClientRequestPart<Request>,
    promise: EventLoopPromise<Void>?,
    flush: Bool
  ) {
    switch part {
    case let .metadata(headers):
      let head = self.makeRequestHead(with: headers)
      channel.write(self.wrapOutboundOut(.head(head)), promise: promise)

    case let .message(request, metadata):
      do {
        let bytes = try self.serializer.serialize(request, allocator: channel.allocator)
        let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
        channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
      } catch {
        self.handleError(error)
        // This part was never handed to the `Channel` and is no longer in the write buffer, so
        // its promise must be completed here or it would be dropped unfulfilled.
        promise?.fail(error)
      }

    case .end:
      channel.write(self.wrapOutboundOut(.end), promise: promise)
    }

    if flush {
      channel.flush()
    }
  }
  /// Hands a response part to the interceptor pipeline, if there is one.
  ///
  /// Must be called on the call's event loop.
  ///
  /// - Parameter part: The response part to forward.
  private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
    self.callEventLoop.assertInEventLoop()

    guard let pipeline = self._pipeline else {
      return
    }
    pipeline.receive(part)
  }
  /// Hands an error to the interceptor pipeline, if there is one.
  ///
  /// Must be called on the call's event loop.
  ///
  /// - Parameter error: The error to forward.
  private func forwardErrorToInterceptors(_ error: Error) {
    self.callEventLoop.assertInEventLoop()

    guard let pipeline = self._pipeline else {
      return
    }
    pipeline.errorCaught(error)
  }
  821. }
  822. // MARK: - Helpers
  extension ClientTransport {
    /// Decides whether the `Channel` should be flushed once `part` has been written to it.
    ///
    /// - Parameter part: The request part which was (or is about to be) written.
    /// - Returns: `true` if a flush should follow the write.
    private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
      let flushNow: Bool

      switch part {
      case .end:
        // The end of the request stream is always flushed.
        flushNow = true

      case let .message(_, metadata):
        // For messages the caller states its preference.
        flushNow = metadata.flush

      case .metadata:
        // When requests are not streamed the flush is held back until end is seen.
        flushNow = self.isStreamingRequests
      }

      return flushNow
    }

    /// Builds a `_GRPCRequestHead` from the call details and the provided metadata.
    ///
    /// - Parameter metadata: Headers to use as the custom metadata of the request head.
    /// - Returns: The request head for this call.
    private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
      let details = self.callDetails
      let options = details.options

      return _GRPCRequestHead(
        // Cacheable calls are sent as "GET", everything else as "POST".
        method: options.cacheable ? "GET" : "POST",
        scheme: details.scheme,
        path: details.path,
        host: details.authority,
        deadline: options.timeLimit.makeDeadline(),
        customMetadata: metadata,
        encoding: options.messageEncoding
      )
    }
  }
  extension GRPCClientRequestPart {
    /// A short label identifying the kind of request part, used for logging.
    fileprivate var name: String {
      let label: String

      switch self {
      case .metadata: label = "metadata"
      case .message: label = "message"
      case .end: label = "end"
      }

      return label
    }
  }
  /// A wrapper for connection errors.
  ///
  /// It preserves the underlying error while also being able to produce a `GRPCStatus` whose
  /// code is `.unavailable`.
  internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
    /// The reason the connection failed.
    var reason: Error

    /// Wraps `reason`, the underlying connection error.
    init(reason: Error) {
      self.reason = reason
    }

    /// A textual description of the underlying error.
    var description: String {
      return String(describing: self.reason)
    }

    /// Makes an `.unavailable` status whose message describes the underlying error.
    func makeGRPCStatus() -> GRPCStatus {
      let message = self.description
      return GRPCStatus(code: .unavailable, message: message)
    }
  }