ClientTransport.swift 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
  21. /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
  22. /// little API to use on this class: they may configure the transport by adding it to a
  23. /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
  24. /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
  25. /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
  26. ///
  27. /// In most instances the glue code is simple: transformations are applied to the request and
  28. /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
  29. /// transport keeps track of the state of the call and the `Channel`, taking appropriate action
  30. /// when these change. This includes buffering request parts from the interceptor pipeline until
  31. /// the `NIO.Channel` becomes active.
  32. ///
  33. /// ### Thread Safety
  34. ///
  35. /// This class is not thread safe. All methods **must** be executed on the transport's `callEventLoop`.
  36. @usableFromInline
  37. internal final class ClientTransport<Request, Response> {
  38. /// The `EventLoop` the call is running on. State must be accessed from this event loop.
  39. @usableFromInline
  40. internal let callEventLoop: EventLoop
  41. /// The current state of the transport.
  42. private var state: ClientTransportState = .idle
  43. /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
  44. /// and fail it when we transition to `closed`.
  45. private var channelPromise: EventLoopPromise<Channel>?
  46. // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
  47. // have 3 parts.
  48. /// A buffer to store request parts and promises in before the channel has become active.
  49. private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
  50. /// The request serializer.
  51. private let serializer: AnySerializer<Request>
  52. /// The response deserializer.
  53. private let deserializer: AnyDeserializer<Response>
  54. /// A request part and a promise.
  55. private struct RequestAndPromise {
  56. var request: GRPCClientRequestPart<Request>
  57. var promise: EventLoopPromise<Void>?
  58. }
  59. /// Details about the call.
  60. internal let callDetails: CallDetails
  61. /// A logger.
  62. internal var logger: Logger
  63. /// Is the call streaming requests?
  64. private var isStreamingRequests: Bool {
  65. switch self.callDetails.type {
  66. case .unary, .serverStreaming:
  67. return false
  68. case .clientStreaming, .bidirectionalStreaming:
  69. return true
  70. }
  71. }
  72. // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
  73. // convenient to have both at the same time when intercepting response parts. We'll hold on to the
  74. // trailers here and only forward them when we receive the status.
  75. private var trailers: HPACKHeaders?
  76. /// The interceptor pipeline connected to this transport. The pipeline also holds references
  77. /// to `self` which are dropped when the interceptor pipeline is closed.
  78. @usableFromInline
  79. internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
  80. /// The `NIO.Channel` used by the transport, if it is available.
  81. private var channel: Channel?
  82. /// A callback which is invoked once when the stream channel becomes active.
  83. private var onStart: (() -> Void)?
  84. /// Our current state as logging metadata.
  85. private var stateForLogging: Logger.MetadataValue {
  86. if self.state.mayBuffer {
  87. return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
  88. } else {
  89. return "\(self.state)"
  90. }
  91. }
  92. internal init(
  93. details: CallDetails,
  94. eventLoop: EventLoop,
  95. interceptors: [ClientInterceptor<Request, Response>],
  96. serializer: AnySerializer<Request>,
  97. deserializer: AnyDeserializer<Response>,
  98. errorDelegate: ClientErrorDelegate?,
  99. onStart: @escaping () -> Void,
  100. onError: @escaping (Error) -> Void,
  101. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  102. ) {
  103. self.callEventLoop = eventLoop
  104. self.callDetails = details
  105. self.onStart = onStart
  106. self.logger = details.options.logger
  107. self.serializer = serializer
  108. self.deserializer = deserializer
  109. // The references to self held by the pipeline are dropped when it is closed.
  110. self._pipeline = ClientInterceptorPipeline(
  111. eventLoop: eventLoop,
  112. details: details,
  113. logger: details.options.logger,
  114. interceptors: interceptors,
  115. errorDelegate: errorDelegate,
  116. onError: onError,
  117. onCancel: self.cancelFromPipeline(promise:),
  118. onRequestPart: self.sendFromPipeline(_:promise:),
  119. onResponsePart: onResponsePart
  120. )
  121. }
  122. // MARK: - Call Object API
  123. /// Configure the transport to communicate with the server.
  124. /// - Parameter configurator: A callback to invoke in order to configure this transport.
  125. /// - Important: This *must* to be called from the `callEventLoop`.
  126. internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  127. self.callEventLoop.assertInEventLoop()
  128. if self.state.configureTransport() {
  129. self.configure(using: configurator)
  130. }
  131. }
  132. /// Send a request part – via the interceptor pipeline – to the server.
  133. /// - Parameters:
  134. /// - part: The part to send.
  135. /// - promise: A promise which will be completed when the request part has been handled.
  136. /// - Important: This *must* to be called from the `callEventLoop`.
  137. @inlinable
  138. internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
  139. self.callEventLoop.assertInEventLoop()
  140. if let pipeline = self._pipeline {
  141. pipeline.send(part, promise: promise)
  142. } else {
  143. promise?.fail(GRPCError.AlreadyComplete())
  144. }
  145. }
  146. /// Attempt to cancel the RPC notifying any interceptors.
  147. /// - Parameter promise: A promise which will be completed when the cancellation attempt has
  148. /// been handled.
  149. internal func cancel(promise: EventLoopPromise<Void>?) {
  150. self.callEventLoop.assertInEventLoop()
  151. if let pipeline = self._pipeline {
  152. pipeline.cancel(promise: promise)
  153. } else {
  154. promise?.fail(GRPCError.AlreadyComplete())
  155. }
  156. }
  157. /// A request for the underlying `Channel`.
  158. internal func getChannel() -> EventLoopFuture<Channel> {
  159. self.callEventLoop.assertInEventLoop()
  160. // Do we already have a promise?
  161. if let promise = self.channelPromise {
  162. return promise.futureResult
  163. } else {
  164. // Make and store the promise.
  165. let promise = self.callEventLoop.makePromise(of: Channel.self)
  166. self.channelPromise = promise
  167. // Ask the state machine if we can have it.
  168. switch self.state.getChannel() {
  169. case .succeed:
  170. if let channel = self.channel {
  171. promise.succeed(channel)
  172. }
  173. case .fail:
  174. promise.fail(GRPCError.AlreadyComplete())
  175. case .doNothing:
  176. ()
  177. }
  178. return promise.futureResult
  179. }
  180. }
  181. }
  182. // MARK: - Pipeline API
  183. extension ClientTransport {
  184. /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
  185. /// - Parameters:
  186. /// - part: The request part to send.
  187. /// - promise: A promise which will be completed when the part has been handled.
  188. /// - Important: This *must* to be called from the `callEventLoop`.
  189. private func sendFromPipeline(
  190. _ part: GRPCClientRequestPart<Request>,
  191. promise: EventLoopPromise<Void>?
  192. ) {
  193. self.callEventLoop.assertInEventLoop()
  194. switch self.state.send() {
  195. case .writeToBuffer:
  196. self.buffer(part, promise: promise)
  197. case .writeToChannel:
  198. // Banging the channel is okay here: we'll only be told to 'writeToChannel' if we're in the
  199. // correct state, the requirements of that state are having an active `Channel`.
  200. self.writeToChannel(
  201. self.channel!,
  202. part: part,
  203. promise: promise,
  204. flush: self.shouldFlush(after: part)
  205. )
  206. case .alreadyComplete:
  207. promise?.fail(GRPCError.AlreadyComplete())
  208. }
  209. }
  210. /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
  211. /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
  212. /// - Important: This *must* to be called from the `callEventLoop`.
  213. private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
  214. self.callEventLoop.assertInEventLoop()
  215. if self.state.cancel() {
  216. let error = GRPCError.RPCCancelledByClient()
  217. let status = error.makeGRPCStatus()
  218. self.forwardToInterceptors(.end(status, [:]))
  219. self.failBufferedWrites(with: error)
  220. self.channel?.close(mode: .all, promise: nil)
  221. self.channelPromise?.fail(error)
  222. promise?.succeed(())
  223. } else {
  224. promise?.succeed(())
  225. }
  226. }
  227. }
  228. // MARK: - ChannelHandler API
  229. extension ClientTransport: ChannelInboundHandler {
  230. @usableFromInline
  231. typealias InboundIn = _RawGRPCClientResponsePart
  232. @usableFromInline
  233. typealias OutboundOut = _RawGRPCClientRequestPart
  234. @usableFromInline
  235. internal func handlerRemoved(context: ChannelHandlerContext) {
  236. self.dropReferences()
  237. }
  238. @usableFromInline
  239. internal func handlerAdded(context: ChannelHandlerContext) {
  240. if context.channel.isActive {
  241. self.transportActivated(channel: context.channel)
  242. }
  243. }
  244. @usableFromInline
  245. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  246. self.handleError(error)
  247. }
  248. @usableFromInline
  249. internal func channelActive(context: ChannelHandlerContext) {
  250. self.transportActivated(channel: context.channel)
  251. }
  252. @usableFromInline
  253. internal func channelInactive(context: ChannelHandlerContext) {
  254. self.transportDeactivated()
  255. }
  256. @usableFromInline
  257. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  258. switch self.unwrapInboundIn(data) {
  259. case let .initialMetadata(headers):
  260. self.receiveFromChannel(initialMetadata: headers)
  261. case let .message(box):
  262. self.receiveFromChannel(message: box.message)
  263. case let .trailingMetadata(trailers):
  264. self.receiveFromChannel(trailingMetadata: trailers)
  265. case let .status(status):
  266. self.receiveFromChannel(status: status)
  267. }
  268. // (We're the end of the channel. No need to forward anything.)
  269. }
  270. }
  271. extension ClientTransport {
  272. /// The `Channel` became active. Send out any buffered requests.
  273. private func transportActivated(channel: Channel) {
  274. if self.callEventLoop.inEventLoop {
  275. self._transportActivated(channel: channel)
  276. } else {
  277. self.callEventLoop.execute {
  278. self._transportActivated(channel: channel)
  279. }
  280. }
  281. }
  282. /// On-loop implementation of `transportActivated(channel:)`.
  283. private func _transportActivated(channel: Channel) {
  284. self.callEventLoop.assertInEventLoop()
  285. switch self.state.activate() {
  286. case .unbuffer:
  287. self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
  288. self._pipeline?.logger = self.logger
  289. self.logger.debug("activated stream channel")
  290. self.channel = channel
  291. self.unbuffer()
  292. case .close:
  293. channel.close(mode: .all, promise: nil)
  294. case .doNothing:
  295. ()
  296. }
  297. }
  298. /// The `Channel` became inactive. Fail any buffered writes and forward an error to the
  299. /// interceptor pipeline if necessary.
  300. private func transportDeactivated() {
  301. if self.callEventLoop.inEventLoop {
  302. self._transportDeactivated()
  303. } else {
  304. self.callEventLoop.execute {
  305. self._transportDeactivated()
  306. }
  307. }
  308. }
  309. /// On-loop implementation of `transportDeactivated()`.
  310. private func _transportDeactivated() {
  311. self.callEventLoop.assertInEventLoop()
  312. switch self.state.deactivate() {
  313. case .doNothing:
  314. ()
  315. case .tearDown:
  316. let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
  317. self.forwardErrorToInterceptors(status)
  318. self.failBufferedWrites(with: status)
  319. self.channelPromise?.fail(status)
  320. case .failChannelPromise:
  321. self.channelPromise?.fail(GRPCError.AlreadyComplete())
  322. }
  323. }
  324. /// Drops any references to the `Channel` and interceptor pipeline.
  325. private func dropReferences() {
  326. if self.callEventLoop.inEventLoop {
  327. self.channel = nil
  328. } else {
  329. self.callEventLoop.execute {
  330. self.channel = nil
  331. }
  332. }
  333. }
  334. /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the
  335. /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` will
  336. /// also be closed.
  337. internal func handleError(_ error: Error) {
  338. if self.callEventLoop.inEventLoop {
  339. self._handleError(error)
  340. } else {
  341. self.callEventLoop.execute {
  342. self._handleError(error)
  343. }
  344. }
  345. }
  346. /// On-loop implementation of `handleError(_:)`.
  347. private func _handleError(_ error: Error) {
  348. self.callEventLoop.assertInEventLoop()
  349. switch self.state.handleError() {
  350. case .doNothing:
  351. ()
  352. case .propagateError:
  353. self.forwardErrorToInterceptors(error)
  354. self.failBufferedWrites(with: error)
  355. case .propagateErrorAndClose:
  356. self.forwardErrorToInterceptors(error)
  357. self.failBufferedWrites(with: error)
  358. self.channel?.close(mode: .all, promise: nil)
  359. }
  360. }
  361. /// Receive initial metadata from the `Channel`.
  362. private func receiveFromChannel(initialMetadata headers: HPACKHeaders) {
  363. if self.callEventLoop.inEventLoop {
  364. self._receiveFromChannel(initialMetadata: headers)
  365. } else {
  366. self.callEventLoop.execute {
  367. self._receiveFromChannel(initialMetadata: headers)
  368. }
  369. }
  370. }
  371. /// On-loop implementation of `receiveFromChannel(initialMetadata:)`.
  372. private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) {
  373. self.callEventLoop.assertInEventLoop()
  374. if self.state.channelRead(isEnd: false) {
  375. self.forwardToInterceptors(.metadata(headers))
  376. }
  377. }
  378. /// Receive response message bytes from the `Channel`.
  379. private func receiveFromChannel(message buffer: ByteBuffer) {
  380. if self.callEventLoop.inEventLoop {
  381. self._receiveFromChannel(message: buffer)
  382. } else {
  383. self.callEventLoop.execute {
  384. self._receiveFromChannel(message: buffer)
  385. }
  386. }
  387. }
  388. /// On-loop implementation of `receiveFromChannel(message:)`.
  389. private func _receiveFromChannel(message buffer: ByteBuffer) {
  390. self.callEventLoop.assertInEventLoop()
  391. do {
  392. let message = try self.deserializer.deserialize(byteBuffer: buffer)
  393. if self.state.channelRead(isEnd: false) {
  394. self.forwardToInterceptors(.message(message))
  395. }
  396. } catch {
  397. self.handleError(error)
  398. }
  399. }
  400. /// Receive trailing metadata from the `Channel`.
  401. private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) {
  402. // The `Channel` delivers trailers and `GRPCStatus` separately, we want to emit them together
  403. // in the interceptor pipeline.
  404. if self.callEventLoop.inEventLoop {
  405. self.trailers = trailers
  406. } else {
  407. self.callEventLoop.execute {
  408. self.trailers = trailers
  409. }
  410. }
  411. }
  412. /// Receive the final status from the `Channel`.
  413. private func receiveFromChannel(status: GRPCStatus) {
  414. if self.callEventLoop.inEventLoop {
  415. self._receiveFromChannel(status: status)
  416. } else {
  417. self.callEventLoop.execute {
  418. self._receiveFromChannel(status: status)
  419. }
  420. }
  421. }
  422. /// On-loop implementation of `receiveFromChannel(status:)`.
  423. private func _receiveFromChannel(status: GRPCStatus) {
  424. self.callEventLoop.assertInEventLoop()
  425. if self.state.channelRead(isEnd: true) {
  426. self.forwardToInterceptors(.end(status, self.trailers ?? [:]))
  427. self.trailers = nil
  428. }
  429. }
  430. }
  431. // MARK: - State Handling
  432. private enum ClientTransportState {
  433. /// Idle. We're waiting for the RPC to be configured.
  434. ///
  435. /// Valid transitions:
  436. /// - `awaitingTransport` (the transport is being configured)
  437. /// - `closed` (the RPC cancels)
  438. case idle
  439. /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
  440. /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
  441. /// transport in this state is an error.
  442. ///
  443. /// Valid transitions:
  444. /// - `activatingTransport` (the channel becomes active)
  445. /// - `closing` (the RPC cancels)
  446. /// - `closed` (the channel fails to become active)
  447. case awaitingTransport
  448. /// The transport is active but we're unbuffering any requests to write on that transport.
  449. /// We'll continue buffering in this state. Receiving messages from the transport in this state
  450. /// is okay.
  451. ///
  452. /// Valid transitions:
  453. /// - `active` (we finish unbuffering)
  454. /// - `closing` (the RPC cancels, the channel encounters an error)
  455. /// - `closed` (the channel becomes inactive)
  456. case activatingTransport
  457. /// Fully active. An RPC is in progress and is communicating over an active transport.
  458. ///
  459. /// Valid transitions:
  460. /// - `closing` (the RPC cancels, the channel encounters an error)
  461. /// - `closed` (the channel becomes inactive)
  462. case active
  463. /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
  464. /// become inactive yet.
  465. ///
  466. /// Valid transitions:
  467. /// - `closed` (the channel becomes inactive)
  468. case closing
  469. /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
  470. /// be ignored.
  471. ///
  472. /// Valid transitions:
  473. /// - none: this state is terminal.
  474. case closed
  475. /// Whether writes may be unbuffered in this state.
  476. internal var isUnbuffering: Bool {
  477. switch self {
  478. case .activatingTransport:
  479. return true
  480. case .idle, .awaitingTransport, .active, .closing, .closed:
  481. return false
  482. }
  483. }
  484. /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
  485. internal var mayBuffer: Bool {
  486. switch self {
  487. case .idle, .activatingTransport, .awaitingTransport:
  488. return true
  489. case .active, .closing, .closed:
  490. return false
  491. }
  492. }
  493. }
  494. extension ClientTransportState {
  495. /// The caller would like to configure the transport. Returns a boolean indicating whether we
  496. /// should configure it or not.
  497. mutating func configureTransport() -> Bool {
  498. switch self {
  499. // We're idle until we configure. Anything else is just a repeat request to configure.
  500. case .idle:
  501. self = .awaitingTransport
  502. return true
  503. case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
  504. return false
  505. }
  506. }
  507. enum SendAction {
  508. /// Write the request into the buffer.
  509. case writeToBuffer
  510. /// Write the request into the channel.
  511. case writeToChannel
  512. /// The RPC has already completed, fail any promise associated with the write.
  513. case alreadyComplete
  514. }
  515. /// The pipeline would like to send a request part to the transport.
  516. mutating func send() -> SendAction {
  517. switch self {
  518. // We don't have any transport yet, just buffer the part.
  519. case .idle, .awaitingTransport, .activatingTransport:
  520. return .writeToBuffer
  521. // We have a `Channel`, we can pipe the write straight through.
  522. case .active:
  523. return .writeToChannel
  524. // The transport is going or has gone away. Fail the promise.
  525. case .closing, .closed:
  526. return .alreadyComplete
  527. }
  528. }
  529. enum UnbufferedAction {
  530. /// Nothing needs to be done.
  531. case doNothing
  532. /// Succeed the channel promise associated with the transport.
  533. case succeedChannelPromise
  534. }
  535. /// We finished dealing with the buffered writes.
  536. mutating func unbuffered() -> UnbufferedAction {
  537. switch self {
  538. // These can't happen since we only begin unbuffering when we transition to
  539. // '.activatingTransport', which must come after these two states..
  540. case .idle, .awaitingTransport:
  541. preconditionFailure("Requests can't be unbuffered before the transport is activated")
  542. // We dealt with any buffered writes. We can become active now. This is the only way to become
  543. // active.
  544. case .activatingTransport:
  545. self = .active
  546. return .succeedChannelPromise
  547. case .active:
  548. preconditionFailure("Unbuffering completed but the transport is already active")
  549. // Something caused us to close while unbuffering, that's okay, we won't take any further
  550. // action.
  551. case .closing, .closed:
  552. return .doNothing
  553. }
  554. }
  555. /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether
  556. /// cancellation can go ahead (and also whether the channel should be torn down).
  557. mutating func cancel() -> Bool {
  558. switch self {
  559. case .idle:
  560. // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
  561. // we're done, fail any writes, and then deal with the cancellation promise.
  562. self = .closed
  563. return true
  564. case .awaitingTransport:
  565. // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
  566. // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
  567. // the `Channel` becoming active (see `channelActive(context:)`).
  568. self = .closing
  569. return true
  570. case .activatingTransport:
  571. // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
  572. // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
  573. // any buffered writes and then complete the cancellation promise.
  574. self = .closing
  575. return true
  576. case .active:
  577. // The RPC and channel are up and running. We'll fail the RPC and close the channel.
  578. self = .closing
  579. return true
  580. case .closing, .closed:
  581. // We're already closing or closing. The cancellation is too late.
  582. return false
  583. }
  584. }
  585. enum ActivateAction {
  586. case unbuffer
  587. case close
  588. case doNothing
  589. }
  590. /// `channelActive` was invoked on the transport by the `Channel`.
  591. mutating func activate() -> ActivateAction {
  592. // The channel has become active: what now?
  593. switch self {
  594. case .idle:
  595. preconditionFailure("Can't activate an idle transport")
  596. case .awaitingTransport:
  597. self = .activatingTransport
  598. return .unbuffer
  599. case .activatingTransport, .active:
  600. // Already activated.
  601. return .doNothing
  602. case .closing:
  603. // We remain in closing: we only transition to closed on 'channelInactive'.
  604. return .close
  605. case .closed:
  606. preconditionFailure("Invalid state: stream is already inactive")
  607. }
  608. }
  609. enum ChannelInactiveAction {
  610. /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
  611. case tearDown
  612. /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
  613. case failChannelPromise
  614. /// Do nothing.
  615. case doNothing
  616. }
  617. /// `channelInactive` was invoked on the transport by the `Channel`.
  618. mutating func deactivate() -> ChannelInactiveAction {
  619. switch self {
  620. case .idle:
  621. // We can't become inactive before we've requested a `Channel`.
  622. preconditionFailure("Can't deactivate an idle transport")
  623. case .awaitingTransport, .activatingTransport, .active:
  624. // We're activating the transport - i.e. offloading any buffered requests - and the channel
  625. // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
  626. // synthesize an error status to fail the RPC with.
  627. self = .closed
  628. return .tearDown
  629. case .closing:
  630. // We were already closing, now we're fully closed.
  631. self = .closed
  632. return .failChannelPromise
  633. case .closed:
  634. // We're already closed.
  635. return .doNothing
  636. }
  637. }
  638. /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value
  639. /// indicating whether the part that was read should be forwarded to the interceptor pipeline.
  640. mutating func channelRead(isEnd: Bool) -> Bool {
  641. switch self {
  642. case .idle, .awaitingTransport:
  643. // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
  644. preconditionFailure("Can't receive response part on idle transport")
  645. case .activatingTransport, .active:
  646. // We have an active `Channel`, we can forward the request part but we may need to start
  647. // closing if we see the status, since it indicates the call is terminating.
  648. if isEnd {
  649. self = .closing
  650. }
  651. return true
  652. case .closing, .closed:
  653. // We closed early, ignore any reads.
  654. return false
  655. }
  656. }
  657. enum HandleErrorAction {
  658. /// Propagate the error to the interceptor pipeline and fail any buffered writes.
  659. case propagateError
  660. /// As above, but close the 'Channel' as well.
  661. case propagateErrorAndClose
  662. /// No action is required.
  663. case doNothing
  664. }
  665. /// An error was caught.
  666. mutating func handleError() -> HandleErrorAction {
  667. switch self {
  668. case .idle:
  669. // The `Channel` can't error if it doesn't exist.
  670. preconditionFailure("Can't catch error on idle transport")
  671. case .awaitingTransport:
  672. // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
  673. // buffered writes along the way.
  674. self = .closing
  675. return .propagateError
  676. case .activatingTransport,
  677. .active:
  678. // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
  679. self = .closing
  680. return .propagateErrorAndClose
  681. case .closing, .closed:
  682. // We're already closing/closed, we can ignore this.
  683. return .doNothing
  684. }
  685. }
  686. enum GetChannelAction {
  687. /// No action is required.
  688. case doNothing
  689. /// Succeed the Channel promise.
  690. case succeed
  691. /// Fail the 'Channel' promise, the RPC is already complete.
  692. case fail
  693. }
  694. /// The caller has asked for the underlying `Channel`.
  695. mutating func getChannel() -> GetChannelAction {
  696. switch self {
  697. case .idle, .awaitingTransport, .activatingTransport:
  698. // Do nothing, we'll complete the promise when we become active or closed.
  699. return .doNothing
  700. case .active:
  701. // We're already active, so there was no promise to succeed when we made this transition. We
  702. // can complete it now.
  703. return .succeed
  704. case .closing:
  705. // We'll complete the promise when we transition to closed.
  706. return .doNothing
  707. case .closed:
  708. // We're already closed; there was no promise to fail when we made this transition. We can go
  709. // ahead and fail it now though.
  710. return .fail
  711. }
  712. }
  713. }
  714. // MARK: - State Actions
  715. extension ClientTransport {
  716. /// Configures this transport with the `configurator`.
  717. private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
  718. configurator(self).whenFailure { error in
  719. // We might be on a different EL, but `handleError` will sort that out for us, so no need to
  720. // hop.
  721. if error is GRPCStatus || error is GRPCStatusTransformable {
  722. self.handleError(error)
  723. } else {
  724. // Fallback to something which will mark the RPC as 'unavailable'.
  725. self.handleError(ConnectionFailure(reason: error))
  726. }
  727. }
  728. }
  729. /// Append a request part to the write buffer.
  730. /// - Parameters:
  731. /// - part: The request part to buffer.
  732. /// - promise: A promise to complete when the request part has been sent.
  733. private func buffer(
  734. _ part: GRPCClientRequestPart<Request>,
  735. promise: EventLoopPromise<Void>?
  736. ) {
  737. self.callEventLoop.assertInEventLoop()
  738. self.logger.trace(
  739. "buffering request part",
  740. metadata: [
  741. "request_part": "\(part.name)",
  742. "call_state": self.stateForLogging,
  743. ]
  744. )
  745. self.writeBuffer.append(.init(request: part, promise: promise))
  746. }
  747. /// Writes any buffered request parts to the `Channel`.
  748. private func unbuffer() {
  749. self.callEventLoop.assertInEventLoop()
  750. guard let channel = self.channel else {
  751. return
  752. }
  753. // Save any flushing until we're done writing.
  754. var shouldFlush = false
  755. self.logger.trace(
  756. "unbuffering request parts",
  757. metadata: [
  758. "request_parts": "\(self.writeBuffer.count)"
  759. ]
  760. )
  761. // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
  762. // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
  763. // may miss more buffered writes.
  764. while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
  765. // Pull out as many writes as possible.
  766. while let write = self.writeBuffer.popFirst() {
  767. self.logger.trace(
  768. "unbuffering request part",
  769. metadata: [
  770. "request_part": "\(write.request.name)"
  771. ]
  772. )
  773. if !shouldFlush {
  774. shouldFlush = self.shouldFlush(after: write.request)
  775. }
  776. self.writeToChannel(channel, part: write.request, promise: write.promise, flush: false)
  777. }
  778. // Okay, flush now.
  779. if shouldFlush {
  780. shouldFlush = false
  781. channel.flush()
  782. }
  783. }
  784. if self.writeBuffer.isEmpty {
  785. self.logger.trace("request buffer drained")
  786. } else {
  787. self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
  788. }
  789. // We're unbuffered. What now?
  790. switch self.state.unbuffered() {
  791. case .doNothing:
  792. ()
  793. case .succeedChannelPromise:
  794. self.channelPromise?.succeed(channel)
  795. }
  796. }
  797. /// Fails any promises that come with buffered writes with `error`.
  798. /// - Parameter error: The `Error` to fail promises with.
  799. private func failBufferedWrites(with error: Error) {
  800. self.logger.trace("failing buffered writes", metadata: ["call_state": self.stateForLogging])
  801. while let write = self.writeBuffer.popFirst() {
  802. write.promise?.fail(error)
  803. }
  804. }
  805. /// Write a request part to the `Channel`.
  806. /// - Parameters:
  807. /// - channel: The `Channel` to write `part` to.
  808. /// - part: The request part to write.
  809. /// - promise: A promise to complete once the write has been completed.
  810. /// - flush: Whether to flush the `Channel` after writing.
  811. private func writeToChannel(
  812. _ channel: Channel,
  813. part: GRPCClientRequestPart<Request>,
  814. promise: EventLoopPromise<Void>?,
  815. flush: Bool
  816. ) {
  817. switch part {
  818. case let .metadata(headers):
  819. let head = self.makeRequestHead(with: headers)
  820. channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
  821. // Messages are buffered by this class and in the async writer for async calls. Initially the
  822. // async writer is not allowed to emit messages; the call to 'onStart()' signals that messages
  823. // may be emitted. We call it here to avoid races between writing headers and messages.
  824. self.onStart?()
  825. self.onStart = nil
  826. case let .message(request, metadata):
  827. do {
  828. let bytes = try self.serializer.serialize(request, allocator: channel.allocator)
  829. let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
  830. channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
  831. } catch {
  832. self.handleError(error)
  833. }
  834. case .end:
  835. channel.write(self.wrapOutboundOut(.end), promise: promise)
  836. }
  837. if flush {
  838. channel.flush()
  839. }
  840. }
  841. /// Forward the response part to the interceptor pipeline.
  842. /// - Parameter part: The response part to forward.
  843. private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
  844. self.callEventLoop.assertInEventLoop()
  845. self._pipeline?.receive(part)
  846. }
  847. /// Forward the error to the interceptor pipeline.
  848. /// - Parameter error: The error to forward.
  849. private func forwardErrorToInterceptors(_ error: Error) {
  850. self.callEventLoop.assertInEventLoop()
  851. self._pipeline?.errorCaught(error)
  852. }
  853. }
  854. // MARK: - Helpers
  855. extension ClientTransport {
  856. /// Returns whether the `Channel` should be flushed after writing the given part to it.
  857. private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
  858. switch part {
  859. case .metadata:
  860. // If we're not streaming requests then we hold off on the flush until we see end.
  861. return self.isStreamingRequests
  862. case let .message(_, metadata):
  863. // Message flushing is determined by caller preference.
  864. return metadata.flush
  865. case .end:
  866. // Always flush at the end of the request stream.
  867. return true
  868. }
  869. }
  870. /// Make a `_GRPCRequestHead` with the provided metadata.
  871. private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
  872. return _GRPCRequestHead(
  873. method: self.callDetails.options.cacheable ? "GET" : "POST",
  874. scheme: self.callDetails.scheme,
  875. path: self.callDetails.path,
  876. host: self.callDetails.authority,
  877. deadline: self.callDetails.options.timeLimit.makeDeadline(),
  878. customMetadata: metadata,
  879. encoding: self.callDetails.options.messageEncoding
  880. )
  881. }
  882. }
  883. extension GRPCClientRequestPart {
  884. /// The name of the request part, used for logging.
  885. fileprivate var name: String {
  886. switch self {
  887. case .metadata:
  888. return "metadata"
  889. case .message:
  890. return "message"
  891. case .end:
  892. return "end"
  893. }
  894. }
  895. }
  896. // A wrapper for connection errors: we need to be able to preserve the underlying error as
  897. // well as extract a 'GRPCStatus' with code '.unavailable'.
  898. internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
  899. /// The reason the connection failed.
  900. var reason: Error
  901. init(reason: Error) {
  902. self.reason = reason
  903. }
  904. var description: String {
  905. return String(describing: self.reason)
  906. }
  907. func makeGRPCStatus() -> GRPCStatus {
  908. return GRPCStatus(
  909. code: .unavailable,
  910. message: String(describing: self.reason),
  911. cause: self.reason
  912. )
  913. }
  914. }