GRPCAsyncServerHandler.swift

/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#if compiler(>=5.5) && canImport(_Concurrency)
import NIOCore
import NIOHPACK

/// A server handler which bridges the NIO-based server machinery to an async-await user handler.
///
/// This is a thin public wrapper: every call is forwarded to the internal `AsyncServerHandler`,
/// which owns all of the state.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncServerHandler<
  Serializer: MessageSerializer,
  Deserializer: MessageDeserializer
>: GRPCServerHandlerProtocol {
  @usableFromInline
  internal let _handler: AsyncServerHandler<Serializer, Deserializer>

  public func receiveMetadata(_ metadata: HPACKHeaders) {
    self._handler.receiveMetadata(metadata)
  }

  public func receiveMessage(_ bytes: ByteBuffer) {
    self._handler.receiveMessage(bytes)
  }

  public func receiveEnd() {
    self._handler.receiveEnd()
  }

  public func receiveError(_ error: Error) {
    self._handler.receiveError(error)
  }

  public func finish() {
    self._handler.finish()
  }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension GRPCAsyncServerHandler {
  public typealias Request = Deserializer.Output
  public typealias Response = Serializer.Input

  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping unary: @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
      -> Response
  ) {
    self._handler = .init(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      interceptors: interceptors,
      userHandler: { requestStream, responseStreamWriter, context in
        var iterator = requestStream.makeAsyncIterator()
        guard let request = try await iterator.next(), try await iterator.next() == nil else {
          throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
        }
        let response = try await unary(request, context)
        try await responseStreamWriter.send(response)
      }
    )
  }

  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping clientStreaming: @escaping @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncServerCallContext
    ) async throws -> Response
  ) {
    self._handler = .init(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      interceptors: interceptors,
      userHandler: { requestStream, responseStreamWriter, context in
        let response = try await clientStreaming(requestStream, context)
        try await responseStreamWriter.send(response)
      }
    )
  }

  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping serverStreaming: @escaping @Sendable (
      Request,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void
  ) {
    self._handler = .init(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      interceptors: interceptors,
      userHandler: { requestStream, responseStreamWriter, context in
        var iterator = requestStream.makeAsyncIterator()
        guard let request = try await iterator.next(), try await iterator.next() == nil else {
          throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
        }
        try await serverStreaming(request, responseStreamWriter, context)
      }
    )
  }

  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping bidirectional: @escaping @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void
  ) {
    self._handler = .init(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      interceptors: interceptors,
      userHandler: bidirectional
    )
  }
}
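
// A minimal usage sketch (not part of this file): a generated service provider would typically
// construct one of these handlers when routing an incoming RPC. The names below
// (`ProtobufSerializer`, `ProtobufDeserializer`, `Echo_EchoRequest`, `Echo_EchoResponse`) are
// assumptions standing in for whatever serializers and generated message types the service
// actually uses, and the closure body is a placeholder for the user's own logic:
//
//   GRPCAsyncServerHandler(
//     context: context,
//     requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
//     responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
//     interceptors: [],
//     wrapping: { (request: Echo_EchoRequest, context: GRPCAsyncServerCallContext) in
//       var response = Echo_EchoResponse()
//       response.text = "echo: \(request.text)"
//       return response
//     }
//   )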

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@usableFromInline
internal final class AsyncServerHandler<
  Serializer: MessageSerializer,
  Deserializer: MessageDeserializer
>: GRPCServerHandlerProtocol {
  @usableFromInline
  internal typealias Request = Deserializer.Output
  @usableFromInline
  internal typealias Response = Serializer.Input

  /// A response serializer.
  @usableFromInline
  internal let serializer: Serializer

  /// A request deserializer.
  @usableFromInline
  internal let deserializer: Deserializer

  /// A pipeline of user provided interceptors.
  @usableFromInline
  internal var interceptors: ServerInterceptorPipeline<Request, Response>!

  /// The context required in order to create the function.
  @usableFromInline
  internal let context: CallHandlerContext

  /// A reference to a `UserInfo`.
  @usableFromInline
  internal let userInfoRef: Ref<UserInfo>

  /// The user provided function to execute.
  @usableFromInline
  internal let userHandler: (
    GRPCAsyncRequestStream<Request>,
    GRPCAsyncResponseStreamWriter<Response>,
    GRPCAsyncServerCallContext
  ) async throws -> Void

  /// The state of the handler.
  @usableFromInline
  internal var state: State = .idle

  /// The task used to run the async user handler.
  ///
  /// - TODO: It would be nicer if this were part of the associated data for the `.active` state,
  ///   but doing so may introduce a race condition.
  @usableFromInline
  internal var userHandlerTask: Task<Void, Never>? = nil

  @usableFromInline
  internal enum State {
    /// No headers have been received.
    case idle

    @usableFromInline
    internal struct ActiveState {
      /// The source backing the request stream that is being consumed by the user handler.
      @usableFromInline
      let requestStreamSource: PassthroughMessageSource<Request, Error>

      /// The call context that was passed to the user handler.
      @usableFromInline
      let context: GRPCAsyncServerCallContext

      /// The response stream writer that is being used by the user handler.
      ///
      /// Because this is pausable, it may contain responses after the user handler has completed
      /// that have yet to be written. However, we will remain in the `.active` state until the
      /// response stream writer has completed.
      @usableFromInline
      let responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>

      /// Whether the response headers have been sent back to the client via the interceptors.
      @usableFromInline
      var haveSentResponseHeaders: Bool = false

      /// The promise we are using to bridge the NIO and async-await worlds.
      ///
      /// It is the mechanism that we use to run a callback when the user handler has completed.
      /// The promise is not passed to the user handler directly. Instead it is fulfilled with the
      /// result of the async `Task` executing the user handler using `completeWithTask(_:)`.
      ///
      /// - TODO: It shouldn't really be necessary to stash this promise here. Specifically it is
      ///   never used anywhere when the `.active` enum value is accessed. However, if we do not
      ///   store it here then the tests periodically segfault. This appears to be a reference
      ///   counting bug in Swift and/or NIO since it should have been captured by
      ///   `completeWithTask(_:)`.
      let _userHandlerPromise: EventLoopPromise<Void>

      @usableFromInline
      internal init(
        requestStreamSource: PassthroughMessageSource<Request, Error>,
        context: GRPCAsyncServerCallContext,
        responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>,
        userHandlerPromise: EventLoopPromise<Void>
      ) {
        self.requestStreamSource = requestStreamSource
        self.context = context
        self.responseStreamWriter = responseStreamWriter
        self._userHandlerPromise = userHandlerPromise
      }
    }

    /// Headers have been received and an async `Task` has been created to execute the user
    /// handler.
    case active(ActiveState)

    /// The handler has completed.
    case completed
  }
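
  // The handler moves through these states in one direction only: it starts in `.idle`, becomes
  // `.active` when the intercepted request headers arrive and the user handler `Task` is spun up
  // (see `receiveInterceptedMetadata(_:)`), and ends in `.completed` when the response stream has
  // drained, when an error is handled, or when `finish()` is called.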

  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    userHandler: @escaping @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void
  ) {
    self.serializer = responseSerializer
    self.deserializer = requestDeserializer
    self.context = context
    self.userHandler = userHandler

    let userInfoRef = Ref(UserInfo())
    self.userInfoRef = userInfoRef
    self.interceptors = ServerInterceptorPipeline(
      logger: context.logger,
      eventLoop: context.eventLoop,
      path: context.path,
      callType: .bidirectionalStreaming,
      remoteAddress: context.remoteAddress,
      userInfoRef: userInfoRef,
      interceptors: interceptors,
      onRequestPart: self.receiveInterceptedPart(_:),
      onResponsePart: self.sendInterceptedPart(_:promise:)
    )
  }

  // MARK: - GRPCServerHandlerProtocol conformance

  @inlinable
  internal func receiveMetadata(_ headers: HPACKHeaders) {
    self.interceptors.receive(.metadata(headers))
  }

  @inlinable
  internal func receiveMessage(_ bytes: ByteBuffer) {
    do {
      let message = try self.deserializer.deserialize(byteBuffer: bytes)
      self.interceptors.receive(.message(message))
    } catch {
      self.handleError(error)
    }
  }

  @inlinable
  internal func receiveEnd() {
    self.interceptors.receive(.end)
  }

  @inlinable
  internal func receiveError(_ error: Error) {
    self.handleError(error)
    self.finish()
  }

  @inlinable
  internal func finish() {
    switch self.state {
    case .idle:
      self.interceptors = nil
      self.state = .completed
    case .active:
      self.state = .completed
      self.interceptors = nil
      self.userHandlerTask?.cancel()
    case .completed:
      self.interceptors = nil
    }
  }

  // MARK: - Interceptors to User Function

  @inlinable
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
    switch part {
    case let .metadata(headers):
      self.receiveInterceptedMetadata(headers)
    case let .message(message):
      self.receiveInterceptedMessage(message)
    case .end:
      self.receiveInterceptedEnd()
    }
  }

  @inlinable
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
    switch self.state {
    case .idle:
      // Make a context to invoke the user handler with.
      let context = GRPCAsyncServerCallContext(
        headers: headers,
        logger: self.context.logger,
        userInfoRef: self.userInfoRef
      )

      // Create a source for our request stream.
      let requestStreamSource = PassthroughMessageSource<Request, Error>()

      // Create a promise to hang a callback off when the user handler completes.
      let userHandlerPromise: EventLoopPromise<Void> = self.context.eventLoop.makePromise()

      // Create a request stream from our stream source to pass to the user handler.
      let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))

      // TODO: In future use `AsyncWriter.init(maxPendingElements:maxWritesBeforeYield:delegate:)`?
      let responseStreamWriter = GRPCAsyncResponseStreamWriter(
        wrapping: AsyncWriter(delegate: AsyncResponseStreamWriterDelegate(
          context: context,
          compressionIsEnabled: self.context.encoding.isEnabled,
          send: self.interceptResponse(_:metadata:),
          finish: self.responseStreamDrained(_:)
        ))
      )

      // Set the state to active and bundle in all the associated data.
      self.state = .active(.init(
        requestStreamSource: requestStreamSource,
        context: context,
        responseStreamWriter: responseStreamWriter,
        userHandlerPromise: userHandlerPromise
      ))

      // Register a callback for the completion of the user handler.
      userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:))

      // Spin up a task to call the async user handler.
      self.userHandlerTask = userHandlerPromise.completeWithTask {
        return try await withTaskCancellationHandler {
          do {
            // When the user handler completes we invalidate the request stream source.
            defer { requestStreamSource.finish() }
            // Call the user handler.
            try await self.userHandler(requestStream, responseStreamWriter, context)
          } catch let status as GRPCStatus where status.isOk {
            // The user handler throwing `GRPCStatus.ok` is considered to be invalid.
            await responseStreamWriter.asyncWriter.cancel()
            throw GRPCStatus(
              code: .unknown,
              message: "Handler threw GRPCStatus error with code .ok"
            )
          } catch {
            await responseStreamWriter.asyncWriter.cancel()
            throw error
          }
          // Wait for the response stream writer to finish writing its responses.
          try await responseStreamWriter.asyncWriter.finish(.ok)
        } onCancel: {
          /// The task being cancelled from outside is the signal to this task that an error has
          /// occurred and we should abort the user handler.
          ///
          /// Adopters are encouraged to cooperatively check for cancellation in their handlers,
          /// but we cannot rely on this.
          ///
          /// We additionally signal the handler that an error has occurred by terminating the
          /// source backing the request stream that the user handler is consuming.
          ///
          /// - NOTE: This handler has different semantics from the extant non-async-await handlers
          ///   where the `statusPromise` was explicitly failed with `GRPCStatus.unavailable` from
          ///   _outside_ the user handler. Here we terminate the request stream with a
          ///   `CancellationError` which manifests _inside_ the user handler when it tries to
          ///   access the next request in the stream. We have no control over the implementation
          ///   of the user handler. It may choose to handle this error or not. In the event that
          ///   the handler either rethrows or does not handle the error, this will be converted to
          ///   a `GRPCStatus.unknown` by `handleError(_:)`. Yielding a `CancellationError` _inside_
          ///   the user handler feels like the clearest semantics of what we want: "the RPC has an
          ///   error, cancel whatever you're doing." If we want to preserve the API of the
          ///   non-async-await handlers in this error flow we could add conformance to
          ///   `GRPCStatusTransformable` to `CancellationError`, but we still cannot control _how_
          ///   the user handler will handle the `CancellationError`, which could even be
          ///   swallowed.
          ///
          /// - NOTE: Currently we _have_ added `GRPCStatusTransformable` conformance to
          ///   `CancellationError` to convert it into `GRPCStatus.unavailable` and expect to
          ///   document that user handlers should always rethrow `CancellationError`, if handled,
          ///   after optional cleanup.
          requestStreamSource.finish(throwing: CancellationError())
          /// Cancel the writer here to drop any pending responses.
          responseStreamWriter.asyncWriter.cancelAsynchronously()
        }
      }
    case .active:
      self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
    case .completed:
      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
      // to an error or otherwise) and an interceptor doing some async work later emits headers.
      // Dropping them is fine.
      ()
    }
  }
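
  // A hedged illustration (not part of this file) of the cooperative cancellation described
  // above: a user handler does not need to do anything special to observe cancellation, because
  // the request stream is finished with a `CancellationError` which surfaces when the handler
  // asks for the next request. A bidirectional handler like the sketch below (where
  // `makeResponse(_:)` is a placeholder for the service's own logic) therefore stops iterating at
  // the point of cancellation, provided it rethrows the error rather than swallowing it:
  //
  //   { requestStream, responseStreamWriter, context in
  //     for try await request in requestStream {
  //       try await responseStreamWriter.send(makeResponse(request))
  //     }
  //   }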

  @inlinable
  internal func receiveInterceptedMessage(_ request: Request) {
    switch self.state {
    case .idle:
      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
    case let .active(activeState):
      switch activeState.requestStreamSource.yield(request) {
      case .accepted(queueDepth: _):
        // TODO: In future we will potentially issue a read request to the channel based on the
        // value of `queueDepth`.
        break
      case .dropped:
        /// If we are in the `.active` state then we have yet to encounter an error. Therefore if
        /// the request stream source has already terminated then it must have been the result of
        /// receiving `.end`. Therefore this `.message` must have been sent by the client after it
        /// sent `.end`, which is a protocol violation.
        self.handleError(GRPCError.ProtocolViolation("Message received after end of stream"))
      }
    case .completed:
      // We received a message but we're already done: this may happen if we terminate the RPC
      // due to a channel error, for example.
      ()
    }
  }

  @inlinable
  internal func receiveInterceptedEnd() {
    switch self.state {
    case .idle:
      self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
    case let .active(activeState):
      switch activeState.requestStreamSource.finish() {
      case .accepted(queueDepth: _):
        break
      case .dropped:
        /// If we are in the `.active` state then we have yet to encounter an error. Therefore if
        /// the request stream source has already terminated then it must have been the result of
        /// receiving `.end`. Therefore this `.end` must have been sent by the client after it
        /// already sent `.end`, which is a protocol violation.
        self.handleError(GRPCError.ProtocolViolation("Duplicate end of stream received"))
      }
    case .completed:
      // We received end of stream but we're already done: this may happen if we terminate the RPC
      // due to a channel error, for example.
      ()
    }
  }

  // MARK: - User Function To Interceptors

  @inlinable
  internal func _interceptResponse(_ response: Response, metadata: MessageMetadata) {
    self.context.eventLoop.assertInEventLoop()
    switch self.state {
    case .idle:
      // The user handler cannot send responses before it has been invoked.
      preconditionFailure()
    case var .active(activeState):
      if !activeState.haveSentResponseHeaders {
        activeState.haveSentResponseHeaders = true
        self.state = .active(activeState)
        // Send response headers back via the interceptors.
        self.interceptors.send(.metadata(activeState.context.initialResponseMetadata), promise: nil)
      }
      // Send the response back via the interceptors.
      self.interceptors.send(.message(response, metadata), promise: nil)
    case .completed:
      /// If we are in the completed state then the async writer delegate will have been cancelled,
      /// however the cancellation is asynchronous so there's a chance that we receive this
      /// callback after that has happened. We can drop the response.
      ()
    }
  }

  @inlinable
  internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
    if self.context.eventLoop.inEventLoop {
      self._interceptResponse(response, metadata: metadata)
    } else {
      self.context.eventLoop.execute {
        self._interceptResponse(response, metadata: metadata)
      }
    }
  }

  @inlinable
  internal func userHandlerCompleted(_ result: Result<Void, Error>) {
    switch self.state {
    case .idle:
      // The user handler cannot complete before it is invoked.
      preconditionFailure()
    case .active:
      switch result {
      case .success:
        /// The user handler has completed successfully.
        /// We don't take any action here; the state transition and termination of the message
        /// stream happen when the response stream has drained, in the response stream writer
        /// delegate callback, `responseStreamDrained(_:)`.
        break
      case let .failure(error):
        self.handleError(error, thrownFromHandler: true)
      }
    case .completed:
      ()
    }
  }

  @inlinable
  internal func _responseStreamDrained(_ status: GRPCStatus) {
    self.context.eventLoop.assertInEventLoop()
    switch self.state {
    case .idle:
      preconditionFailure()
    case let .active(activeState):
      // Now that the response stream writer from the user handler has drained, we can send end.
      self.state = .completed
      self.interceptors.send(
        .end(status, activeState.context.trailingResponseMetadata),
        promise: nil
      )
    case .completed:
      ()
    }
  }

  @inlinable
  internal func responseStreamDrained(_ status: GRPCStatus) {
    if self.context.eventLoop.inEventLoop {
      self._responseStreamDrained(status)
    } else {
      self.context.eventLoop.execute {
        self._responseStreamDrained(status)
      }
    }
  }

  @inlinable
  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
    switch self.state {
    case .idle:
      assert(!isHandlerError)
      self.state = .completed
      let (status, trailers) = ServerErrorProcessor.processLibraryError(
        error,
        delegate: self.context.errorDelegate
      )
      self.interceptors.send(.end(status, trailers), promise: nil)
    case let .active(activeState):
      self.state = .completed

      // If we have an async task, then cancel it, which will terminate the request stream from
      // which it is reading and give the user handler an opportunity to clean up.
      self.userHandlerTask?.cancel()

      let status: GRPCStatus
      let trailers: HPACKHeaders

      if isHandlerError {
        (status, trailers) = ServerErrorProcessor.processObserverError(
          error,
          headers: activeState.context.requestMetadata,
          trailers: activeState.context.trailingResponseMetadata,
          delegate: self.context.errorDelegate
        )
      } else {
        (status, trailers) = ServerErrorProcessor.processLibraryError(
          error,
          delegate: self.context.errorDelegate
        )
      }

      // TODO: This doesn't go via the user handler task.
      self.interceptors.send(.end(status, trailers), promise: nil)
    case .completed:
      ()
    }
  }

  @inlinable
  internal func sendInterceptedPart(
    _ part: GRPCServerResponsePart<Response>,
    promise: EventLoopPromise<Void>?
  ) {
    switch part {
    case let .metadata(headers):
      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

    case let .message(message, metadata):
      do {
        let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
      } catch {
        // Serialization failed: fail the promise and send end.
        promise?.fail(error)
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          error,
          delegate: self.context.errorDelegate
        )
        // Loop back via the interceptors.
        self.interceptors.send(.end(status, trailers), promise: nil)
      }

    case let .end(status, trailers):
      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
    }
  }
}

#endif