ClientCallTransport.swift 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP2
  18. import NIOHPACK
  19. import Logging
  /// This class provides much of the boilerplate for the four types of gRPC call objects returned to
  /// framework users. It is the glue between a call object and the underlying transport (typically a
  /// NIO Channel).
  ///
  /// Typically, each call will be configured on an HTTP/2 stream channel. The stream channel will
  /// be configured as such:
  ///
  /// ```
  ///                           ┌────────────────────────────────────┐
  ///                           │ ChannelTransport<Request,Response> │
  ///                           └─────▲───────────────────────┬──────┘
  ///                                 │                       │
  /// --------------------------------│-----------------------│------------------------------
  /// HTTP2StreamChannel              │                       │
  ///                    ┌────────────┴──────────┐            │
  ///                    │ GRPCClientCallHandler │            │
  ///                    └────────────▲──────────┘            │
  /// GRPCClientResponsePart<Response>│                       │GRPCClientRequestPart<Request>
  ///                               ┌─┴───────────────────────▼─┐
  ///                               │ GRPCClientChannelHandler  │
  ///                               └─▲───────────────────────┬─┘
  ///                       HTTP2Frame│                       │HTTP2Frame
  ///                                 |                       |
  /// ```
  ///
  /// Note: the "main" pipeline is provided by the channel in `ClientConnection`.
  internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
    internal typealias RequestPart = _GRPCClientRequestPart<Request>
    internal typealias ResponsePart = _GRPCClientResponsePart<Response>

    /// The `EventLoop` this call is running on.
    internal let eventLoop: EventLoop

    /// Logger used for all call-level diagnostics.
    private let logger: Logger

    /// An optional, user-provided delegate notified of errors caught by the call.
    private var errorDelegate: ClientErrorDelegate?

    /// Holds the promises (or stream handler) for the response parts of the call.
    internal var responseContainer: ResponsePartContainer<Response>

    /// Where the call currently is in its lifecycle.
    private var state: State

    /// Request parts written before the stream channel became active.
    ///
    /// The initial capacity is 4: a power of two which comfortably holds the three parts
    /// (head, message, end) that a typical unary call produces.
    private var requestBuffer = MarkedCircularBuffer<BufferedRequest>(initialCapacity: 4)

    /// The pending timeout task for the call, if a time limit was set.
    private var scheduledTimeout: Scheduled<Void>?

    /// Times the RPC; `nil` once the call has finished and the duration has been logged.
    private var stopwatch: Stopwatch?

    /// A request part (and its optional write promise) held until the stream is available.
    private struct BufferedRequest {
      /// The part to write once the stream is active.
      var message: RequestPart
      /// Completed when the part is eventually written, or failed if it is dropped.
      var promise: EventLoopPromise<Void>?
    }

    internal enum State {
      /// Waiting for the stream channel to become active; writes are buffered meanwhile.
      ///
      /// Valid transitions: `active`, `closed`.
      case buffering(EventLoopFuture<Channel>)

      /// The stream is up and the RPC is in progress (possibly with a timeout pending).
      ///
      /// Valid transitions: `closed`.
      case active(Channel)

      /// The RPC has finished for one reason or another. This state is terminal.
      case closed
    }

    /// Designated initializer: stores the given values and starts the call's stopwatch.
    private init(
      eventLoop: EventLoop,
      state: State,
      responseContainer: ResponsePartContainer<Response>,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) {
      self.eventLoop = eventLoop
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.responseContainer = responseContainer
      self.state = state
      // Every stored property is now initialized, so we can start timing the RPC.
      self.startTimer()
    }

    /// Creates a transport which buffers requests until `channelProvider` supplies a stream.
    ///
    /// - Parameters:
    ///   - eventLoop: The event loop the call runs on.
    ///   - responseContainer: Holds the promises for the call's response parts.
    ///   - timeLimit: The time limit for the whole RPC.
    ///   - errorDelegate: Optional delegate notified of caught errors.
    ///   - logger: Logger for call diagnostics.
    ///   - channelProvider: Invoked with this transport and a promise to complete with the stream.
    internal convenience init(
      eventLoop: EventLoop,
      responseContainer: ResponsePartContainer<Response>,
      timeLimit: TimeLimit,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger,
      channelProvider: (ChannelTransport<Request, Response>, EventLoopPromise<Channel>) -> ()
    ) {
      let streamPromise: EventLoopPromise<Channel> = eventLoop.makePromise()

      self.init(
        eventLoop: eventLoop,
        state: .buffering(streamPromise.futureResult),
        responseContainer: responseContainer,
        errorDelegate: errorDelegate,
        logger: logger
      )

      // Activation is reported separately by the channel (via `activate(stream:)`), so only the
      // failure of this future is interesting: it means the stream could not be created.
      streamPromise.futureResult.whenFailure { error in
        self.handleError(error, promise: nil)
      }

      self.setUpTimeLimit(timeLimit)
      channelProvider(self, streamPromise)
    }

    /// Creates a transport whose stream channel is opened on the given HTTP/2 multiplexer.
    ///
    /// - Parameters:
    ///   - multiplexer: A future multiplexer on which to create the stream channel.
    ///   - responseContainer: Holds the promises for the call's response parts.
    ///   - callType: The type of RPC being made.
    ///   - timeLimit: The time limit for the whole RPC.
    ///   - errorDelegate: Optional delegate notified of caught errors.
    ///   - logger: Logger for call diagnostics.
    internal convenience init(
      multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
      responseContainer: ResponsePartContainer<Response>,
      callType: GRPCCallType,
      timeLimit: TimeLimit,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) {
      self.init(
        eventLoop: multiplexer.eventLoop,
        responseContainer: responseContainer,
        timeLimit: timeLimit,
        errorDelegate: errorDelegate,
        logger: logger,
        channelProvider: { transport, streamPromise in
          // No multiplexer, no stream: propagate the failure to the stream promise.
          multiplexer.whenFailure { error in
            streamPromise.fail(error)
          }
          multiplexer.whenSuccess { mux in
            mux.createStreamChannel(promise: streamPromise) { stream, streamID in
              stream.pipeline.addHandlers([
                _GRPCClientChannelHandler<Request, Response>(streamID: streamID, callType: callType, logger: logger),
                GRPCClientCallHandler(call: transport),
              ])
            }
          }
        }
      )
    }
  }
  // MARK: - Call API (i.e. called from {Unary,ClientStreaming,...}Call)

  extension ChannelTransport: ClientCallOutbound {
    /// Sends a single request part and flushes it.
    ///
    /// May be called from any thread; off the event loop the work is hopped onto it.
    ///
    /// - Parameters:
    ///   - part: The request part to send.
    ///   - promise: Completed once the part has been written, or failed if it is dropped.
    internal func sendRequest(_ part: RequestPart, promise: EventLoopPromise<Void>?) {
      guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
          self.writePart(part, flush: true, promise: promise)
        }
        return
      }
      self.writePart(part, flush: true, promise: promise)
    }

    /// Sends several request parts, flushing once after the last of them.
    ///
    /// May be called from any thread; off the event loop the work is hopped onto it.
    ///
    /// - Parameters:
    ///   - parts: The request parts to send, in order.
    ///   - promise: Completed once every part has been written; failed if any part fails.
    internal func sendRequests<S>(
      _ parts: S,
      promise: EventLoopPromise<Void>?
    ) where S: Sequence, S.Element == RequestPart {
      guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
          self._sendRequests(parts, promise: promise)
        }
        return
      }
      self._sendRequests(parts, promise: promise)
    }

    /// Requests that the RPC is cancelled.
    ///
    /// May be called from any thread; off the event loop the work is hopped onto it. The call
    /// is failed with `GRPCError.RPCCancelledByClient`. `promise` is failed with the resulting
    /// status, or with `ChannelError.alreadyClosed` if the call had already finished.
    internal func cancel(promise: EventLoopPromise<Void>?) {
      self.logger.info("rpc cancellation requested")
      guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
          self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
        }
        return
      }
      self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
    }

    /// Returns the `Channel` for the HTTP/2 stream that this RPC is using.
    ///
    /// May be called from any thread.
    internal func streamChannel() -> EventLoopFuture<Channel> {
      guard self.eventLoop.inEventLoop else {
        return self.eventLoop.flatSubmit {
          self.getStreamChannel()
        }
      }
      return self.getStreamChannel()
    }
  }
  extension ChannelTransport {
    /// Returns a future for the stream channel.
    ///
    /// Must be called from the event loop.
    ///
    /// - Returns: The pending stream future while buffering, an already-succeeded future once
    ///   active, or a future failed with `ChannelError.ioOnClosedChannel` once closed.
    private func getStreamChannel() -> EventLoopFuture<Channel> {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .buffering(let future):
        return future

      case .active(let channel):
        return self.eventLoop.makeSucceededFuture(channel)

      case .closed:
        return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)
      }
    }

    /// Writes (or buffers) many request parts, then flushes once.
    ///
    /// Must be called from the event loop.
    ///
    /// - Parameters:
    ///   - parts: The request parts to write, in order.
    ///   - promise: If provided, completed once every part succeeds, or failed if any part fails.
    private func _sendRequests<S>(
      _ parts: S,
      promise: EventLoopPromise<Void>?
    ) where S: Sequence, S.Element == RequestPart {
      self.eventLoop.preconditionInEventLoop()

      if let promise = promise {
        // Give each part its own promise and cascade their combined result to the caller's
        // promise.
        let loop = promise.futureResult.eventLoop
        let futures: [EventLoopFuture<Void>] = parts.map { part in
          let partPromise = loop.makePromise(of: Void.self)
          self.writePart(part, flush: false, promise: partPromise)
          return partPromise.futureResult
        }
        EventLoopFuture.andAllSucceed(futures, on: loop).cascade(to: promise)
      } else {
        for part in parts {
          self.writePart(part, flush: false, promise: nil)
        }
      }

      // Flush once, after every part has been written (or buffered).
      self.flush()
    }

    /// Flushes the stream, or marks the request buffer so the flush happens on activation.
    ///
    /// Must be called from the event loop.
    private func flush() {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .buffering:
        self.requestBuffer.mark()

      case .active(let stream):
        stream.flush()

      case .closed:
        ()
      }
    }

    /// Writes a request part.
    ///
    /// Must be called from the event loop.
    ///
    /// - Parameters:
    ///   - part: The part to write.
    ///   - flush: Whether we should flush the channel after this write.
    ///   - promise: A promise to fulfill when the part has been written. It is failed with
    ///     `ChannelError.ioOnClosedChannel` if the call is already closed.
    private func writePart(_ part: RequestPart, flush: Bool, promise: EventLoopPromise<Void>?) {
      // A precondition (not an assert), matching the other helpers in this extension: `state`
      // and `requestBuffer` must never be touched off the event loop, in release builds too.
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      // We're buffering, so buffer the message.
      case .buffering:
        self.logger.debug("buffering request part", metadata: [
          "request_part": "\(part.name)",
          "call_state": "\(self.describeCallState())"
        ])
        self.requestBuffer.append(BufferedRequest(message: part, promise: promise))
        if flush {
          self.requestBuffer.mark()
        }

      // We have an active stream, just pass the write and promise through.
      case .active(let stream):
        self.logger.debug("writing request part", metadata: ["request_part": "\(part.name)"])
        stream.write(part, promise: promise)
        if flush {
          stream.flush()
        }

      // We're closed: drop the request.
      case .closed:
        self.logger.debug("dropping request part", metadata: [
          "request_part": "\(part.name)",
          "call_state": "\(self.describeCallState())"
        ])
        promise?.fail(ChannelError.ioOnClosedChannel)
      }
    }

    /// The scheduled timeout triggered: time out the RPC if it's not yet finished.
    ///
    /// Must be called from the event loop.
    private func timedOut(after timeLimit: TimeLimit) {
      self.eventLoop.preconditionInEventLoop()

      let error = GRPCError.RPCTimedOut(timeLimit).captureContext()
      self.handleError(error, promise: nil)
    }

    /// Handles an error and optionally fails the provided promise.
    ///
    /// While the call is still open, the error is reported to the error delegate, the call is
    /// closed with a status derived from the error, and `promise` is failed with that status.
    /// If the call is already closed, `promise` is failed with `ChannelError.alreadyClosed`.
    ///
    /// Must be called from the event loop.
    private func handleError(_ error: Error, promise: EventLoopPromise<Void>?) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      // We only care about errors if we're not shut down yet.
      case .buffering, .active:
        // Add our current state to the logger we provide to the callback.
        var loggerWithState = self.logger
        loggerWithState[metadataKey: "call_state"] = "\(self.describeCallState())"
        let errorStatus: GRPCStatus

        if let errorWithContext = error as? GRPCError.WithContext {
          errorStatus = errorWithContext.error.makeGRPCStatus()
          self.errorDelegate?.didCatchError(
            errorWithContext.error,
            logger: loggerWithState,
            file: errorWithContext.file,
            line: errorWithContext.line
          )
        } else if let transformable = error as? GRPCStatusTransformable {
          errorStatus = transformable.makeGRPCStatus()
          self.errorDelegate?.didCatchErrorWithoutContext(error, logger: loggerWithState)
        } else {
          errorStatus = .processingError
          self.errorDelegate?.didCatchErrorWithoutContext(error, logger: loggerWithState)
        }

        // Update our state: we're closing.
        self.close(withStatus: errorStatus)
        promise?.fail(errorStatus)

      case .closed:
        promise?.fail(ChannelError.alreadyClosed)
      }
    }

    /// Closes the call with the given status, if it's not yet closed.
    ///
    /// Must be called from the event loop.
    private func close(withStatus status: GRPCStatus) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .buffering(let streamFuture):
        self.transitionToClosed(withStatus: status)

        // Fail any buffered writes: they will never be sent.
        while !self.requestBuffer.isEmpty {
          self.requestBuffer.removeFirst().promise?.fail(status)
        }

        // The stream may still come up later; close it if it does.
        streamFuture.whenSuccess { stream in
          stream.close(mode: .all, promise: nil)
        }

      case .active(let channel):
        self.transitionToClosed(withStatus: status)
        channel.close(mode: .all, promise: nil)

      case .closed:
        ()
      }
    }

    /// Performs the teardown shared by every non-closed state of `close(withStatus:)`.
    ///
    /// In order, it enters the terminal `.closed` state, stops the stopwatch, cancels any pending
    /// timeout and fails the outstanding response promises with `status`.
    ///
    /// Must be called from the event loop, and only while the call is not yet closed.
    private func transitionToClosed(withStatus status: GRPCStatus) {
      self.state = .closed
      self.stopTimer(status: status)

      // We're done; cancel the timeout.
      self.scheduledTimeout?.cancel()
      self.scheduledTimeout = nil

      // Fail any outstanding promises.
      self.responseContainer.fail(with: status)
    }
  }
  // MARK: - Channel Inbound

  extension ChannelTransport: ClientCallInbound {
    /// Receives an error from the Channel and fails the call with it.
    ///
    /// Must be called on the event loop.
    internal func receiveError(_ error: Error) {
      self.eventLoop.preconditionInEventLoop()
      self.handleError(error, promise: nil)
    }

    /// Receives a response part from the Channel.
    ///
    /// Parts are routed to the response container while the call is active. They are dropped
    /// (and logged) once the call is closed. Receiving a part while still buffering is a
    /// programmer error.
    ///
    /// Must be called on the event loop.
    internal func receiveResponse(_ part: _GRPCClientResponsePart<Response>) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .buffering:
        preconditionFailure("Received response part in 'buffering' state")

      case .active:
        self.logger.debug("received response part", metadata: ["response_part": "\(part.name)"])

        switch part {
        case .initialMetadata(let metadata):
          self.responseContainer.lazyInitialMetadataPromise.completeWith(.success(metadata))

        case .message(let messageContext):
          switch self.responseContainer.responseHandler {
          case .unary(let responsePromise):
            responsePromise.succeed(messageContext.message)
          case .stream(let handler):
            handler(messageContext.message)
          }

        case .trailingMetadata(let metadata):
          self.responseContainer.lazyTrailingMetadataPromise.succeed(metadata)

        case .status(let status):
          // We're done; cancel the timeout.
          self.scheduledTimeout?.cancel()
          self.scheduledTimeout = nil

          // We're closed now.
          self.state = .closed
          self.stopTimer(status: status)

          // We're not really failing the status here. In some cases the server may fast fail, in
          // which case we'll only see trailing metadata and status: we should fail the initial
          // metadata and response in that case.
          self.responseContainer.fail(with: status)
        }

      case .closed:
        self.logger.debug("dropping response part", metadata: [
          "response_part": "\(part.name)",
          "call_state": "\(self.describeCallState())"
        ])
      }
    }

    /// The underlying channel became active and can start accepting writes.
    ///
    /// Any buffered request parts are written to `stream` in order. A flush is issued wherever
    /// the buffer's flush mark is passed. The call then becomes active.
    ///
    /// Must be called on the event loop.
    internal func activate(stream: Channel) {
      self.eventLoop.preconditionInEventLoop()

      // The channel has become active: what now?
      switch self.state {
      case .buffering:
        while !self.requestBuffer.isEmpty {
          // Are we marked?
          let hadMark = self.requestBuffer.hasMark
          let request = self.requestBuffer.removeFirst()
          // We became unmarked: we need to flush.
          let shouldFlush = hadMark && !self.requestBuffer.hasMark

          self.logger.debug("unbuffering request part", metadata: ["request_part": "\(request.message.name)"])
          stream.write(request.message, promise: request.promise)
          if shouldFlush {
            stream.flush()
          }
        }

        self.logger.debug("request buffer drained")

        // Writes travel through the pipeline synchronously. An error surfaced while draining
        // (e.g. via `receiveError(_:)`, or a write-promise callback calling `cancel`) may already
        // have closed the call. Never overwrite `.closed` with `.active`: close the stream instead.
        guard case .buffering = self.state else {
          stream.close(mode: .all, promise: nil)
          return
        }
        self.state = .active(stream)

      case .active:
        preconditionFailure("Invalid state: stream is already active")

      case .closed:
        // The channel became active but we're already closed: we must've timed out waiting for the
        // channel to activate, so close the channel now.
        stream.close(mode: .all, promise: nil)
      }
    }
  }
  // MARK: - Private Helpers

  extension ChannelTransport {
    /// Returns a short, human-readable description of the call's state for log metadata.
    ///
    /// Must be called from the event loop.
    private func describeCallState() -> String {
      self.eventLoop.preconditionInEventLoop()

      let description: String
      switch self.state {
      case .buffering:
        description = "waiting for connection; \(self.requestBuffer.count) request part(s) buffered"
      case .active:
        description = "active"
      case .closed:
        description = "closed"
      }
      return description
    }

    /// Starts the stopwatch that times the RPC. Must only be called once.
    private func startTimer() {
      assert(self.stopwatch == nil)
      self.stopwatch = Stopwatch()
      self.logger.debug("starting rpc")
    }

    /// Stops the stopwatch, if it is running, and logs the duration and final status of the call.
    ///
    /// Must be called from the event loop.
    private func stopTimer(status: GRPCStatus) {
      self.eventLoop.preconditionInEventLoop()

      guard let stopwatch = self.stopwatch else {
        // Already stopped: the finish has been logged before.
        return
      }
      self.stopwatch = nil

      let elapsed = stopwatch.elapsedMillis()
      self.logger.debug("rpc call finished", metadata: [
        "duration_ms": "\(elapsed)",
        "status_code": "\(status.code.rawValue)",
        "status_message": "\(status.message ?? "nil")"
      ])
    }

    /// Schedules the RPC to time out once `timeLimit` elapses.
    ///
    /// Nothing is scheduled when the resulting deadline is `.distantFuture`.
    private func setUpTimeLimit(_ timeLimit: TimeLimit) {
      let deadline = timeLimit.makeDeadline()
      if deadline == .distantFuture {
        // Too distant to worry about.
        return
      }

      let scheduleTimeout = {
        self.scheduledTimeout = self.eventLoop.scheduleTask(deadline: deadline) {
          self.timedOut(after: timeLimit)
        }
      }

      // 'scheduledTimeout' must only be accessed from the event loop.
      if self.eventLoop.inEventLoop {
        scheduleTimeout()
      } else {
        self.eventLoop.execute(scheduleTimeout)
      }
    }
  }
  extension _GRPCClientRequestPart {
    /// A short label for this request part, used in log metadata.
    fileprivate var name: String {
      switch self {
      case .head: return "head"
      case .message: return "message"
      case .end: return "end"
      }
    }
  }
  extension _GRPCClientResponsePart {
    /// A short label for this response part, used in log metadata.
    fileprivate var name: String {
      switch self {
      case .initialMetadata: return "initial metadata"
      case .message: return "message"
      case .trailingMetadata: return "trailing metadata"
      case .status: return "status"
      }
    }
  }