ClientTransport.swift 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
  21. /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  22. /// little API to use on this class: they may configure the transport by adding it to a
  23. /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  24. /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  25. /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  26. ///
  27. /// In most instances the glue code is simple: transformations are applied to the request and
  28. /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  29. /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  30. /// when these change. This includes buffering request parts from the interceptor pipeline until
  31. /// the `NIO.Channel` becomes active.
  32. ///
  33. /// ### Thread Safety
  34. ///
  35. /// This class is not thread safe. All methods **must** be executed on the transport's `callEventLoop`.
  36. @usableFromInline
  37. internal final class ClientTransport<Request, Response> {
  38. /// The `EventLoop` the call is running on. State must be accessed from this event loop.
  39. @usableFromInline
  40. internal let callEventLoop: EventLoop
  41. /// The current state of the transport.
  42. private var state: ClientTransportState = .idle
  43. /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
  44. /// and fail it when we transition to `closed`.
  45. private var channelPromise: EventLoopPromise<Channel>?
  46. // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
  47. // have 3 parts.
  48. /// A buffer to store request parts and promises in before the channel has become active.
  49. private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
  50. /// The request serializer.
  51. private let serializer: AnySerializer<Request>
  52. /// The response deserializer.
  53. private let deserializer: AnyDeserializer<Response>
  54. /// A request part and a promise.
  55. private struct RequestAndPromise {
  56. var request: GRPCClientRequestPart<Request>
  57. var promise: EventLoopPromise<Void>?
  58. }
  59. /// Details about the call.
  60. internal let callDetails: CallDetails
  61. /// A logger.
  62. internal var logger: GRPCLogger
  63. /// Is the call streaming requests?
  64. private var isStreamingRequests: Bool {
  65. switch self.callDetails.type {
  66. case .unary, .serverStreaming:
  67. return false
  68. case .clientStreaming, .bidirectionalStreaming:
  69. return true
  70. }
  71. }
  72. // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
  73. // convenient to have both at the same time when intercepting response parts. We'll hold on to the
  74. // trailers here and only forward them when we receive the status.
  75. private var trailers: HPACKHeaders?
  76. /// The interceptor pipeline connected to this transport. The pipeline also holds references
  77. /// to `self` which are dropped when the interceptor pipeline is closed.
  78. @usableFromInline
  79. internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
  80. /// The `NIO.Channel` used by the transport, if it is available.
  81. private var channel: Channel?
  82. /// Our current state as logging metadata.
  83. private var stateForLogging: Logger.MetadataValue {
  84. if self.state.mayBuffer {
  85. return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
  86. } else {
  87. return "\(self.state)"
  88. }
  89. }
  90. internal init(
  91. details: CallDetails,
  92. eventLoop: EventLoop,
  93. interceptors: [ClientInterceptor<Request, Response>],
  94. serializer: AnySerializer<Request>,
  95. deserializer: AnyDeserializer<Response>,
  96. errorDelegate: ClientErrorDelegate?,
  97. onError: @escaping (Error) -> Void,
  98. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  99. ) {
  100. self.callEventLoop = eventLoop
  101. self.callDetails = details
  102. let logger = GRPCLogger(wrapping: details.options.logger)
  103. self.logger = logger
  104. self.serializer = serializer
  105. self.deserializer = deserializer
  106. // The references to self held by the pipeline are dropped when it is closed.
  107. self._pipeline = ClientInterceptorPipeline(
  108. eventLoop: eventLoop,
  109. details: details,
  110. logger: logger,
  111. interceptors: interceptors,
  112. errorDelegate: errorDelegate,
  113. onError: onError,
  114. onCancel: self.cancelFromPipeline(promise:),
  115. onRequestPart: self.sendFromPipeline(_:promise:),
  116. onResponsePart: onResponsePart
  117. )
  118. }
  119. // MARK: - Call Object API
  120. /// Configure the transport to communicate with the server.
  121. /// - Parameter configurator: A callback to invoke in order to configure this transport.
  122. /// - Important: This *must* to be called from the `callEventLoop`.
  123. internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  124. self.callEventLoop.assertInEventLoop()
  125. if self.state.configureTransport() {
  126. self.configure(using: configurator)
  127. }
  128. }
  129. /// Send a request part – via the interceptor pipeline – to the server.
  130. /// - Parameters:
  131. /// - part: The part to send.
  132. /// - promise: A promise which will be completed when the request part has been handled.
  133. /// - Important: This *must* to be called from the `callEventLoop`.
  134. @inlinable
  135. internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  136. self.callEventLoop.assertInEventLoop()
  137. if let pipeline = self._pipeline {
  138. pipeline.send(part, promise: promise)
  139. } else {
  140. promise?.fail(GRPCError.AlreadyComplete())
  141. }
  142. }
  143. /// Attempt to cancel the RPC notifying any interceptors.
  144. /// - Parameter promise: A promise which will be completed when the cancellation attempt has
  145. /// been handled.
  146. internal func cancel(promise: EventLoopPromise<Void>?) {
  147. self.callEventLoop.assertInEventLoop()
  148. if let pipeline = self._pipeline {
  149. pipeline.cancel(promise: promise)
  150. } else {
  151. promise?.fail(GRPCError.AlreadyComplete())
  152. }
  153. }
  154. /// A request for the underlying `Channel`.
  155. internal func getChannel() -> EventLoopFuture<Channel> {
  156. self.callEventLoop.assertInEventLoop()
  157. // Do we already have a promise?
  158. if let promise = self.channelPromise {
  159. return promise.futureResult
  160. } else {
  161. // Make and store the promise.
  162. let promise = self.callEventLoop.makePromise(of: Channel.self)
  163. self.channelPromise = promise
  164. // Ask the state machine if we can have it.
  165. switch self.state.getChannel() {
  166. case .succeed:
  167. if let channel = self.channel {
  168. promise.succeed(channel)
  169. }
  170. case .fail:
  171. promise.fail(GRPCError.AlreadyComplete())
  172. case .doNothing:
  173. ()
  174. }
  175. return promise.futureResult
  176. }
  177. }
  178. }
  179. // MARK: - Pipeline API
  180. extension ClientTransport {
  181. /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
  182. /// - Parameters:
  183. /// - part: The request part to send.
  184. /// - promise: A promise which will be completed when the part has been handled.
  185. /// - Important: This *must* to be called from the `callEventLoop`.
  186. private func sendFromPipeline(
  187. _ part: GRPCClientRequestPart<Request>,
  188. promise: EventLoopPromise<Void>?
  189. ) {
  190. self.callEventLoop.assertInEventLoop()
  191. switch self.state.send() {
  192. case .writeToBuffer:
  193. self.buffer(part, promise: promise)
  194. case .writeToChannel:
  195. // Banging the channel is okay here: we'll only be told to 'writeToChannel' if we're in the
  196. // correct state, the requirements of that state are having an active `Channel`.
  197. self.writeToChannel(
  198. self.channel!,
  199. part: part,
  200. promise: promise,
  201. flush: self.shouldFlush(after: part)
  202. )
  203. case .alreadyComplete:
  204. promise?.fail(GRPCError.AlreadyComplete())
  205. }
  206. }
  207. /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
  208. /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
  209. /// - Important: This *must* to be called from the `callEventLoop`.
  210. private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
  211. self.callEventLoop.assertInEventLoop()
  212. if self.state.cancel() {
  213. let error = GRPCError.RPCCancelledByClient()
  214. let status = error.makeGRPCStatus()
  215. self.forwardToInterceptors(.end(status, [:]))
  216. self.failBufferedWrites(with: error)
  217. self.channel?.close(mode: .all, promise: nil)
  218. self.channelPromise?.fail(error)
  219. promise?.succeed(())
  220. } else {
  221. promise?.fail(GRPCError.AlreadyComplete())
  222. }
  223. }
  224. }
  225. // MARK: - ChannelHandler API
  226. extension ClientTransport: ChannelInboundHandler {
  227. @usableFromInline
  228. typealias InboundIn = _RawGRPCClientResponsePart
  229. @usableFromInline
  230. typealias OutboundOut = _RawGRPCClientRequestPart
  231. @usableFromInline
  232. internal func handlerRemoved(context: ChannelHandlerContext) {
  233. self.dropReferences()
  234. }
  235. @usableFromInline
  236. internal func handlerAdded(context: ChannelHandlerContext) {
  237. if context.channel.isActive {
  238. self.transportActivated(channel: context.channel)
  239. }
  240. }
  241. @usableFromInline
  242. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  243. self.handleError(error)
  244. }
  245. @usableFromInline
  246. internal func channelActive(context: ChannelHandlerContext) {
  247. self.transportActivated(channel: context.channel)
  248. }
  249. @usableFromInline
  250. internal func channelInactive(context: ChannelHandlerContext) {
  251. self.transportDeactivated()
  252. }
  253. @usableFromInline
  254. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  255. switch self.unwrapInboundIn(data) {
  256. case let .initialMetadata(headers):
  257. self.receiveFromChannel(initialMetadata: headers)
  258. case let .message(box):
  259. self.receiveFromChannel(message: box.message)
  260. case let .trailingMetadata(trailers):
  261. self.receiveFromChannel(trailingMetadata: trailers)
  262. case let .status(status):
  263. self.receiveFromChannel(status: status)
  264. }
  265. // (We're the end of the channel. No need to forward anything.)
  266. }
  267. }
  268. extension ClientTransport {
  269. /// The `Channel` became active. Send out any buffered requests.
  270. private func transportActivated(channel: Channel) {
  271. if self.callEventLoop.inEventLoop {
  272. self._transportActivated(channel: channel)
  273. } else {
  274. self.callEventLoop.execute {
  275. self._transportActivated(channel: channel)
  276. }
  277. }
  278. }
  279. /// On-loop implementation of `transportActivated(channel:)`.
  280. private func _transportActivated(channel: Channel) {
  281. self.callEventLoop.assertInEventLoop()
  282. switch self.state.activate() {
  283. case .unbuffer:
  284. self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
  285. self._pipeline?.logger = self.logger
  286. self.logger.debug("activated stream channel")
  287. self.channel = channel
  288. self.unbuffer()
  289. case .close:
  290. channel.close(mode: .all, promise: nil)
  291. case .doNothing:
  292. ()
  293. }
  294. }
  295. /// The `Channel` became inactive. Fail any buffered writes and forward an error to the
  296. /// interceptor pipeline if necessary.
  297. private func transportDeactivated() {
  298. if self.callEventLoop.inEventLoop {
  299. self._transportDeactivated()
  300. } else {
  301. self.callEventLoop.execute {
  302. self._transportDeactivated()
  303. }
  304. }
  305. }
  306. /// On-loop implementation of `transportDeactivated()`.
  307. private func _transportDeactivated() {
  308. self.callEventLoop.assertInEventLoop()
  309. switch self.state.deactivate() {
  310. case .doNothing:
  311. ()
  312. case .tearDown:
  313. let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
  314. self.forwardErrorToInterceptors(status)
  315. self.failBufferedWrites(with: status)
  316. self.channelPromise?.fail(status)
  317. case .failChannelPromise:
  318. self.channelPromise?.fail(GRPCError.AlreadyComplete())
  319. }
  320. }
  321. /// Drops any references to the `Channel` and interceptor pipeline.
  322. private func dropReferences() {
  323. if self.callEventLoop.inEventLoop {
  324. self.channel = nil
  325. } else {
  326. self.callEventLoop.execute {
  327. self.channel = nil
  328. }
  329. }
  330. }
  331. /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the
  332. /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` will
  333. /// also be closed.
  334. internal func handleError(_ error: Error) {
  335. if self.callEventLoop.inEventLoop {
  336. self._handleError(error)
  337. } else {
  338. self.callEventLoop.execute {
  339. self._handleError(error)
  340. }
  341. }
  342. }
  343. /// On-loop implementation of `handleError(_:)`.
  344. private func _handleError(_ error: Error) {
  345. self.callEventLoop.assertInEventLoop()
  346. switch self.state.handleError() {
  347. case .doNothing:
  348. ()
  349. case .propagateError:
  350. self.forwardErrorToInterceptors(error)
  351. self.failBufferedWrites(with: error)
  352. case .propagateErrorAndClose:
  353. self.forwardErrorToInterceptors(error)
  354. self.failBufferedWrites(with: error)
  355. self.channel?.close(mode: .all, promise: nil)
  356. }
  357. }
  358. /// Receive initial metadata from the `Channel`.
  359. private func receiveFromChannel(initialMetadata headers: HPACKHeaders) {
  360. if self.callEventLoop.inEventLoop {
  361. self._receiveFromChannel(initialMetadata: headers)
  362. } else {
  363. self.callEventLoop.execute {
  364. self._receiveFromChannel(initialMetadata: headers)
  365. }
  366. }
  367. }
  368. /// On-loop implementation of `receiveFromChannel(initialMetadata:)`.
  369. private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) {
  370. self.callEventLoop.assertInEventLoop()
  371. if self.state.channelRead(isEnd: false) {
  372. self.forwardToInterceptors(.metadata(headers))
  373. }
  374. }
  375. /// Receive response message bytes from the `Channel`.
  376. private func receiveFromChannel(message buffer: ByteBuffer) {
  377. if self.callEventLoop.inEventLoop {
  378. self._receiveFromChannel(message: buffer)
  379. } else {
  380. self.callEventLoop.execute {
  381. self._receiveFromChannel(message: buffer)
  382. }
  383. }
  384. }
  385. /// On-loop implementation of `receiveFromChannel(message:)`.
  386. private func _receiveFromChannel(message buffer: ByteBuffer) {
  387. self.callEventLoop.assertInEventLoop()
  388. do {
  389. let message = try self.deserializer.deserialize(byteBuffer: buffer)
  390. if self.state.channelRead(isEnd: false) {
  391. self.forwardToInterceptors(.message(message))
  392. }
  393. } catch {
  394. self.handleError(error)
  395. }
  396. }
  397. /// Receive trailing metadata from the `Channel`.
  398. private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) {
  399. // The `Channel` delivers trailers and `GRPCStatus` separately, we want to emit them together
  400. // in the interceptor pipeline.
  401. if self.callEventLoop.inEventLoop {
  402. self.trailers = trailers
  403. } else {
  404. self.callEventLoop.execute {
  405. self.trailers = trailers
  406. }
  407. }
  408. }
  409. /// Receive the final status from the `Channel`.
  410. private func receiveFromChannel(status: GRPCStatus) {
  411. if self.callEventLoop.inEventLoop {
  412. self._receiveFromChannel(status: status)
  413. } else {
  414. self.callEventLoop.execute {
  415. self._receiveFromChannel(status: status)
  416. }
  417. }
  418. }
  419. /// On-loop implementation of `receiveFromChannel(status:)`.
  420. private func _receiveFromChannel(status: GRPCStatus) {
  421. self.callEventLoop.assertInEventLoop()
  422. if self.state.channelRead(isEnd: true) {
  423. self.forwardToInterceptors(.end(status, self.trailers ?? [:]))
  424. self.trailers = nil
  425. }
  426. }
  427. }
  428. // MARK: - State Handling
  429. private enum ClientTransportState {
  430. /// Idle. We're waiting for the RPC to be configured.
  431. ///
  432. /// Valid transitions:
  433. /// - `awaitingTransport` (the transport is being configured)
  434. /// - `closed` (the RPC cancels)
  435. case idle
  436. /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
  437. /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
  438. /// transport in this state is an error.
  439. ///
  440. /// Valid transitions:
  441. /// - `activatingTransport` (the channel becomes active)
  442. /// - `closing` (the RPC cancels)
  443. /// - `closed` (the channel fails to become active)
  444. case awaitingTransport
  445. /// The transport is active but we're unbuffering any requests to write on that transport.
  446. /// We'll continue buffering in this state. Receiving messages from the transport in this state
  447. /// is okay.
  448. ///
  449. /// Valid transitions:
  450. /// - `active` (we finish unbuffering)
  451. /// - `closing` (the RPC cancels, the channel encounters an error)
  452. /// - `closed` (the channel becomes inactive)
  453. case activatingTransport
  454. /// Fully active. An RPC is in progress and is communicating over an active transport.
  455. ///
  456. /// Valid transitions:
  457. /// - `closing` (the RPC cancels, the channel encounters an error)
  458. /// - `closed` (the channel becomes inactive)
  459. case active
  460. /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
  461. /// become inactive yet.
  462. ///
  463. /// Valid transitions:
  464. /// - `closed` (the channel becomes inactive)
  465. case closing
  466. /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
  467. /// be ignored.
  468. ///
  469. /// Valid transitions:
  470. /// - none: this state is terminal.
  471. case closed
  472. /// Whether writes may be unbuffered in this state.
  473. internal var isUnbuffering: Bool {
  474. switch self {
  475. case .activatingTransport:
  476. return true
  477. case .idle, .awaitingTransport, .active, .closing, .closed:
  478. return false
  479. }
  480. }
  481. /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
  482. internal var mayBuffer: Bool {
  483. switch self {
  484. case .idle, .activatingTransport, .awaitingTransport:
  485. return true
  486. case .active, .closing, .closed:
  487. return false
  488. }
  489. }
  490. }
  491. extension ClientTransportState {
  492. /// The caller would like to configure the transport. Returns a boolean indicating whether we
  493. /// should configure it or not.
  494. mutating func configureTransport() -> Bool {
  495. switch self {
  496. // We're idle until we configure. Anything else is just a repeat request to configure.
  497. case .idle:
  498. self = .awaitingTransport
  499. return true
  500. case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
  501. return false
  502. }
  503. }
  504. enum SendAction {
  505. /// Write the request into the buffer.
  506. case writeToBuffer
  507. /// Write the request into the channel.
  508. case writeToChannel
  509. /// The RPC has already completed, fail any promise associated with the write.
  510. case alreadyComplete
  511. }
  512. /// The pipeline would like to send a request part to the transport.
  513. mutating func send() -> SendAction {
  514. switch self {
  515. // We don't have any transport yet, just buffer the part.
  516. case .idle, .awaitingTransport, .activatingTransport:
  517. return .writeToBuffer
  518. // We have a `Channel`, we can pipe the write straight through.
  519. case .active:
  520. return .writeToChannel
  521. // The transport is going or has gone away. Fail the promise.
  522. case .closing, .closed:
  523. return .alreadyComplete
  524. }
  525. }
  526. enum UnbufferedAction {
  527. /// Nothing needs to be done.
  528. case doNothing
  529. /// Succeed the channel promise associated with the transport.
  530. case succeedChannelPromise
  531. }
  532. /// We finished dealing with the buffered writes.
  533. mutating func unbuffered() -> UnbufferedAction {
  534. switch self {
  535. // These can't happen since we only begin unbuffering when we transition to
  536. // '.activatingTransport', which must come after these two states..
  537. case .idle, .awaitingTransport:
  538. preconditionFailure("Requests can't be unbuffered before the transport is activated")
  539. // We dealt with any buffered writes. We can become active now. This is the only way to become
  540. // active.
  541. case .activatingTransport:
  542. self = .active
  543. return .succeedChannelPromise
  544. case .active:
  545. preconditionFailure("Unbuffering completed but the transport is already active")
  546. // Something caused us to close while unbuffering, that's okay, we won't take any further
  547. // action.
  548. case .closing, .closed:
  549. return .doNothing
  550. }
  551. }
  552. /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether
  553. /// cancellation can go ahead (and also whether the channel should be torn down).
  554. mutating func cancel() -> Bool {
  555. switch self {
  556. case .idle:
  557. // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
  558. // we're done, fail any writes, and then deal with the cancellation promise.
  559. self = .closed
  560. return true
  561. case .awaitingTransport:
  562. // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
  563. // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
  564. // the `Channel` becoming active (see `channelActive(context:)`).
  565. self = .closing
  566. return true
  567. case .activatingTransport:
  568. // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
  569. // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
  570. // any buffered writes and then complete the cancellation promise.
  571. self = .closing
  572. return true
  573. case .active:
  574. // The RPC and channel are up and running. We'll fail the RPC and close the channel.
  575. self = .closing
  576. return true
  577. case .closing, .closed:
  578. // We're already closing or closing. The cancellation is too late.
  579. return false
  580. }
  581. }
  582. enum ActivateAction {
  583. case unbuffer
  584. case close
  585. case doNothing
  586. }
  587. /// `channelActive` was invoked on the transport by the `Channel`.
  588. mutating func activate() -> ActivateAction {
  589. // The channel has become active: what now?
  590. switch self {
  591. case .idle:
  592. preconditionFailure("Can't activate an idle transport")
  593. case .awaitingTransport:
  594. self = .activatingTransport
  595. return .unbuffer
  596. case .activatingTransport, .active:
  597. // Already activated.
  598. return .doNothing
  599. case .closing:
  600. // We remain in closing: we only transition to closed on 'channelInactive'.
  601. return .close
  602. case .closed:
  603. preconditionFailure("Invalid state: stream is already inactive")
  604. }
  605. }
  606. enum ChannelInactiveAction {
  607. /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
  608. case tearDown
  609. /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
  610. case failChannelPromise
  611. /// Do nothing.
  612. case doNothing
  613. }
  614. /// `channelInactive` was invoked on the transport by the `Channel`.
  615. mutating func deactivate() -> ChannelInactiveAction {
  616. switch self {
  617. case .idle:
  618. // We can't become inactive before we've requested a `Channel`.
  619. preconditionFailure("Can't deactivate an idle transport")
  620. case .awaitingTransport, .activatingTransport, .active:
  621. // We're activating the transport - i.e. offloading any buffered requests - and the channel
  622. // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
  623. // synthesize an error status to fail the RPC with.
  624. self = .closed
  625. return .tearDown
  626. case .closing:
  627. // We were already closing, now we're fully closed.
  628. self = .closed
  629. return .failChannelPromise
  630. case .closed:
  631. // We're already closed.
  632. return .doNothing
  633. }
  634. }
  635. /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value
  636. /// indicating whether the part that was read should be forwarded to the interceptor pipeline.
  637. mutating func channelRead(isEnd: Bool) -> Bool {
  638. switch self {
  639. case .idle, .awaitingTransport:
  640. // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
  641. preconditionFailure("Can't receive response part on idle transport")
  642. case .activatingTransport, .active:
  643. // We have an active `Channel`, we can forward the request part but we may need to start
  644. // closing if we see the status, since it indicates the call is terminating.
  645. if isEnd {
  646. self = .closing
  647. }
  648. return true
  649. case .closing, .closed:
  650. // We closed early, ignore any reads.
  651. return false
  652. }
  653. }
  654. enum HandleErrorAction {
  655. /// Propagate the error to the interceptor pipeline and fail any buffered writes.
  656. case propagateError
  657. /// As above, but close the 'Channel' as well.
  658. case propagateErrorAndClose
  659. /// No action is required.
  660. case doNothing
  661. }
  662. /// An error was caught.
  663. mutating func handleError() -> HandleErrorAction {
  664. switch self {
  665. case .idle:
  666. // The `Channel` can't error if it doesn't exist.
  667. preconditionFailure("Can't catch error on idle transport")
  668. case .awaitingTransport:
  669. // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
  670. // buffered writes along the way.
  671. self = .closing
  672. return .propagateError
  673. case .activatingTransport,
  674. .active:
  675. // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
  676. self = .closing
  677. return .propagateErrorAndClose
  678. case .closing, .closed:
  679. // We're already closing/closed, we can ignore this.
  680. return .doNothing
  681. }
  682. }
  683. enum GetChannelAction {
  684. /// No action is required.
  685. case doNothing
  686. /// Succeed the Channel promise.
  687. case succeed
  688. /// Fail the 'Channel' promise, the RPC is already complete.
  689. case fail
  690. }
  /// The caller has asked for the underlying `Channel`.
  ///
  /// - Returns: What to do with the `Channel` promise given the current state.
  mutating func getChannel() -> GetChannelAction {
    switch self {
    case .active:
      // The transition to active happened before a promise existed, so it has to be
      // succeeded now.
      return .succeed

    case .closed:
      // The transition to closed happened before a promise existed, so it has to be
      // failed now.
      return .fail

    case .idle, .awaitingTransport, .activatingTransport, .closing:
      // The promise will be completed by a later transition (to active or to closed).
      return .doNothing
    }
  }
  710. }
  711. // MARK: - State Actions
  712. extension ClientTransport {
  /// Configures this transport by handing it to `configurator`.
  ///
  /// If the future returned by `configurator` fails, the failure is passed to `handleError`.
  /// Errors which are not a `GRPCStatus` or `GRPCStatusTransformable` are wrapped in a
  /// `ConnectionFailure` first.
  ///
  /// - Parameter configurator: A function which configures the given handler.
  private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
    let configured = configurator(self)

    configured.whenFailure { error in
      let reportable: Error

      switch error {
      case is GRPCStatus, is GRPCStatusTransformable:
        reportable = error
      default:
        // Fallback to something which will mark the RPC as 'unavailable'.
        reportable = ConnectionFailure(reason: error)
      }

      // This may run on a different event loop; `handleError` deals with that, so there is no
      // need to hop here.
      self.handleError(reportable)
    }
  }
  /// Appends a request part to the write buffer; buffered parts are written to the `Channel`
  /// by `unbuffer()`.
  ///
  /// Must be called on the call's event loop.
  ///
  /// - Parameters:
  ///   - part: The request part to buffer.
  ///   - promise: A promise to complete when the request part has been sent.
  private func buffer(
    _ part: GRPCClientRequestPart<Request>,
    promise: EventLoopPromise<Void>?
  ) {
    self.callEventLoop.assertInEventLoop()

    self.logger.trace(
      "buffering request part",
      metadata: [
        "request_part": "\(part.name)",
        "call_state": self.stateForLogging,
      ]
    )

    self.writeBuffer.append(.init(request: part, promise: promise))
  }
  /// Drains the write buffer into the `Channel`, flushing when the drained parts require it.
  ///
  /// Must be called on the call's event loop. Does nothing if there is no `Channel`.
  private func unbuffer() {
    self.callEventLoop.assertInEventLoop()

    guard let channel = self.channel else {
      return
    }

    self.logger.trace("unbuffering request parts", metadata: [
      "request_parts": "\(self.writeBuffer.count)",
    ])

    // Flushing is deferred until a whole batch of writes has been handed to the `Channel`.
    var flushPending = false

    // The outer loop exists because completing a promise as part of the flush may enqueue
    // further writes or change the state (e.g. start closing). Re-checking both conditions
    // after every batch ensures late writes are not missed.
    while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
      // Hand over everything which is currently buffered.
      while let next = self.writeBuffer.popFirst() {
        self.logger.trace("unbuffering request part", metadata: [
          "request_part": "\(next.request.name)",
        ])

        // Once a flush is required for this batch it stays required.
        flushPending = flushPending || self.shouldFlush(after: next.request)
        self.writeToChannel(channel, part: next.request, promise: next.promise, flush: false)
      }

      // The batch has been written: flush if any part asked for it.
      if flushPending {
        flushPending = false
        channel.flush()
      }
    }

    if self.writeBuffer.isEmpty {
      self.logger.trace("request buffer drained")
    } else {
      // The loop stopped with writes still buffered, i.e. we are no longer unbuffering.
      self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
    }

    // Tell the state machine that unbuffering is over and act on its verdict.
    switch self.state.unbuffered() {
    case .succeedChannelPromise:
      self.channelPromise?.succeed(channel)
    case .doNothing:
      break
    }
  }
  /// Empties the write buffer, failing the promise of every buffered write which has one.
  ///
  /// - Parameter error: The `Error` to fail promises with.
  private func failBufferedWrites(with error: Error) {
    self.logger.trace("failing buffered writes", metadata: ["call_state": self.stateForLogging])

    while let pending = self.writeBuffer.popFirst() {
      // Writes without a promise are simply dropped.
      guard let promise = pending.promise else {
        continue
      }
      promise.fail(error)
    }
  }
  /// Write a request part to the `Channel`.
  ///
  /// Messages are serialized before being written. If serialization throws, nothing is written
  /// for that part: the error is passed to `handleError` and `promise` is failed with it.
  ///
  /// - Parameters:
  ///   - channel: The `Channel` to write `part` to.
  ///   - part: The request part to write.
  ///   - promise: A promise to complete once the write has been completed. It is failed if the
  ///     part could not be serialized.
  ///   - flush: Whether to flush the `Channel` after writing.
  private func writeToChannel(
    _ channel: Channel,
    part: GRPCClientRequestPart<Request>,
    promise: EventLoopPromise<Void>?,
    flush: Bool
  ) {
    switch part {
    case let .metadata(headers):
      let head = self.makeRequestHead(with: headers)
      channel.write(self.wrapOutboundOut(.head(head)), promise: promise)

    case let .message(request, metadata):
      do {
        let bytes = try self.serializer.serialize(request, allocator: channel.allocator)
        let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
        channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
      } catch {
        // Report the error first so the transport has reacted to it before any callbacks
        // attached to the promise get a chance to run.
        self.handleError(error)
        // The part never reached the `Channel` and is not in the write buffer, so nothing else
        // will complete its promise: fail it here rather than leaking it.
        promise?.fail(error)
      }

    case .end:
      channel.write(self.wrapOutboundOut(.end), promise: promise)
    }

    if flush {
      channel.flush()
    }
  }
  /// Hands a response part to the interceptor pipeline, if there is one.
  ///
  /// Must be called on the call's event loop.
  ///
  /// - Parameter part: The response part to forward.
  private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
    self.callEventLoop.assertInEventLoop()

    // Without a pipeline the part is dropped.
    self._pipeline?.receive(part)
  }
  /// Hands an error to the interceptor pipeline, if there is one.
  ///
  /// Must be called on the call's event loop.
  ///
  /// - Parameter error: The error to forward.
  private func forwardErrorToInterceptors(_ error: Error) {
    self.callEventLoop.assertInEventLoop()

    // Without a pipeline the error is dropped.
    self._pipeline?.errorCaught(error)
  }
  836. }
  837. // MARK: - Helpers
  838. extension ClientTransport {
  /// Decides whether the `Channel` should be flushed once `part` has been written to it.
  ///
  /// - Parameter part: The request part which was (or is about to be) written.
  /// - Returns: `true` if a flush should follow the write.
  private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
    let flushRequired: Bool

    switch part {
    case .metadata:
      // When requests aren't streamed the flush is held back until end is seen.
      flushRequired = self.isStreamingRequests

    case let .message(_, messageMetadata):
      // The caller decides per message.
      flushRequired = messageMetadata.flush

    case .end:
      // The end of the request stream is always flushed.
      flushRequired = true
    }

    return flushRequired
  }
  /// Builds the `_GRPCRequestHead` for this call from the call details and `metadata`.
  ///
  /// - Parameter metadata: The headers to use as the custom metadata of the request head.
  /// - Returns: The request head for this call.
  private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
    let details = self.callDetails
    let options = details.options

    return _GRPCRequestHead(
      // Cacheable calls use "GET", everything else uses "POST".
      method: options.cacheable ? "GET" : "POST",
      scheme: details.scheme,
      path: details.path,
      host: details.authority,
      deadline: options.timeLimit.makeDeadline(),
      customMetadata: metadata,
      encoding: options.messageEncoding
    )
  }
  865. }
  866. extension GRPCClientRequestPart {
  /// A short label for the kind of request part, used for logging.
  fileprivate var name: String {
    let label: String

    switch self {
    case .metadata:
      label = "metadata"
    case .message:
      label = "message"
    case .end:
      label = "end"
    }

    return label
  }
  878. }
  /// A wrapper for connection errors.
  ///
  /// It preserves the underlying error while also being convertible to a `GRPCStatus` with
  /// code `.unavailable`.
  internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
    /// The reason the connection failed.
    var reason: Error

    /// Wraps `reason`, the error which caused the connection to fail.
    init(reason: Error) {
      self.reason = reason
    }

    /// The description of the underlying error.
    var description: String {
      return String(describing: self.reason)
    }

    /// Makes an `.unavailable` status whose message describes the underlying error and whose
    /// cause is that error.
    func makeGRPCStatus() -> GRPCStatus {
      return GRPCStatus(
        code: .unavailable,
        message: self.description,
        cause: self.reason
      )
    }
  }