2
0

GRPCAsyncServerHandler.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import NIOCore
  18. import NIOHPACK
  19. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  20. public struct GRPCAsyncServerHandler<
  21. Serializer: MessageSerializer,
  22. Deserializer: MessageDeserializer,
  23. Request: Sendable,
  24. Response: Sendable
  25. >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  26. @usableFromInline
  27. internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>
  28. public func receiveMetadata(_ metadata: HPACKHeaders) {
  29. self._handler.receiveMetadata(metadata)
  30. }
  31. public func receiveMessage(_ bytes: ByteBuffer) {
  32. self._handler.receiveMessage(bytes)
  33. }
  34. public func receiveEnd() {
  35. self._handler.receiveEnd()
  36. }
  37. public func receiveError(_ error: Error) {
  38. self._handler.receiveError(error)
  39. }
  40. public func finish() {
  41. self._handler.finish()
  42. }
  43. }
  44. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  45. extension GRPCAsyncServerHandler {
  46. public typealias Request = Deserializer.Output
  47. public typealias Response = Serializer.Input
  48. @inlinable
  49. public init(
  50. context: CallHandlerContext,
  51. requestDeserializer: Deserializer,
  52. responseSerializer: Serializer,
  53. interceptors: [ServerInterceptor<Request, Response>],
  54. wrapping unary: @escaping @Sendable(Request, GRPCAsyncServerCallContext) async throws
  55. -> Response
  56. ) {
  57. self._handler = .init(
  58. context: context,
  59. requestDeserializer: requestDeserializer,
  60. responseSerializer: responseSerializer,
  61. interceptors: interceptors,
  62. userHandler: { requestStream, responseStreamWriter, context in
  63. var iterator = requestStream.makeAsyncIterator()
  64. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  65. throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
  66. }
  67. let response = try await unary(request, context)
  68. try await responseStreamWriter.send(response)
  69. }
  70. )
  71. }
  72. @inlinable
  73. public init(
  74. context: CallHandlerContext,
  75. requestDeserializer: Deserializer,
  76. responseSerializer: Serializer,
  77. interceptors: [ServerInterceptor<Request, Response>],
  78. wrapping clientStreaming: @escaping @Sendable(
  79. GRPCAsyncRequestStream<Request>,
  80. GRPCAsyncServerCallContext
  81. ) async throws -> Response
  82. ) {
  83. self._handler = .init(
  84. context: context,
  85. requestDeserializer: requestDeserializer,
  86. responseSerializer: responseSerializer,
  87. interceptors: interceptors,
  88. userHandler: { requestStream, responseStreamWriter, context in
  89. let response = try await clientStreaming(requestStream, context)
  90. try await responseStreamWriter.send(response)
  91. }
  92. )
  93. }
  94. @inlinable
  95. public init(
  96. context: CallHandlerContext,
  97. requestDeserializer: Deserializer,
  98. responseSerializer: Serializer,
  99. interceptors: [ServerInterceptor<Request, Response>],
  100. wrapping serverStreaming: @escaping @Sendable(
  101. Request,
  102. GRPCAsyncResponseStreamWriter<Response>,
  103. GRPCAsyncServerCallContext
  104. ) async throws -> Void
  105. ) {
  106. self._handler = .init(
  107. context: context,
  108. requestDeserializer: requestDeserializer,
  109. responseSerializer: responseSerializer,
  110. interceptors: interceptors,
  111. userHandler: { requestStream, responseStreamWriter, context in
  112. var iterator = requestStream.makeAsyncIterator()
  113. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  114. throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
  115. }
  116. try await serverStreaming(request, responseStreamWriter, context)
  117. }
  118. )
  119. }
  120. @inlinable
  121. public init(
  122. context: CallHandlerContext,
  123. requestDeserializer: Deserializer,
  124. responseSerializer: Serializer,
  125. interceptors: [ServerInterceptor<Request, Response>],
  126. wrapping bidirectional: @escaping @Sendable(
  127. GRPCAsyncRequestStream<Request>,
  128. GRPCAsyncResponseStreamWriter<Response>,
  129. GRPCAsyncServerCallContext
  130. ) async throws -> Void
  131. ) {
  132. self._handler = .init(
  133. context: context,
  134. requestDeserializer: requestDeserializer,
  135. responseSerializer: responseSerializer,
  136. interceptors: interceptors,
  137. userHandler: bidirectional
  138. )
  139. }
  140. }
  141. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  142. @usableFromInline
  143. internal final class AsyncServerHandler<
  144. Serializer: MessageSerializer,
  145. Deserializer: MessageDeserializer,
  146. Request: Sendable,
  147. Response: Sendable
  148. >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  149. /// A response serializer.
  150. @usableFromInline
  151. internal let serializer: Serializer
  152. /// A request deserializer.
  153. @usableFromInline
  154. internal let deserializer: Deserializer
  155. /// A pipeline of user provided interceptors.
  156. @usableFromInline
  157. internal var interceptors: ServerInterceptorPipeline<Request, Response>!
  158. /// The context required in order create the function.
  159. @usableFromInline
  160. internal let context: CallHandlerContext
  161. /// A reference to a `UserInfo`.
  162. @usableFromInline
  163. internal let userInfoRef: Ref<UserInfo>
  164. /// The user provided function to execute.
  165. @usableFromInline
  166. internal let userHandler: @Sendable(
  167. GRPCAsyncRequestStream<Request>,
  168. GRPCAsyncResponseStreamWriter<Response>,
  169. GRPCAsyncServerCallContext
  170. ) async throws -> Void
  171. /// The state of the handler.
  172. @usableFromInline
  173. internal var state: State = .idle
  174. /// The task used to run the async user handler.
  175. ///
  176. /// - TODO: I'd like it if this was part of the assoc data for the .active state but doing so may introduce a race condition.
  177. @usableFromInline
  178. internal var userHandlerTask: Task<Void, Never>? = nil
  179. @usableFromInline
  180. internal enum State {
  181. /// No headers have been received.
  182. case idle
  183. @usableFromInline
  184. internal struct ActiveState {
  185. /// The source backing the request stream that is being consumed by the user handler.
  186. @usableFromInline
  187. let requestStreamSource: PassthroughMessageSource<Request, Error>
  188. /// The call context that was passed to the user handler.
  189. @usableFromInline
  190. let context: GRPCAsyncServerCallContext
  191. /// The response stream writer that is being used by the user handler.
  192. ///
  193. /// Because this is pausable, it may contain responses after the user handler has completed
  194. /// that have yet to be written. However we will remain in the `.active` state until the
  195. /// response stream writer has completed.
  196. @usableFromInline
  197. let responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>
  198. /// The response headers have been sent back to the client via the interceptors.
  199. @usableFromInline
  200. var haveSentResponseHeaders: Bool = false
  201. /// The promise we are using to bridge the NIO and async-await worlds.
  202. ///
  203. /// It is the mechanism that we use to run a callback when the user handler has completed.
  204. /// The promise is not passed to the user handler directly. Instead it is fulfilled with the
  205. /// result of the async `Task` executing the user handler using `completeWithTask(_:)`.
  206. ///
  207. /// - TODO: It shouldn't really be necessary to stash this promise here. Specifically it is
  208. /// never used anywhere when the `.active` enum value is accessed. However, if we do not store
  209. /// it here then the tests periodically segfault. This appears to be a reference counting bug
  210. /// in Swift and/or NIO since it should have been captured by `completeWithTask(_:)`.
  211. let _userHandlerPromise: EventLoopPromise<Void>
  212. @usableFromInline
  213. internal init(
  214. requestStreamSource: PassthroughMessageSource<Request, Error>,
  215. context: GRPCAsyncServerCallContext,
  216. responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>,
  217. userHandlerPromise: EventLoopPromise<Void>
  218. ) {
  219. self.requestStreamSource = requestStreamSource
  220. self.context = context
  221. self.responseStreamWriter = responseStreamWriter
  222. self._userHandlerPromise = userHandlerPromise
  223. }
  224. }
  225. /// Headers have been received and an async `Task` has been created to execute the user handler.
  226. case active(ActiveState)
  227. /// The handler has completed.
  228. case completed
  229. }
  230. @inlinable
  231. public init(
  232. context: CallHandlerContext,
  233. requestDeserializer: Deserializer,
  234. responseSerializer: Serializer,
  235. interceptors: [ServerInterceptor<Request, Response>],
  236. userHandler: @escaping @Sendable(
  237. GRPCAsyncRequestStream<Request>,
  238. GRPCAsyncResponseStreamWriter<Response>,
  239. GRPCAsyncServerCallContext
  240. ) async throws -> Void
  241. ) {
  242. self.serializer = responseSerializer
  243. self.deserializer = requestDeserializer
  244. self.context = context
  245. self.userHandler = userHandler
  246. let userInfoRef = Ref(UserInfo())
  247. self.userInfoRef = userInfoRef
  248. self.interceptors = ServerInterceptorPipeline(
  249. logger: context.logger,
  250. eventLoop: context.eventLoop,
  251. path: context.path,
  252. callType: .bidirectionalStreaming,
  253. remoteAddress: context.remoteAddress,
  254. userInfoRef: userInfoRef,
  255. interceptors: interceptors,
  256. onRequestPart: self.receiveInterceptedPart(_:),
  257. onResponsePart: self.sendInterceptedPart(_:promise:)
  258. )
  259. }
  260. // MARK: - GRPCServerHandlerProtocol conformance
  261. @inlinable
  262. internal func receiveMetadata(_ headers: HPACKHeaders) {
  263. self.interceptors.receive(.metadata(headers))
  264. }
  265. @inlinable
  266. internal func receiveMessage(_ bytes: ByteBuffer) {
  267. do {
  268. let message = try self.deserializer.deserialize(byteBuffer: bytes)
  269. self.interceptors.receive(.message(message))
  270. } catch {
  271. self.handleError(error)
  272. }
  273. }
  274. @inlinable
  275. internal func receiveEnd() {
  276. self.interceptors.receive(.end)
  277. }
  278. @inlinable
  279. internal func receiveError(_ error: Error) {
  280. self.handleError(error)
  281. self.finish()
  282. }
  283. @inlinable
  284. internal func finish() {
  285. switch self.state {
  286. case .idle:
  287. self.interceptors = nil
  288. self.state = .completed
  289. case .active:
  290. self.state = .completed
  291. self.interceptors = nil
  292. self.userHandlerTask?.cancel()
  293. case .completed:
  294. self.interceptors = nil
  295. }
  296. }
  297. // MARK: - Interceptors to User Function
  298. @inlinable
  299. internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
  300. switch part {
  301. case let .metadata(headers):
  302. self.receiveInterceptedMetadata(headers)
  303. case let .message(message):
  304. self.receiveInterceptedMessage(message)
  305. case .end:
  306. self.receiveInterceptedEnd()
  307. }
  308. }
  309. @inlinable
  310. internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
  311. switch self.state {
  312. case .idle:
  313. // Make a context to invoke the user handler with.
  314. let context = GRPCAsyncServerCallContext(
  315. headers: headers,
  316. logger: self.context.logger,
  317. userInfoRef: self.userInfoRef
  318. )
  319. // Create a source for our request stream.
  320. let requestStreamSource = PassthroughMessageSource<Request, Error>()
  321. // Create a promise to hang a callback off when the user handler completes.
  322. let userHandlerPromise: EventLoopPromise<Void> = self.context.eventLoop.makePromise()
  323. // Create a request stream from our stream source to pass to the user handler.
  324. let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
  325. // TODO: In future use `AsyncWriter.init(maxPendingElements:maxWritesBeforeYield:delegate:)`?
  326. let responseStreamWriter =
  327. GRPCAsyncResponseStreamWriter(
  328. wrapping: AsyncWriter(delegate: AsyncResponseStreamWriterDelegate(
  329. context: context,
  330. compressionIsEnabled: self.context.encoding.isEnabled,
  331. send: self.interceptResponse(_:metadata:),
  332. finish: self.responseStreamDrained(_:)
  333. ))
  334. )
  335. // Set the state to active and bundle in all the associated data.
  336. self.state = .active(.init(
  337. requestStreamSource: requestStreamSource,
  338. context: context,
  339. responseStreamWriter: responseStreamWriter,
  340. userHandlerPromise: userHandlerPromise
  341. ))
  342. // Register callback for the completion of the user handler.
  343. userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:))
  344. // Spin up a task to call the async user handler.
  345. self.userHandlerTask = userHandlerPromise.completeWithTask {
  346. return try await withTaskCancellationHandler {
  347. do {
  348. // When the user handler completes we invalidate the request stream source.
  349. defer { requestStreamSource.finish() }
  350. // Call the user handler.
  351. try await self.userHandler(requestStream, responseStreamWriter, context)
  352. } catch let status as GRPCStatus where status.isOk {
  353. // The user handler throwing `GRPCStatus.ok` is considered to be invalid.
  354. await responseStreamWriter.asyncWriter.cancel()
  355. throw GRPCStatus(
  356. code: .unknown,
  357. message: "Handler threw GRPCStatus error with code .ok"
  358. )
  359. } catch {
  360. await responseStreamWriter.asyncWriter.cancel()
  361. throw error
  362. }
  363. // Wait for the response stream writer to finish writing its responses.
  364. try await responseStreamWriter.asyncWriter.finish(.ok)
  365. } onCancel: {
  366. /// The task being cancelled from outside is the signal to this task that an error has
  367. /// occured and we should abort the user handler.
  368. ///
  369. /// Adopters are encouraged to cooperatively check for cancellation in their handlers but
  370. /// we cannot rely on this.
  371. ///
  372. /// We additionally signal the handler that an error has occured by terminating the source
  373. /// backing the request stream that the user handler is consuming.
  374. ///
  375. /// - NOTE: This handler has different semantics from the extant non-async-await handlers
  376. /// where the `statusPromise` was explicitly failed with `GRPCStatus.unavailable` from
  377. /// _outside_ the user handler. Here we terminate the request stream with a
  378. /// `CancellationError` which manifests _inside_ the user handler when it tries to access
  379. /// the next request in the stream. We have no control over the implementation of the user
  380. /// handler. It may choose to handle this error or not. In the event that the handler
  381. /// either rethrows or does not handle the error, this will be converted to a
  382. /// `GRPCStatus.unknown` by `handleError(_:)`. Yielding a `CancellationError` _inside_
  383. /// the user handler feels like the clearest semantics of what we want--"the RPC has an
  384. /// error, cancel whatever you're doing." If we want to preserve the API of the
  385. /// non-async-await handlers in this error flow we could add conformance to
  386. /// `GRPCStatusTransformable` to `CancellationError`, but we still cannot control _how_
  387. /// the user handler will handle the `CancellationError` which could even be swallowed.
  388. ///
  389. /// - NOTE: Currently we _have_ added `GRPCStatusTransformable` conformance to
  390. /// `CancellationError` to convert it into `GRPCStatus.unavailable` and expect to
  391. /// document that user handlers should always rethrow `CacellationError` if handled, after
  392. /// optional cleanup.
  393. requestStreamSource.finish(throwing: CancellationError())
  394. /// Cancel the writer here to drop any pending responses.
  395. responseStreamWriter.asyncWriter.cancelAsynchronously()
  396. }
  397. }
  398. case .active:
  399. self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
  400. case .completed:
  401. // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
  402. // to an error or otherwise) and an interceptor doing some async work later emitting headers.
  403. // Dropping them is fine.
  404. ()
  405. }
  406. }
  407. @inlinable
  408. internal func receiveInterceptedMessage(_ request: Request) {
  409. switch self.state {
  410. case .idle:
  411. self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
  412. case let .active(activeState):
  413. switch activeState.requestStreamSource.yield(request) {
  414. case .accepted(queueDepth: _):
  415. // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`.
  416. break
  417. case .dropped:
  418. /// If we are in the `.active` state then we have yet to encounter an error. Therefore
  419. /// if the request stream source has already terminated then it must have been the result of
  420. /// receiving `.end`. Therefore this `.message` must have been sent by the client after it
  421. /// sent `.end`, which is a protocol violation.
  422. self.handleError(GRPCError.ProtocolViolation("Message received after end of stream"))
  423. }
  424. case .completed:
  425. // We received a message but we're already done: this may happen if we terminate the RPC
  426. // due to a channel error, for example.
  427. ()
  428. }
  429. }
  430. @inlinable
  431. internal func receiveInterceptedEnd() {
  432. switch self.state {
  433. case .idle:
  434. self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
  435. case let .active(activeState):
  436. switch activeState.requestStreamSource.finish() {
  437. case .accepted(queueDepth: _):
  438. break
  439. case .dropped:
  440. // The task executing the user handler will finish the request stream source after the
  441. // user handler completes. If that's the case we will drop the end-of-stream here.
  442. break
  443. }
  444. case .completed:
  445. // We received a message but we're already done: this may happen if we terminate the RPC
  446. // due to a channel error, for example.
  447. ()
  448. }
  449. }
  450. // MARK: - User Function To Interceptors
  451. @inlinable
  452. internal func _interceptResponse(_ response: Response, metadata: MessageMetadata) {
  453. self.context.eventLoop.assertInEventLoop()
  454. switch self.state {
  455. case .idle:
  456. // The user handler cannot send responses before it has been invoked.
  457. preconditionFailure()
  458. case var .active(activeState):
  459. if !activeState.haveSentResponseHeaders {
  460. activeState.haveSentResponseHeaders = true
  461. self.state = .active(activeState)
  462. // Send response headers back via the interceptors.
  463. self.interceptors.send(.metadata(activeState.context.initialResponseMetadata), promise: nil)
  464. }
  465. // Send the response back via the interceptors.
  466. self.interceptors.send(.message(response, metadata), promise: nil)
  467. case .completed:
  468. /// If we are in the completed state then the async writer delegate will have been cancelled,
  469. /// however the cancellation is asynchronous so there's a chance that we receive this callback
  470. /// after that has happened. We can drop the response.
  471. ()
  472. }
  473. }
  474. @Sendable
  475. @inlinable
  476. internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
  477. if self.context.eventLoop.inEventLoop {
  478. self._interceptResponse(response, metadata: metadata)
  479. } else {
  480. self.context.eventLoop.execute {
  481. self._interceptResponse(response, metadata: metadata)
  482. }
  483. }
  484. }
  485. @inlinable
  486. internal func userHandlerCompleted(_ result: Result<Void, Error>) {
  487. switch self.state {
  488. case .idle:
  489. // The user handler cannot complete before it is invoked.
  490. preconditionFailure()
  491. case .active:
  492. switch result {
  493. case .success:
  494. /// The user handler has completed successfully.
  495. /// We don't take any action here; the state transition and termination of the message
  496. /// stream happen when the response stream has drained, in the response stream writer
  497. /// delegate callback, `responseStreamDrained(_:)`.
  498. break
  499. case let .failure(error):
  500. self.handleError(error, thrownFromHandler: true)
  501. }
  502. case .completed:
  503. ()
  504. }
  505. }
  506. @inlinable
  507. internal func _responseStreamDrained(_ status: GRPCStatus) {
  508. self.context.eventLoop.assertInEventLoop()
  509. switch self.state {
  510. case .idle:
  511. preconditionFailure()
  512. case let .active(activeState):
  513. // Now we have drained the response stream writer from the user handler we can send end.
  514. self.state = .completed
  515. self.interceptors.send(
  516. .end(status, activeState.context.trailingResponseMetadata),
  517. promise: nil
  518. )
  519. case .completed:
  520. ()
  521. }
  522. }
  523. @Sendable
  524. @inlinable
  525. internal func responseStreamDrained(_ status: GRPCStatus) {
  526. if self.context.eventLoop.inEventLoop {
  527. self._responseStreamDrained(status)
  528. } else {
  529. self.context.eventLoop.execute {
  530. self._responseStreamDrained(status)
  531. }
  532. }
  533. }
  534. @inlinable
  535. internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
  536. switch self.state {
  537. case .idle:
  538. assert(!isHandlerError)
  539. self.state = .completed
  540. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  541. error,
  542. delegate: self.context.errorDelegate
  543. )
  544. self.interceptors.send(.end(status, trailers), promise: nil)
  545. case let .active(activeState):
  546. self.state = .completed
  547. // If we have an async task, then cancel it, which will terminate the request stream from
  548. // which it is reading and give the user handler an opportunity to cleanup.
  549. self.userHandlerTask?.cancel()
  550. let status: GRPCStatus
  551. let trailers: HPACKHeaders
  552. if isHandlerError {
  553. (status, trailers) = ServerErrorProcessor.processObserverError(
  554. error,
  555. headers: activeState.context.requestMetadata,
  556. trailers: activeState.context.trailingResponseMetadata,
  557. delegate: self.context.errorDelegate
  558. )
  559. } else {
  560. (status, trailers) = ServerErrorProcessor.processLibraryError(
  561. error,
  562. delegate: self.context.errorDelegate
  563. )
  564. }
  565. // TODO: This doesn't go via the user handler task.
  566. self.interceptors.send(.end(status, trailers), promise: nil)
  567. case .completed:
  568. ()
  569. }
  570. }
  571. @inlinable
  572. internal func sendInterceptedPart(
  573. _ part: GRPCServerResponsePart<Response>,
  574. promise: EventLoopPromise<Void>?
  575. ) {
  576. switch part {
  577. case let .metadata(headers):
  578. self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
  579. case let .message(message, metadata):
  580. do {
  581. let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
  582. self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
  583. } catch {
  584. // Serialization failed: fail the promise and send end.
  585. promise?.fail(error)
  586. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  587. error,
  588. delegate: self.context.errorDelegate
  589. )
  590. // Loop back via the interceptors.
  591. self.interceptors.send(.end(status, trailers), promise: nil)
  592. }
  593. case let .end(status, trailers):
  594. self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
  595. }
  596. }
  597. }
  598. #endif