ClientCallTransport.swift 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// This class provides much of the boilerplate for the four types of gRPC call objects returned to
  /// framework users. It is the glue between a call object and the underlying transport (typically a
  /// NIO Channel).
  ///
  /// Typically, each call will be configured on an HTTP/2 stream channel. The stream channel
  /// will be configured as such:
  ///
  /// ```
  ///                           ┌────────────────────────────────────┐
  ///                           │ ChannelTransport<Request,Response> │
  ///                           └─────▲───────────────────────┬──────┘
  ///                                 │                       │
  /// --------------------------------│-----------------------│------------------------------
  /// HTTP2StreamChannel              │                       │
  ///                    ┌────────────┴──────────┐            │
  ///                    │ GRPCClientCallHandler │            │
  ///                    └────────────▲──────────┘            │
  /// GRPCClientResponsePart<Response>│                       │GRPCClientRequestPart<Request>
  ///                               ┌─┴───────────────────────▼─┐
  ///                               │ GRPCClientChannelHandler  │
  ///                               └─▲───────────────────────┬─┘
  ///                       HTTP2Frame│                       │HTTP2Frame
  ///                                 |                       |
  /// ```
  ///
  /// Note: the "main" pipeline is provided by the channel in `ClientConnection`.
  internal class ChannelTransport<Request, Response> {
    internal typealias RequestPart = _GRPCClientRequestPart<Request>
    internal typealias ResponsePart = _GRPCClientResponsePart<Response>

    /// The lifecycle of a call.
    enum State {
      /// No stream yet: request parts are buffered until the stream becomes active.
      ///
      /// Valid transitions: `active`, `closed`.
      case buffering(EventLoopFuture<Channel>)

      /// The stream is up and the RPC is in flight; a timeout may be pending.
      ///
      /// Valid transitions: `closed`.
      case active(Channel)

      /// The RPC has finished for one reason or another. This state is terminal.
      case closed
    }

    /// A request part held back until the stream is ready to accept writes.
    private struct BufferedRequest {
      /// The part to write once the stream is active.
      var message: _GRPCClientRequestPart<Request>

      /// The promise, if any, to complete with the outcome of the write.
      var promise: EventLoopPromise<Void>?
    }

    /// The `EventLoop` the call runs on.
    internal let eventLoop: EventLoop

    /// The logger used for this call.
    private let logger: Logger

    /// Where the call currently is in its lifecycle.
    private var state: State

    /// The pending timeout task for the call, if a time limit was set.
    private var scheduledTimeout: Scheduled<Void>?

    /// Request parts written before the stream became active.
    ///
    /// The initial capacity is 4: it is a power of 2 and most calls are unary, so they have
    /// 3 parts.
    private var requestBuffer = MarkedCircularBuffer<BufferedRequest>(initialCapacity: 4)

    /// The user-provided error delegate, if any.
    private var errorDelegate: ClientErrorDelegate?

    /// Holds the promises and handlers for the response parts of the call.
    internal var responseContainer: ResponsePartContainer<Response>

    /// Times the RPC; set when the call starts.
    private var stopwatch: Stopwatch?

    /// Designated initialiser: stores the collaborators and starts timing the RPC.
    private init(
      eventLoop: EventLoop,
      state: State,
      responseContainer: ResponsePartContainer<Response>,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.responseContainer = responseContainer
      self.state = state
      self.eventLoop = eventLoop
      self.startTimer()
    }

    /// Creates a transport in the `buffering` state and asks `channelProvider` to produce the
    /// channel for it.
    ///
    /// - Parameters:
    ///   - eventLoop: The event loop the call runs on.
    ///   - responseContainer: The container for the response part promises.
    ///   - timeLimit: The time limit for the RPC.
    ///   - errorDelegate: A delegate to notify of errors, if any.
    ///   - logger: The logger for the call.
    ///   - channelProvider: Called with the new transport and the promise it should complete with
    ///     the channel.
    internal convenience init(
      eventLoop: EventLoop,
      responseContainer: ResponsePartContainer<Response>,
      timeLimit: TimeLimit,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger,
      channelProvider: (ChannelTransport<Request, Response>, EventLoopPromise<Channel>) -> Void
    ) {
      let channelPromise = eventLoop.makePromise(of: Channel.self)
      let channelFuture = channelPromise.futureResult

      self.init(
        eventLoop: eventLoop,
        state: .buffering(channelFuture),
        responseContainer: responseContainer,
        errorDelegate: errorDelegate,
        logger: logger
      )

      // Only failure is observed here: success is signalled separately by the channel
      // 'activating' this transport, not by the future completing.
      channelFuture.whenFailure { error in
        let failure: Error
        if error is GRPCStatus || error is GRPCStatusTransformable {
          failure = error
        } else {
          // Anything else is wrapped so that the RPC is marked as 'unavailable'.
          failure = ConnectionFailure(reason: error)
        }
        self.handleError(failure, promise: nil)
      }

      // Arm the timeout before asking for the channel.
      self.setUpTimeLimit(timeLimit)
      channelProvider(self, channelPromise)
    }

    /// Creates a transport whose channel is a new HTTP/2 stream made by `multiplexer`.
    ///
    /// The stream's pipeline is given a `_GRPCClientChannelHandler`, a `GRPCClientCodecHandler`
    /// and a `GRPCClientCallHandler` bound to this transport, in that order.
    internal convenience init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
      serializer: Serializer,
      deserializer: Deserializer,
      responseContainer: ResponsePartContainer<Response>,
      callType: GRPCCallType,
      timeLimit: TimeLimit,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) where Serializer.Input == Request, Deserializer.Output == Response {
      self.init(
        eventLoop: multiplexer.eventLoop,
        responseContainer: responseContainer,
        timeLimit: timeLimit,
        errorDelegate: errorDelegate,
        logger: logger
      ) { call, streamPromise in
        multiplexer.whenComplete { result in
          switch result {
          case let .failure(error):
            // No multiplexer means no stream: fail the channel promise.
            streamPromise.fail(error)

          case let .success(mux):
            mux.createStreamChannel(promise: streamPromise) { stream in
              stream.pipeline.addHandlers([
                _GRPCClientChannelHandler(callType: callType, logger: logger),
                GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
                GRPCClientCallHandler(call: call),
              ])
            }
          }
        }
      }
    }

    /// Creates a transport backed by the channel of a fake response stream.
    ///
    /// No error delegate is used for fake responses.
    internal convenience init(
      fakeResponse: _FakeResponseStream<Request, Response>,
      responseContainer: ResponsePartContainer<Response>,
      timeLimit: TimeLimit,
      logger: Logger
    ) {
      self.init(
        eventLoop: fakeResponse.channel.eventLoop,
        responseContainer: responseContainer,
        timeLimit: timeLimit,
        errorDelegate: nil,
        logger: logger
      ) { call, streamPromise in
        // Once the call handler is installed, hand the fake channel to the transport.
        fakeResponse.channel.pipeline
          .addHandler(GRPCClientCallHandler(call: call))
          .map { fakeResponse.channel }
          .cascade(to: streamPromise)
      }
    }

    /// Makes a transport whose channel promise is failed immediately with an 'unavailable'
    /// status.
    internal static func makeTransportForMissingFakeResponse(
      eventLoop: EventLoop,
      responseContainer: ResponsePartContainer<Response>,
      logger: Logger
    ) -> ChannelTransport<Request, Response> {
      return ChannelTransport<Request, Response>(
        eventLoop: eventLoop,
        responseContainer: responseContainer,
        timeLimit: .none,
        errorDelegate: nil,
        logger: logger
      ) { _, channelPromise in
        channelPromise.fail(
          GRPCStatus(
            code: .unavailable,
            message: "No fake response was registered before starting an RPC."
          )
        )
      }
    }
  }
  206. // MARK: - Call API (i.e. called from {Unary,ClientStreaming,...}Call)
  extension ChannelTransport: ClientCallOutbound {
    /// Writes and flushes a single request part.
    ///
    /// May be called from any thread; the work is moved onto the event loop if necessary.
    internal func sendRequest(_ part: RequestPart, promise: EventLoopPromise<Void>?) {
      guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
          self.writePart(part, flush: true, promise: promise)
        }
        return
      }

      self.writePart(part, flush: true, promise: promise)
    }

    /// Writes a sequence of request parts, followed by a flush.
    ///
    /// May be called from any thread; the work is moved onto the event loop if necessary.
    internal func sendRequests<S>(
      _ parts: S,
      promise: EventLoopPromise<Void>?
    ) where S: Sequence, S.Element == RequestPart {
      guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
          self._sendRequests(parts, promise: promise)
        }
        return
      }

      self._sendRequests(parts, promise: promise)
    }

    /// Asks for the RPC to be cancelled.
    ///
    /// May be called from any thread; the work is moved onto the event loop if necessary.
    internal func cancel(promise: EventLoopPromise<Void>?) {
      self.logger.info("rpc cancellation requested", source: "GRPC")

      guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
          self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
        }
        return
      }

      self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
    }

    /// Returns a future for the `Channel` of the HTTP/2 stream used by this RPC.
    ///
    /// May be called from any thread.
    internal func streamChannel() -> EventLoopFuture<Channel> {
      guard self.eventLoop.inEventLoop else {
        return self.eventLoop.flatSubmit {
          self.getStreamChannel()
        }
      }

      return self.getStreamChannel()
    }
  }
  extension ChannelTransport {
    /// Returns a future for the stream channel which reflects the current state of the call.
    ///
    /// Must be called from the event loop.
    private func getStreamChannel() -> EventLoopFuture<Channel> {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .closed:
        return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel)

      case let .active(stream):
        return self.eventLoop.makeSucceededFuture(stream)

      case let .buffering(pendingStream):
        return pendingStream
      }
    }

    /// Writes each of the given parts without flushing, then flushes once at the end.
    ///
    /// Must be called from the event loop.
    ///
    /// - Parameters:
    ///   - parts: The request parts to write, in order.
    ///   - promise: Completed with the combined result of all of the writes, if provided.
    private func _sendRequests<S>(
      _ parts: S,
      promise: EventLoopPromise<Void>?
    ) where S: Sequence, S.Element == RequestPart {
      self.eventLoop.preconditionInEventLoop()

      guard let promise = promise else {
        // Nobody is waiting on the result: just write each part.
        for part in parts {
          self.writePart(part, flush: false, promise: nil)
        }
        self.flush()
        return
      }

      // Give every part its own promise and feed the combined outcome to the caller's promise.
      let loop = promise.futureResult.eventLoop
      var writeResults: [EventLoopFuture<Void>] = []
      for part in parts {
        let partPromise = loop.makePromise(of: Void.self)
        self.writePart(part, flush: false, promise: partPromise)
        writeResults.append(partPromise.futureResult)
      }
      EventLoopFuture.andAllSucceed(writeResults, on: loop).cascade(to: promise)

      self.flush()
    }

    /// Flushes the stream if it is active, or marks the request buffer if still buffering.
    ///
    /// Must be called from the event loop.
    private func flush() {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case let .active(stream):
        stream.flush()

      case .buffering:
        self.requestBuffer.mark()

      case .closed:
        break
      }
    }

    /// Writes a request part: buffered, passed to the stream, or dropped depending on state.
    ///
    /// Must be called from the event loop.
    ///
    /// - Parameters:
    ///   - part: The part to write.
    ///   - flush: Whether a flush should follow this write.
    ///   - promise: A promise to complete with the result of the write.
    private func writePart(_ part: RequestPart, flush: Bool, promise: EventLoopPromise<Void>?) {
      self.eventLoop.assertInEventLoop()

      switch self.state {
      case .closed:
        // Too late: the part is dropped and the caller is told the channel is closed.
        self.logger.debug("dropping request part", metadata: [
          "request_part": "\(part.name)",
          "call_state": "\(self.describeCallState())",
        ], source: "GRPC")
        promise?.fail(ChannelError.ioOnClosedChannel)

      case let .active(stream):
        // The stream is up: hand over the write and its promise.
        self.logger.debug(
          "writing request part",
          metadata: ["request_part": "\(part.name)"],
          source: "GRPC"
        )
        stream.write(part, promise: promise)
        if flush {
          stream.flush()
        }

      case .buffering:
        // No stream yet: hold on to the part; a flush is recorded as a mark on the buffer.
        self.logger.debug("buffering request part", metadata: [
          "request_part": "\(part.name)",
          "call_state": "\(self.describeCallState())",
        ], source: "GRPC")
        self.requestBuffer.append(BufferedRequest(message: part, promise: promise))
        if flush {
          self.requestBuffer.mark()
        }
      }
    }

    /// Called when the scheduled timeout fires: fails the RPC with a timeout error unless it
    /// has already finished.
    ///
    /// Must be called from the event loop.
    private func timedOut(after timeLimit: TimeLimit) {
      self.eventLoop.preconditionInEventLoop()
      self.handleError(GRPCError.RPCTimedOut(timeLimit).captureContext(), promise: nil)
    }

    /// Handles an error: notifies the error delegate, closes the call and fails `promise`.
    ///
    /// Errors are ignored if the call is already closed; `promise` is then failed with
    /// `ChannelError.alreadyClosed`.
    ///
    /// Must be called from the event loop.
    private func handleError(_ error: Error, promise: EventLoopPromise<Void>?) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .closed:
        promise?.fail(ChannelError.alreadyClosed)
        return
      case .buffering, .active:
        break
      }

      // The delegate receives a logger annotated with the state the call was in.
      var stateLogger = self.logger
      stateLogger[metadataKey: "call_state"] = "\(self.describeCallState())"

      let status: GRPCStatus
      let underlyingError: Error

      if let contextual = error as? GRPCError.WithContext {
        status = contextual.error.makeGRPCStatus()
        underlyingError = contextual.error
        self.errorDelegate?.didCatchError(
          contextual.error,
          logger: stateLogger,
          file: contextual.file,
          line: contextual.line
        )
      } else {
        // Without context: use the error's own status if it has one, else a processing error.
        status = (error as? GRPCStatusTransformable)?.makeGRPCStatus() ?? .processingError
        underlyingError = error
        self.errorDelegate?.didCatchErrorWithoutContext(error, logger: stateLogger)
      }

      // Tear the call down, then let the caller know why.
      self.close(error: underlyingError, status: status)
      promise?.fail(status)
    }

    /// Closes the call with the given error and status, unless it is already closed.
    ///
    /// Must be called from the event loop.
    private func close(error: Error, status: GRPCStatus) {
      self.eventLoop.preconditionInEventLoop()

      let stateBeforeClose = self.state
      if case .closed = stateBeforeClose {
        return
      }

      // Teardown shared by both open states: become closed, stop timing, cancel the timeout
      // and fail any outstanding response promises.
      self.state = .closed
      self.stopTimer(status: status)
      self.scheduledTimeout?.cancel()
      self.scheduledTimeout = nil
      self.responseContainer.fail(with: error, status: status)

      switch stateBeforeClose {
      case let .buffering(pendingStream):
        // Buffered writes will never be sent: fail their promises.
        while !self.requestBuffer.isEmpty {
          let unsent = self.requestBuffer.removeFirst()
          unsent.promise?.fail(status)
        }
        // The stream may still come up later: close it if it does.
        pendingStream.whenSuccess { stream in
          stream.close(mode: .all, promise: nil)
        }

      case let .active(stream):
        stream.close(mode: .all, promise: nil)

      case .closed:
        break
      }
    }
  }
  442. // MARK: - Channel Inbound
  extension ChannelTransport: ClientCallInbound {
    /// Receive an error from the Channel.
    ///
    /// Must be called on the event loop.
    internal func receiveError(_ error: Error) {
      self.eventLoop.preconditionInEventLoop()
      self.handleError(error, promise: nil)
    }

    /// Receive a response part from the Channel.
    ///
    /// Parts are only expected while the call is active; receiving one while still buffering is
    /// a programmer error, and parts received after the call has closed are dropped.
    ///
    /// Must be called on the event loop.
    func receiveResponse(_ part: _GRPCClientResponsePart<Response>) {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .buffering:
        preconditionFailure("Received response part in 'buffering' state")

      case .active:
        self.logger.debug(
          "received response part",
          metadata: ["response_part": "\(part.name)"],
          source: "GRPC"
        )

        switch part {
        case let .initialMetadata(metadata):
          self.responseContainer.lazyInitialMetadataPromise.completeWith(.success(metadata))

        case let .message(messageContext):
          switch self.responseContainer.responseHandler {
          case let .unary(responsePromise):
            responsePromise.succeed(messageContext.message)
          case let .stream(handler):
            handler(messageContext.message)
          }

        case let .trailingMetadata(metadata):
          self.responseContainer.lazyTrailingMetadataPromise.succeed(metadata)

        case let .status(status):
          // We're closed now.
          self.state = .closed
          self.stopTimer(status: status)

          // We're done; cancel the timeout.
          self.scheduledTimeout?.cancel()
          self.scheduledTimeout = nil

          // We're not really failing the status here; in some cases the server may fast fail, in
          // which case we'll only see trailing metadata and status: we should fail the initial
          // metadata and response in that case.
          self.responseContainer.fail(with: status, status: status)
        }

      case .closed:
        self.logger.debug("dropping response part", metadata: [
          "response_part": "\(part.name)",
          "call_state": "\(self.describeCallState())",
        ], source: "GRPC")
      }
    }

    /// The underlying channel became active and can start accepting writes.
    ///
    /// Any buffered request parts are written to `stream`, flushing wherever a flush was
    /// recorded while buffering, and the call then becomes active. If the call was closed before
    /// the stream came up, or is closed while the buffer is being drained, the stream is closed
    /// instead.
    ///
    /// Must be called on the event loop.
    internal func activate(stream: Channel) {
      self.eventLoop.preconditionInEventLoop()
      self.logger.debug("activated stream channel", source: "GRPC")

      // The channel has become active: what now?
      switch self.state {
      case .buffering:
        while !self.requestBuffer.isEmpty {
          // Are we marked?
          let hadMark = self.requestBuffer.hasMark
          let request = self.requestBuffer.removeFirst()
          // We became unmarked: we need to flush.
          let shouldFlush = hadMark && !self.requestBuffer.hasMark

          self.logger.debug(
            "unbuffering request part",
            metadata: ["request_part": "\(request.message.name)"],
            source: "GRPC"
          )
          stream.write(request.message, promise: request.promise)
          if shouldFlush {
            stream.flush()
          }
        }

        self.logger.debug("request buffer drained", source: "GRPC")

        // Writing into the pipeline is synchronous and can re-enter this object: a write which
        // fails (or a promise callback which cancels the RPC) may already have closed the call
        // and emptied the buffer. Only become active if we're still buffering, otherwise we'd
        // resurrect a call which has already been closed.
        if case .buffering = self.state {
          self.state = .active(stream)
        } else {
          // Closed while draining: treat this like activating an already closed call.
          stream.close(mode: .all, promise: nil)
        }

      case .active:
        preconditionFailure("Invalid state: stream is already active")

      case .closed:
        // The channel became active but we're already closed: we must've timed out waiting for
        // the channel to activate so close the channel now.
        stream.close(mode: .all, promise: nil)
      }
    }
  }
  532. // MARK: Private Helpers
  extension ChannelTransport {
    /// Describes the current state of the call for use in log metadata.
    ///
    /// Must be called from the event loop.
    private func describeCallState() -> String {
      self.eventLoop.preconditionInEventLoop()

      switch self.state {
      case .closed:
        return "closed"
      case .active:
        return "active"
      case .buffering:
        return "waiting for connection; \(self.requestBuffer.count) request part(s) buffered"
      }
    }

    /// Starts timing the RPC. Must only be called once per call.
    private func startTimer() {
      assert(self.stopwatch == nil)
      self.stopwatch = Stopwatch()
      self.logger.debug("starting rpc", source: "GRPC")
    }

    /// Stops timing the RPC and logs its duration and final status.
    ///
    /// Does nothing if the timer is not running. Must be called from the event loop.
    private func stopTimer(status: GRPCStatus) {
      self.eventLoop.preconditionInEventLoop()

      guard let stopwatch = self.stopwatch else {
        return
      }

      let millis = stopwatch.elapsedMillis()
      self.logger.debug("rpc call finished", metadata: [
        "duration_ms": "\(millis)",
        "status_code": "\(status.code.rawValue)",
        "status_message": "\(status.message ?? "nil")",
      ], source: "GRPC")

      // Clearing the stopwatch makes any later call a no-op.
      self.stopwatch = nil
    }

    /// Sets a time limit for the RPC by scheduling a timeout on the event loop.
    ///
    /// No timeout is scheduled if the limit's deadline is the distant future.
    private func setUpTimeLimit(_ timeLimit: TimeLimit) {
      let deadline = timeLimit.makeDeadline()
      if deadline == .distantFuture {
        // Too far away to be worth scheduling.
        return
      }

      let scheduleTimeout = {
        self.scheduledTimeout = self.eventLoop.scheduleTask(deadline: deadline) {
          self.timedOut(after: timeLimit)
        }
      }

      // 'scheduledTimeout' must only be touched on the event loop.
      if self.eventLoop.inEventLoop {
        scheduleTimeout()
      } else {
        self.eventLoop.execute(scheduleTimeout)
      }
    }
  }
  extension _GRPCClientRequestPart {
    /// A short label for this kind of request part, used in log metadata.
    fileprivate var name: String {
      switch self {
      case .head: return "head"
      case .message: return "message"
      case .end: return "end"
      }
    }
  }
  extension _GRPCClientResponsePart {
    /// A short label for this kind of response part, used in log metadata.
    fileprivate var name: String {
      switch self {
      case .initialMetadata: return "initial metadata"
      case .message: return "message"
      case .trailingMetadata: return "trailing metadata"
      case .status: return "status"
      }
    }
  }
  /// Wraps an error which caused the connection to fail.
  ///
  /// The wrapper keeps hold of the underlying error while also being convertible to a
  /// `GRPCStatus` whose code is `.unavailable`.
  private struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
    /// The underlying reason the connection failed.
    var reason: Error

    init(reason: Error) {
      self.reason = reason
    }

    /// A description of the underlying error.
    var description: String {
      return String(describing: self.reason)
    }

    /// An 'unavailable' status whose message describes the underlying error.
    func makeGRPCStatus() -> GRPCStatus {
      return GRPCStatus(code: .unavailable, message: self.description)
    }
  }