GRPCAsyncServerHandler.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.5)
  17. import _NIOConcurrency
  18. import NIOCore
  19. import NIOHPACK
  /// The public handler type for RPCs implemented with async/await.
  ///
  /// This struct holds no logic of its own: each `GRPCServerHandlerProtocol` requirement is
  /// forwarded, unchanged, to the wrapped `AsyncServerHandler`.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  public struct GRPCAsyncServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer
  >: GRPCServerHandlerProtocol {
    /// The handler to which every event is delegated.
    @usableFromInline
    internal let _handler: AsyncServerHandler<Serializer, Deserializer>

    /// Forwards the request headers to the wrapped handler.
    public func receiveMetadata(_ metadata: HPACKHeaders) {
      _handler.receiveMetadata(metadata)
    }

    /// Forwards the bytes of a request message to the wrapped handler.
    public func receiveMessage(_ bytes: ByteBuffer) {
      _handler.receiveMessage(bytes)
    }

    /// Forwards the end of the request stream to the wrapped handler.
    public func receiveEnd() {
      _handler.receiveEnd()
    }

    /// Forwards an error to the wrapped handler.
    public func receiveError(_ error: Error) {
      _handler.receiveError(error)
    }

    /// Forwards the request to finish to the wrapped handler.
    public func finish() {
      _handler.finish()
    }
  }
  // MARK: - Initializers for each RPC shape

  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  extension GRPCAsyncServerHandler {
    /// The deserialized request message type.
    public typealias Request = Deserializer.Output
    /// The response message type prior to serialization.
    public typealias Response = Serializer.Input

    /// Creates a handler for a unary RPC.
    ///
    /// The wrapped function is adapted to the stream-based handler: exactly one request is read
    /// from the request stream, `unary` is called with it, and the returned response is sent on
    /// the response stream writer. A `GRPCError.ProtocolViolation` is thrown if the request stream
    /// does not yield exactly one request.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializes request messages.
    ///   - responseSerializer: Serializes response messages.
    ///   - interceptors: The interceptors to run for this RPC.
    ///   - unary: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping unary: @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
        -> Response
    ) {
      self._handler = AsyncServerHandler<Serializer, Deserializer>(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          var requestIterator = requests.makeAsyncIterator()
          // There must be a first request...
          guard let onlyRequest = try await requestIterator.next() else {
            throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
          }
          // ...and the stream must end immediately after it.
          guard try await requestIterator.next() == nil else {
            throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
          }
          let reply = try await unary(onlyRequest, callContext)
          try await writer.send(reply)
        }
      )
    }

    /// Creates a handler for a client-streaming RPC.
    ///
    /// `clientStreaming` is given the request stream and the call context; the single response it
    /// returns is sent on the response stream writer.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializes request messages.
    ///   - responseSerializer: Serializes response messages.
    ///   - interceptors: The interceptors to run for this RPC.
    ///   - clientStreaming: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping clientStreaming: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncServerCallContext
      ) async throws -> Response
    ) {
      self._handler = AsyncServerHandler<Serializer, Deserializer>(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          let reply = try await clientStreaming(requests, callContext)
          try await writer.send(reply)
        }
      )
    }

    /// Creates a handler for a server-streaming RPC.
    ///
    /// Exactly one request is read from the request stream and passed to `serverStreaming` along
    /// with the response stream writer and the call context. A `GRPCError.ProtocolViolation` is
    /// thrown if the request stream does not yield exactly one request.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializes request messages.
    ///   - responseSerializer: Serializes response messages.
    ///   - interceptors: The interceptors to run for this RPC.
    ///   - serverStreaming: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping serverStreaming: @escaping @Sendable (
        Request,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self._handler = AsyncServerHandler<Serializer, Deserializer>(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          var requestIterator = requests.makeAsyncIterator()
          // There must be a first request...
          guard let onlyRequest = try await requestIterator.next() else {
            throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
          }
          // ...and the stream must end immediately after it.
          guard try await requestIterator.next() == nil else {
            throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
          }
          try await serverStreaming(onlyRequest, writer, callContext)
        }
      )
    }

    /// Creates a handler for a bidirectional-streaming RPC.
    ///
    /// `bidirectional` already has the shape of the underlying handler's user function, so it is
    /// passed through without any adaptation.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializes request messages.
    ///   - responseSerializer: Serializes response messages.
    ///   - interceptors: The interceptors to run for this RPC.
    ///   - bidirectional: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping bidirectional: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self._handler = AsyncServerHandler<Serializer, Deserializer>(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        interceptors: interceptors,
        userHandler: bidirectional
      )
    }
  }
  /// The implementation behind `GRPCAsyncServerHandler`.
  ///
  /// The handler is a small state machine (`.idle` -> `.active` -> `.completed`). Request parts
  /// arrive via the `GRPCServerHandlerProtocol` methods, pass through the interceptor pipeline and
  /// are then fed to the user handler, which runs in an async `Task`. Responses written by the user
  /// handler travel back through the interceptor pipeline to the response writer of the call
  /// context.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  @usableFromInline
  internal final class AsyncServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer
  >: GRPCServerHandlerProtocol {
    @usableFromInline
    internal typealias Request = Deserializer.Output

    @usableFromInline
    internal typealias Response = Serializer.Input

    /// A response serializer.
    @usableFromInline
    internal let serializer: Serializer

    /// A request deserializer.
    @usableFromInline
    internal let deserializer: Deserializer

    /// A pipeline of user provided interceptors.
    ///
    /// This is set in `init` and set to `nil` by `finish()`.
    @usableFromInline
    internal var interceptors: ServerInterceptorPipeline<Request, Response>!

    /// The context required in order to create the function.
    @usableFromInline
    internal let context: CallHandlerContext

    /// A reference to a `UserInfo`.
    @usableFromInline
    internal let userInfoRef: Ref<UserInfo>

    /// The user provided function to execute.
    @usableFromInline
    internal let userHandler: (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void

    /// The state of the handler.
    @usableFromInline
    internal var state: State = .idle

    /// The task used to run the async user handler.
    ///
    /// - TODO: I'd like it if this was part of the assoc data for the .active state but doing so may introduce a race condition.
    @usableFromInline
    internal var userHandlerTask: Task<Void, Never>? = nil

    @usableFromInline
    internal enum State {
      /// No headers have been received.
      case idle

      @usableFromInline
      internal struct ActiveState {
        /// The source backing the request stream that is being consumed by the user handler.
        @usableFromInline
        let requestStreamSource: PassthroughMessageSource<Request, Error>

        /// The call context that was passed to the user handler.
        @usableFromInline
        let context: GRPCAsyncServerCallContext

        /// The response stream writer that is being used by the user handler.
        ///
        /// Because this is pausable, it may contain responses after the user handler has completed
        /// that have yet to be written. However we will remain in the `.active` state until the
        /// response stream writer has completed.
        @usableFromInline
        let responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>

        /// The response headers have been sent back to the client via the interceptors.
        @usableFromInline
        var haveSentResponseHeaders: Bool = false

        /// The promise we are using to bridge the NIO and async-await worlds.
        ///
        /// It is the mechanism that we use to run a callback when the user handler has completed.
        /// The promise is not passed to the user handler directly. Instead it is fulfilled with the
        /// result of the async `Task` executing the user handler using `completeWithTask(_:)`.
        ///
        /// - TODO: It shouldn't really be necessary to stash this promise here. Specifically it is
        /// never used anywhere when the `.active` enum value is accessed. However, if we do not store
        /// it here then the tests periodically segfault. This appears to be a reference counting bug
        /// in Swift and/or NIO since it should have been captured by `completeWithTask(_:)`.
        let _userHandlerPromise: EventLoopPromise<Void>

        @usableFromInline
        internal init(
          requestStreamSource: PassthroughMessageSource<Request, Error>,
          context: GRPCAsyncServerCallContext,
          responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>,
          userHandlerPromise: EventLoopPromise<Void>
        ) {
          self.requestStreamSource = requestStreamSource
          self.context = context
          self.responseStreamWriter = responseStreamWriter
          self._userHandlerPromise = userHandlerPromise
        }
      }

      /// Headers have been received and an async `Task` has been created to execute the user handler.
      case active(ActiveState)

      /// The handler has completed.
      case completed
    }

    /// Creates a handler in the `.idle` state.
    ///
    /// The interceptor pipeline is built here with callbacks that are methods bound to `self`, so
    /// the pipeline and the handler reference each other until `interceptors` is set to `nil`
    /// (see `finish()`).
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializes request messages.
    ///   - responseSerializer: Serializes response messages.
    ///   - interceptors: The interceptors to run for this RPC.
    ///   - userHandler: The user function to execute once request headers have been received.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      userHandler: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self.serializer = responseSerializer
      self.deserializer = requestDeserializer
      self.context = context
      self.userHandler = userHandler

      let userInfoRef = Ref(UserInfo())
      self.userInfoRef = userInfoRef

      // Every stored property has a value by this point, so methods bound to `self` may be used
      // as the pipeline's callbacks.
      self.interceptors = ServerInterceptorPipeline(
        logger: context.logger,
        eventLoop: context.eventLoop,
        path: context.path,
        callType: .bidirectionalStreaming,
        remoteAddress: context.remoteAddress,
        userInfoRef: userInfoRef,
        interceptors: interceptors,
        onRequestPart: self.receiveInterceptedPart(_:),
        onResponsePart: self.sendInterceptedPart(_:promise:)
      )
    }

    // MARK: - GRPCServerHandlerProtocol conformance

    /// Passes the request headers into the interceptor pipeline.
    @inlinable
    internal func receiveMetadata(_ headers: HPACKHeaders) {
      self.interceptors.receive(.metadata(headers))
    }

    /// Deserializes a request message and passes it into the interceptor pipeline.
    ///
    /// A deserialization failure is routed to `handleError(_:thrownFromHandler:)`.
    @inlinable
    internal func receiveMessage(_ bytes: ByteBuffer) {
      do {
        let message = try self.deserializer.deserialize(byteBuffer: bytes)
        self.interceptors.receive(.message(message))
      } catch {
        self.handleError(error)
      }
    }

    /// Passes the end of the request stream into the interceptor pipeline.
    @inlinable
    internal func receiveEnd() {
      self.interceptors.receive(.end)
    }

    /// Handles an error and then finishes the handler.
    @inlinable
    internal func receiveError(_ error: Error) {
      self.handleError(error)
      self.finish()
    }

    /// Finishes the handler.
    ///
    /// - In `.idle` the handler moves straight to `.completed` and releases the interceptors.
    /// - In `.active` the user handler task is cancelled; the state is left unchanged here.
    /// - In `.completed` the interceptors are released.
    @inlinable
    internal func finish() {
      switch self.state {
      case .idle:
        self.interceptors = nil
        self.state = .completed

      case .active:
        // NOTE(review): `interceptors` is not released on this path; it is only released if
        // `finish()` is called again once the handler has reached `.completed`. Confirm that this
        // is intended.
        self.userHandlerTask?.cancel()

      case .completed:
        self.interceptors = nil
      }
    }

    // MARK: - Interceptors to User Function

    /// Dispatches a request part emitted by the interceptor pipeline.
    @inlinable
    internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case let .metadata(headers):
        self.receiveInterceptedMetadata(headers)
      case let .message(message):
        self.receiveInterceptedMessage(message)
      case .end:
        self.receiveInterceptedEnd()
      }
    }

    /// Handles request headers emitted by the interceptor pipeline.
    ///
    /// In `.idle` this creates the call context, request stream and response stream writer, moves
    /// to `.active` and starts the task which runs the user handler. A second set of headers while
    /// `.active` is a protocol violation; headers received once `.completed` are dropped.
    @inlinable
    internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
      switch self.state {
      case .idle:
        // Make a context to invoke the user handler with.
        let context = GRPCAsyncServerCallContext(
          headers: headers,
          logger: self.context.logger,
          userInfoRef: self.userInfoRef
        )

        // Create a source for our request stream.
        let requestStreamSource = PassthroughMessageSource<Request, Error>()

        // Create a promise to hang a callback off when the user handler completes.
        let userHandlerPromise: EventLoopPromise<Void> = self.context.eventLoop.makePromise()

        // Create a request stream from our stream source to pass to the user handler.
        let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))

        // TODO: In future use `AsyncWriter.init(maxPendingElements:maxWritesBeforeYield:delegate:)`?
        let responseStreamWriter =
          GRPCAsyncResponseStreamWriter(
            wrapping: AsyncWriter(delegate: AsyncResponseStreamWriterDelegate(
              context: context,
              compressionIsEnabled: self.context.encoding.isEnabled,
              send: self.interceptResponse(_:metadata:),
              finish: self.responseStreamDrained(_:)
            ))
          )

        // Set the state to active and bundle in all the associated data.
        self.state = .active(.init(
          requestStreamSource: requestStreamSource,
          context: context,
          responseStreamWriter: responseStreamWriter,
          userHandlerPromise: userHandlerPromise
        ))

        // Register callback for the completion of the user handler.
        userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:))

        // Spin up a task to call the async user handler.
        self.userHandlerTask = userHandlerPromise.completeWithTask {
          return try await withTaskCancellationHandler {
            do {
              // When the user handler completes we invalidate the request stream source.
              defer { requestStreamSource.finish() }
              // Call the user handler.
              try await self.userHandler(requestStream, responseStreamWriter, context)
            } catch let status as GRPCStatus where status.isOk {
              // The user handler throwing `GRPCStatus.ok` is considered to be invalid.
              await responseStreamWriter.asyncWriter.cancel()
              throw GRPCStatus(
                code: .unknown,
                message: "Handler threw GRPCStatus error with code .ok"
              )
            } catch {
              await responseStreamWriter.asyncWriter.cancel()
              throw error
            }
            // Wait for the response stream writer to finish writing its responses.
            try await responseStreamWriter.asyncWriter.finish(.ok)
          } onCancel: {
            // The task being cancelled from outside is the signal to this task that an error has
            // occurred and we should abort the user handler.
            //
            // Adopters are encouraged to cooperatively check for cancellation in their handlers but
            // we cannot rely on this.
            //
            // We additionally signal the handler that an error has occurred by terminating the
            // source backing the request stream that the user handler is consuming.
            //
            // - NOTE: This handler has different semantics from the extant non-async-await handlers
            //   where the `statusPromise` was explicitly failed with `GRPCStatus.unavailable` from
            //   _outside_ the user handler. Here we terminate the request stream with a
            //   `CancellationError` which manifests _inside_ the user handler when it tries to
            //   access the next request in the stream. We have no control over the implementation
            //   of the user handler. It may choose to handle this error or not. In the event that
            //   the handler either rethrows or does not handle the error, this will be converted to
            //   a `GRPCStatus.unknown` by `handleError(_:)`. Yielding a `CancellationError`
            //   _inside_ the user handler feels like the clearest semantics of what we want--"the
            //   RPC has an error, cancel whatever you're doing." If we want to preserve the API of
            //   the non-async-await handlers in this error flow we could add conformance to
            //   `GRPCStatusTransformable` to `CancellationError`, but we still cannot control _how_
            //   the user handler will handle the `CancellationError` which could even be swallowed.
            //
            // - NOTE: Currently we _have_ added `GRPCStatusTransformable` conformance to
            //   `CancellationError` to convert it into `GRPCStatus.unavailable` and expect to
            //   document that user handlers should always rethrow `CancellationError` if handled,
            //   after optional cleanup.
            requestStreamSource.finish(throwing: CancellationError())
            // Cancel the writer here to drop any pending responses.
            responseStreamWriter.asyncWriter.cancelAsynchronously()
          }
        }

      case .active:
        self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))

      case .completed:
        // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
        // to an error or otherwise) and an interceptor doing some async work later emitting headers.
        // Dropping them is fine.
        ()
      }
    }

    /// Handles a request message emitted by the interceptor pipeline by yielding it to the request
    /// stream consumed by the user handler.
    @inlinable
    internal func receiveInterceptedMessage(_ request: Request) {
      switch self.state {
      case .idle:
        self.handleError(GRPCError.ProtocolViolation("Message received before headers"))

      case let .active(activeState):
        switch activeState.requestStreamSource.yield(request) {
        case .accepted(queueDepth: _):
          // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`.
          break
        case .dropped:
          // If we are in the `.active` state then we have yet to encounter an error. Therefore
          // if the request stream source has already terminated then it must have been the result
          // of receiving `.end`. Therefore this `.message` must have been sent by the client after
          // it sent `.end`, which is a protocol violation.
          self.handleError(GRPCError.ProtocolViolation("Message received after end of stream"))
        }

      case .completed:
        // We received a message but we're already done: this may happen if we terminate the RPC
        // due to a channel error, for example.
        ()
      }
    }

    /// Handles the end of the request stream emitted by the interceptor pipeline by finishing the
    /// request stream consumed by the user handler.
    @inlinable
    internal func receiveInterceptedEnd() {
      switch self.state {
      case .idle:
        self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))

      case let .active(activeState):
        switch activeState.requestStreamSource.finish() {
        case .accepted(queueDepth: _):
          break
        case .dropped:
          // If we are in the `.active` state then we have yet to encounter an error. Therefore
          // if the request stream source has already terminated then it must have been the result
          // of receiving `.end`. Therefore this `.end` must have been sent by the client after it
          // already sent `.end`, which is a protocol violation.
          self.handleError(GRPCError.ProtocolViolation("Message duplicate end of stream"))
        }

      case .completed:
        // We received end of stream but we're already done: this may happen if we terminate the
        // RPC due to a channel error, for example.
        ()
      }
    }

    // MARK: - User Function To Interceptors

    /// Sends a response from the user handler into the interceptor pipeline, preceded by the
    /// response headers if they have not been sent yet.
    ///
    /// Must be called on the event loop of the call context.
    @inlinable
    internal func _interceptResponse(_ response: Response, metadata: MessageMetadata) {
      self.context.eventLoop.assertInEventLoop()
      switch self.state {
      case .idle:
        // The user handler cannot send responses before it has been invoked.
        preconditionFailure("Response sent before the user handler was invoked")

      case var .active(activeState):
        if !activeState.haveSentResponseHeaders {
          activeState.haveSentResponseHeaders = true
          self.state = .active(activeState)
          // Send response headers back via the interceptors.
          self.interceptors.send(
            .metadata(activeState.context.initialResponseMetadata),
            promise: nil
          )
        }
        // Send the response back via the interceptors.
        self.interceptors.send(.message(response, metadata), promise: nil)

      case .completed:
        // The state becomes `.completed` (in `handleError(_:thrownFromHandler:)`) before the
        // response stream writer has necessarily stopped: the writer is cancelled asynchronously
        // and `interceptResponse(_:metadata:)` may already have enqueued this call on the event
        // loop. A response arriving here is therefore not a programmer error; the RPC has already
        // been ended, so the response is dropped.
        ()
      }
    }

    /// Sends a response from the user handler into the interceptor pipeline, hopping onto the
    /// event loop of the call context first if necessary.
    @inlinable
    internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
      if self.context.eventLoop.inEventLoop {
        self._interceptResponse(response, metadata: metadata)
      } else {
        self.context.eventLoop.execute {
          self._interceptResponse(response, metadata: metadata)
        }
      }
    }

    /// Called with the outcome of the task running the user handler.
    ///
    /// A failure while `.active` is routed to `handleError(_:thrownFromHandler:)` as a handler
    /// error. Nothing is done on success or once the handler is `.completed`.
    @inlinable
    internal func userHandlerCompleted(_ result: Result<Void, Error>) {
      switch self.state {
      case .idle:
        // The user handler cannot complete before it is invoked.
        preconditionFailure("User handler completed before it was invoked")

      case .active:
        switch result {
        case .success:
          // The user handler has completed successfully.
          // We don't take any action here; the state transition and termination of the message
          // stream happen when the response stream has drained, in the response stream writer
          // delegate callback, `responseStreamDrained(_:)`.
          break

        case let .failure(error):
          self.handleError(error, thrownFromHandler: true)
        }

      case .completed:
        ()
      }
    }

    /// Ends the RPC with `status` once the response stream writer has drained.
    ///
    /// Must be called on the event loop of the call context.
    @inlinable
    internal func _responseStreamDrained(_ status: GRPCStatus) {
      self.context.eventLoop.assertInEventLoop()
      switch self.state {
      case .idle:
        // The response stream writer is only created when leaving the `.idle` state.
        preconditionFailure("Response stream drained before the user handler was invoked")

      case let .active(activeState):
        // Now we have drained the response stream writer from the user handler we can send end.
        self.state = .completed
        self.interceptors.send(
          .end(status, activeState.context.trailingResponseMetadata),
          promise: nil
        )

      case .completed:
        ()
      }
    }

    /// Ends the RPC with `status` once the response stream writer has drained, hopping onto the
    /// event loop of the call context first if necessary.
    @inlinable
    internal func responseStreamDrained(_ status: GRPCStatus) {
      if self.context.eventLoop.inEventLoop {
        self._responseStreamDrained(status)
      } else {
        self.context.eventLoop.execute {
          self._responseStreamDrained(status)
        }
      }
    }

    /// Terminates the RPC because of `error`.
    ///
    /// Unless already `.completed`, the handler moves to `.completed`, the error is converted to a
    /// status and trailers by `ServerErrorProcessor`, and `.end` is sent via the interceptors. If
    /// the handler was `.active` the user handler task is cancelled as well.
    ///
    /// - Parameters:
    ///   - error: The error which caused the RPC to fail.
    ///   - isHandlerError: Whether the error was thrown by the user handler, as opposed to being
    ///     raised by the library. This selects which `ServerErrorProcessor` function is used while
    ///     `.active`, and must be `false` while `.idle`.
    @inlinable
    internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
      switch self.state {
      case .idle:
        // The user handler has not been invoked yet so it cannot be the source of the error.
        assert(!isHandlerError)
        self.state = .completed
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          error,
          delegate: self.context.errorDelegate
        )
        self.interceptors.send(.end(status, trailers), promise: nil)

      case let .active(activeState):
        self.state = .completed

        // If we have an async task, then cancel it, which will terminate the request stream from
        // which it is reading and give the user handler an opportunity to cleanup.
        self.userHandlerTask?.cancel()

        let status: GRPCStatus
        let trailers: HPACKHeaders

        if isHandlerError {
          (status, trailers) = ServerErrorProcessor.processObserverError(
            error,
            headers: activeState.context.requestMetadata,
            trailers: activeState.context.trailingResponseMetadata,
            delegate: self.context.errorDelegate
          )
        } else {
          (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
        }

        // TODO: This doesn't go via the user handler task.
        self.interceptors.send(.end(status, trailers), promise: nil)

      case .completed:
        ()
      }
    }

    /// Writes a response part emitted by the interceptor pipeline to the response writer of the
    /// call context.
    ///
    /// If a message cannot be serialized then `promise` is failed and `.end` is sent back through
    /// the interceptors with a status derived from the serialization error.
    ///
    /// - Parameters:
    ///   - part: The response part to write.
    ///   - promise: A promise handed to the response writer along with the part, or failed if the
    ///     message could not be serialized.
    @inlinable
    internal func sendInterceptedPart(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      switch part {
      case let .metadata(headers):
        self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

      case let .message(message, metadata):
        do {
          let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
          self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
        } catch {
          // Serialization failed: fail the promise and send end.
          promise?.fail(error)
          let (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
          // Loop back via the interceptors.
          self.interceptors.send(.end(status, trailers), promise: nil)
        }

      case let .end(status, trailers):
        self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
      }
    }
  }
  595. #endif