GRPCAsyncServerHandler.swift 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.5)
  17. import _NIOConcurrency
  18. import NIOCore
  19. import NIOHPACK
  20. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  21. public struct GRPCAsyncServerHandler<
  22. Serializer: MessageSerializer,
  23. Deserializer: MessageDeserializer
  24. >: GRPCServerHandlerProtocol {
  25. @usableFromInline
  26. internal let _handler: AsyncServerHandler<Serializer, Deserializer>
  27. public func receiveMetadata(_ metadata: HPACKHeaders) {
  28. self._handler.receiveMetadata(metadata)
  29. }
  30. public func receiveMessage(_ bytes: ByteBuffer) {
  31. self._handler.receiveMessage(bytes)
  32. }
  33. public func receiveEnd() {
  34. self._handler.receiveEnd()
  35. }
  36. public func receiveError(_ error: Error) {
  37. self._handler.receiveError(error)
  38. }
  39. public func finish() {
  40. self._handler.finish()
  41. }
  42. }
  43. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  44. extension GRPCAsyncServerHandler {
  45. public typealias Request = Deserializer.Output
  46. public typealias Response = Serializer.Input
  47. @inlinable
  48. public init(
  49. context: CallHandlerContext,
  50. requestDeserializer: Deserializer,
  51. responseSerializer: Serializer,
  52. interceptors: [ServerInterceptor<Request, Response>],
  53. wrapping unary: @escaping @Sendable(Request, GRPCAsyncServerCallContext) async throws
  54. -> Response
  55. ) {
  56. self._handler = .init(
  57. context: context,
  58. requestDeserializer: requestDeserializer,
  59. responseSerializer: responseSerializer,
  60. interceptors: interceptors,
  61. userHandler: { requestStream, responseStreamWriter, context in
  62. var iterator = requestStream.makeAsyncIterator()
  63. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  64. throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
  65. }
  66. let response = try await unary(request, context)
  67. try await responseStreamWriter.send(response)
  68. }
  69. )
  70. }
  71. @inlinable
  72. public init(
  73. context: CallHandlerContext,
  74. requestDeserializer: Deserializer,
  75. responseSerializer: Serializer,
  76. interceptors: [ServerInterceptor<Request, Response>],
  77. wrapping clientStreaming: @escaping @Sendable(
  78. GRPCAsyncRequestStream<Request>,
  79. GRPCAsyncServerCallContext
  80. ) async throws -> Response
  81. ) {
  82. self._handler = .init(
  83. context: context,
  84. requestDeserializer: requestDeserializer,
  85. responseSerializer: responseSerializer,
  86. interceptors: interceptors,
  87. userHandler: { requestStream, responseStreamWriter, context in
  88. let response = try await clientStreaming(requestStream, context)
  89. try await responseStreamWriter.send(response)
  90. }
  91. )
  92. }
  93. @inlinable
  94. public init(
  95. context: CallHandlerContext,
  96. requestDeserializer: Deserializer,
  97. responseSerializer: Serializer,
  98. interceptors: [ServerInterceptor<Request, Response>],
  99. wrapping serverStreaming: @escaping @Sendable(
  100. Request,
  101. GRPCAsyncResponseStreamWriter<Response>,
  102. GRPCAsyncServerCallContext
  103. ) async throws -> Void
  104. ) {
  105. self._handler = .init(
  106. context: context,
  107. requestDeserializer: requestDeserializer,
  108. responseSerializer: responseSerializer,
  109. interceptors: interceptors,
  110. userHandler: { requestStream, responseStreamWriter, context in
  111. var iterator = requestStream.makeAsyncIterator()
  112. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  113. throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
  114. }
  115. try await serverStreaming(request, responseStreamWriter, context)
  116. }
  117. )
  118. }
  119. @inlinable
  120. public init(
  121. context: CallHandlerContext,
  122. requestDeserializer: Deserializer,
  123. responseSerializer: Serializer,
  124. interceptors: [ServerInterceptor<Request, Response>],
  125. wrapping bidirectional: @escaping @Sendable(
  126. GRPCAsyncRequestStream<Request>,
  127. GRPCAsyncResponseStreamWriter<Response>,
  128. GRPCAsyncServerCallContext
  129. ) async throws -> Void
  130. ) {
  131. self._handler = .init(
  132. context: context,
  133. requestDeserializer: requestDeserializer,
  134. responseSerializer: responseSerializer,
  135. interceptors: interceptors,
  136. userHandler: bidirectional
  137. )
  138. }
  139. }
  140. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  141. @usableFromInline
  142. internal final class AsyncServerHandler<
  143. Serializer: MessageSerializer,
  144. Deserializer: MessageDeserializer
  145. >: GRPCServerHandlerProtocol {
  146. @usableFromInline
  147. internal typealias Request = Deserializer.Output
  148. @usableFromInline
  149. internal typealias Response = Serializer.Input
  150. /// A response serializer.
  151. @usableFromInline
  152. internal let serializer: Serializer
  153. /// A request deserializer.
  154. @usableFromInline
  155. internal let deserializer: Deserializer
  156. /// A pipeline of user provided interceptors.
  157. @usableFromInline
  158. internal var interceptors: ServerInterceptorPipeline<Request, Response>!
  159. /// The context required in order create the function.
  160. @usableFromInline
  161. internal let context: CallHandlerContext
  162. /// A reference to a `UserInfo`.
  163. @usableFromInline
  164. internal let userInfoRef: Ref<UserInfo>
  165. /// The user provided function to execute.
  166. @usableFromInline
  167. internal let userHandler: (
  168. GRPCAsyncRequestStream<Request>,
  169. GRPCAsyncResponseStreamWriter<Response>,
  170. GRPCAsyncServerCallContext
  171. ) async throws -> Void
  172. /// The state of the handler.
  173. @usableFromInline
  174. internal var state: State = .idle
  175. /// The task used to run the async user handler.
  176. ///
  177. /// - TODO: I'd like it if this was part of the assoc data for the .active state but doing so may introduce a race condition.
  178. @usableFromInline
  179. internal var userHandlerTask: Task<Void, Never>? = nil
  180. @usableFromInline
  181. internal enum State {
  182. /// No headers have been received.
  183. case idle
  184. /// Headers have been received, and an async `Task` has been created to execute the user
  185. /// handler.
  186. ///
  187. /// The inputs to the user handler are held in the associated data of this enum value:
  188. ///
  189. /// - The `PassthroughMessageSource` is the source backing the request stream that is being
  190. /// consumed by the user handler.
  191. ///
  192. /// - The `GRPCAsyncServerContext` is a reference to the context that was passed to the user
  193. /// handler.
  194. ///
  195. /// - The `GRPCAsyncResponseStreamWriter` is the response stream writer that is being written to
  196. /// by the user handler. Because this is pausable, it may contain responses after the user
  197. /// handler has completed that have yet to be written. However we will remain in the `.active`
  198. /// state until the response stream writer has completed.
  199. ///
  200. /// - The `EventLoopPromise` bridges the NIO and async-await worlds. It is the mechanism that we
  201. /// use to run a callback when the user handler has completed. The promise is not passed to the
  202. /// user handler directly. Instead it is fulfilled with the result of the async `Task` executing
  203. /// the user handler using `completeWithTask(_:)`.
  204. ///
  205. /// - TODO: It shouldn't really be necessary to stash the `GRPCAsyncResponseStreamWriter` or the
  206. /// `EventLoopPromise` in this enum value. Specifically they are never used anywhere when this
  207. /// enum value is accessed. However, if we do not store them here then the tests periodically
  208. /// segfault. This appears to be an bug in Swift and/or NIO since these should both have been
  209. /// captured by `completeWithTask(_:)`.
  210. case active(
  211. PassthroughMessageSource<Request, Error>,
  212. GRPCAsyncServerCallContext,
  213. GRPCAsyncResponseStreamWriter<Response>,
  214. EventLoopPromise<Void>
  215. )
  216. /// The handler has completed.
  217. case completed
  218. }
  219. @inlinable
  220. public init(
  221. context: CallHandlerContext,
  222. requestDeserializer: Deserializer,
  223. responseSerializer: Serializer,
  224. interceptors: [ServerInterceptor<Request, Response>],
  225. userHandler: @escaping @Sendable(
  226. GRPCAsyncRequestStream<Request>,
  227. GRPCAsyncResponseStreamWriter<Response>,
  228. GRPCAsyncServerCallContext
  229. ) async throws -> Void
  230. ) {
  231. self.serializer = responseSerializer
  232. self.deserializer = requestDeserializer
  233. self.context = context
  234. self.userHandler = userHandler
  235. let userInfoRef = Ref(UserInfo())
  236. self.userInfoRef = userInfoRef
  237. self.interceptors = ServerInterceptorPipeline(
  238. logger: context.logger,
  239. eventLoop: context.eventLoop,
  240. path: context.path,
  241. callType: .bidirectionalStreaming,
  242. remoteAddress: context.remoteAddress,
  243. userInfoRef: userInfoRef,
  244. interceptors: interceptors,
  245. onRequestPart: self.receiveInterceptedPart(_:),
  246. onResponsePart: self.sendInterceptedPart(_:promise:)
  247. )
  248. }
  249. // MARK: - GRPCServerHandlerProtocol conformance
  250. @inlinable
  251. internal func receiveMetadata(_ headers: HPACKHeaders) {
  252. self.interceptors.receive(.metadata(headers))
  253. }
  254. @inlinable
  255. internal func receiveMessage(_ bytes: ByteBuffer) {
  256. do {
  257. let message = try self.deserializer.deserialize(byteBuffer: bytes)
  258. self.interceptors.receive(.message(message))
  259. } catch {
  260. self.handleError(error)
  261. }
  262. }
  263. @inlinable
  264. internal func receiveEnd() {
  265. self.interceptors.receive(.end)
  266. }
  267. @inlinable
  268. internal func receiveError(_ error: Error) {
  269. self.handleError(error)
  270. self.finish()
  271. }
  272. @inlinable
  273. internal func finish() {
  274. switch self.state {
  275. case .idle:
  276. self.interceptors = nil
  277. self.state = .completed
  278. case .active:
  279. self.userHandlerTask?.cancel()
  280. case .completed:
  281. self.interceptors = nil
  282. }
  283. }
  284. // MARK: - Interceptors to User Function
  285. @inlinable
  286. internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
  287. switch part {
  288. case let .metadata(headers):
  289. self.receiveInterceptedMetadata(headers)
  290. case let .message(message):
  291. self.receiveInterceptedMessage(message)
  292. case .end:
  293. self.receiveInterceptedEnd()
  294. }
  295. }
  296. @inlinable
  297. internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
  298. switch self.state {
  299. case .idle:
  300. // Make a context to invoke the user handler with.
  301. let context = GRPCAsyncServerCallContext(
  302. headers: headers,
  303. logger: self.context.logger,
  304. userInfoRef: self.userInfoRef
  305. )
  306. // Create a source for our request stream.
  307. let requestStreamSource = PassthroughMessageSource<Request, Error>()
  308. // Create a promise to hang a callback off when the user handler completes.
  309. let userHandlerPromise: EventLoopPromise<Void> = self.context.eventLoop.makePromise()
  310. // Create a request stream from our stream source to pass to the user handler.
  311. let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
  312. // TODO: In future use `AsyncWriter.init(maxPendingElements:maxWritesBeforeYield:delegate:)`?
  313. let responseStreamWriter =
  314. GRPCAsyncResponseStreamWriter(
  315. wrapping: AsyncWriter(delegate: AsyncResponseStreamWriterDelegate(
  316. context: context,
  317. compressionIsEnabled: self.context.encoding.isEnabled,
  318. send: self.interceptResponse(_:metadata:),
  319. finish: self.responseStreamDrained(_:)
  320. ))
  321. )
  322. // Set the state to active and bundle in all the associated data.
  323. self.state = .active(requestStreamSource, context, responseStreamWriter, userHandlerPromise)
  324. // Register callback for the completion of the user handler.
  325. userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:))
  326. // Send response headers back via the interceptors.
  327. // TODO: In future we may want to defer this until the first response is available from the user handler which will allow the user to set the response headers via the context.
  328. self.interceptors.send(.metadata([:]), promise: nil)
  329. // Spin up a task to call the async user handler.
  330. self.userHandlerTask = userHandlerPromise.completeWithTask {
  331. return try await withTaskCancellationHandler {
  332. do {
  333. // When the user handler completes we invalidate the request stream source.
  334. defer { requestStreamSource.finish() }
  335. // Call the user handler.
  336. try await self.userHandler(requestStream, responseStreamWriter, context)
  337. } catch let status as GRPCStatus where status.isOk {
  338. // The user handler throwing `GRPCStatus.ok` is considered to be invalid.
  339. await responseStreamWriter.asyncWriter.cancel()
  340. throw GRPCStatus(
  341. code: .unknown,
  342. message: "Handler threw GRPCStatus error with code .ok"
  343. )
  344. } catch {
  345. await responseStreamWriter.asyncWriter.cancel()
  346. throw error
  347. }
  348. // Wait for the response stream writer to finish writing its responses.
  349. try await responseStreamWriter.asyncWriter.finish(.ok)
  350. } onCancel: {
  351. /// The task being cancelled from outside is the signal to this task that an error has
  352. /// occured and we should abort the user handler.
  353. ///
  354. /// Adopters are encouraged to cooperatively check for cancellation in their handlers but
  355. /// we cannot rely on this.
  356. ///
  357. /// We additionally signal the handler that an error has occured by terminating the source
  358. /// backing the request stream that the user handler is consuming.
  359. ///
  360. /// - NOTE: This handler has different semantics from the extant non-async-await handlers
  361. /// where the `statusPromise` was explicitly failed with `GRPCStatus.unavailable` from
  362. /// _outside_ the user handler. Here we terminate the request stream with a
  363. /// `CancellationError` which manifests _inside_ the user handler when it tries to access
  364. /// the next request in the stream. We have no control over the implementation of the user
  365. /// handler. It may choose to handle this error or not. In the event that the handler
  366. /// either rethrows or does not handle the error, this will be converted to a
  367. /// `GRPCStatus.unknown` by `handleError(_:)`. Yielding a `CancellationError` _inside_
  368. /// the user handler feels like the clearest semantics of what we want--"the RPC has an
  369. /// error, cancel whatever you're doing." If we want to preserve the API of the
  370. /// non-async-await handlers in this error flow we could add conformance to
  371. /// `GRPCStatusTransformable` to `CancellationError`, but we still cannot control _how_
  372. /// the user handler will handle the `CancellationError` which could even be swallowed.
  373. ///
  374. /// - NOTE: Currently we _have_ added `GRPCStatusTransformable` conformance to
  375. /// `CancellationError` to convert it into `GRPCStatus.unavailable` and expect to
  376. /// document that user handlers should always rethrow `CacellationError` if handled, after
  377. /// optional cleanup.
  378. requestStreamSource.finish(throwing: CancellationError())
  379. /// Cancel the writer here to drop any pending responses.
  380. responseStreamWriter.asyncWriter.cancelAsynchronously()
  381. }
  382. }
  383. case .active:
  384. self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
  385. case .completed:
  386. // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
  387. // to an error or otherwise) and an interceptor doing some async work later emitting headers.
  388. // Dropping them is fine.
  389. ()
  390. }
  391. }
  392. @inlinable
  393. internal func receiveInterceptedMessage(_ request: Request) {
  394. switch self.state {
  395. case .idle:
  396. self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
  397. case let .active(requestStreamSource, _, _, _):
  398. switch requestStreamSource.yield(request) {
  399. case .accepted(queueDepth: _):
  400. // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`.
  401. break
  402. case .dropped:
  403. /// If we are in the `.active` state then we have yet to encounter an error. Therefore
  404. /// if the request stream source has already terminated then it must have been the result of
  405. /// receiving `.end`. Therefore this `.message` must have been sent by the client after it
  406. /// sent `.end`, which is a protocol violation.
  407. self.handleError(GRPCError.ProtocolViolation("Message received after end of stream"))
  408. }
  409. case .completed:
  410. // We received a message but we're already done: this may happen if we terminate the RPC
  411. // due to a channel error, for example.
  412. ()
  413. }
  414. }
  415. @inlinable
  416. internal func receiveInterceptedEnd() {
  417. switch self.state {
  418. case .idle:
  419. self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
  420. case let .active(requestStreamSource, _, _, _):
  421. switch requestStreamSource.finish() {
  422. case .accepted(queueDepth: _):
  423. break
  424. case .dropped:
  425. /// If we are in the `.active` state then we have yet to encounter an error. Therefore
  426. /// if the request stream source has already terminated then it must have been the result of
  427. /// receiving `.end`. Therefore this `.end` must have been sent by the client after it
  428. /// already sent `.end`, which is a protocol violation.
  429. self.handleError(GRPCError.ProtocolViolation("Message duplicate end of stream"))
  430. }
  431. case .completed:
  432. // We received a message but we're already done: this may happen if we terminate the RPC
  433. // due to a channel error, for example.
  434. ()
  435. }
  436. }
  437. // MARK: - User Function To Interceptors
  438. @inlinable
  439. internal func _interceptResponse(_ response: Response, metadata: MessageMetadata) {
  440. self.context.eventLoop.assertInEventLoop()
  441. switch self.state {
  442. case .idle:
  443. // The user handler cannot send responses before it has been invoked.
  444. preconditionFailure()
  445. case .active:
  446. self.interceptors.send(.message(response, metadata), promise: nil)
  447. case .completed:
  448. /// If we are in the completed state then the async writer delegate must have terminated.
  449. preconditionFailure()
  450. }
  451. }
  452. @inlinable
  453. internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
  454. if self.context.eventLoop.inEventLoop {
  455. self._interceptResponse(response, metadata: metadata)
  456. } else {
  457. self.context.eventLoop.execute {
  458. self._interceptResponse(response, metadata: metadata)
  459. }
  460. }
  461. }
  462. @inlinable
  463. internal func userHandlerCompleted(_ result: Result<Void, Error>) {
  464. switch self.state {
  465. case .idle:
  466. // The user handler cannot complete before it is invoked.
  467. preconditionFailure()
  468. case .active:
  469. switch result {
  470. case .success:
  471. /// The user handler has completed successfully.
  472. /// We don't take any action here; the state transition and termination of the message
  473. /// stream happen when the response stream has drained, in the response stream writer
  474. /// delegate callback, `responseStreamDrained(_:)`.
  475. break
  476. case let .failure(error):
  477. self.handleError(error, thrownFromHandler: true)
  478. }
  479. case .completed:
  480. ()
  481. }
  482. }
  483. @inlinable
  484. internal func _responseStreamDrained(_ status: GRPCStatus) {
  485. self.context.eventLoop.assertInEventLoop()
  486. switch self.state {
  487. case .idle:
  488. preconditionFailure()
  489. case let .active(_, context, _, _):
  490. // Now we have drained the response stream writer from the user handler we can send end.
  491. self.state = .completed
  492. self.interceptors.send(.end(status, context.trailers), promise: nil)
  493. case .completed:
  494. ()
  495. }
  496. }
  497. @inlinable
  498. internal func responseStreamDrained(_ status: GRPCStatus) {
  499. if self.context.eventLoop.inEventLoop {
  500. self._responseStreamDrained(status)
  501. } else {
  502. self.context.eventLoop.execute {
  503. self._responseStreamDrained(status)
  504. }
  505. }
  506. }
  507. @inlinable
  508. internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
  509. switch self.state {
  510. case .idle:
  511. assert(!isHandlerError)
  512. self.state = .completed
  513. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  514. error,
  515. delegate: self.context.errorDelegate
  516. )
  517. self.interceptors.send(.end(status, trailers), promise: nil)
  518. case let .active(_, context, _, _):
  519. self.state = .completed
  520. // If we have an async task, then cancel it, which will terminate the request stream from
  521. // which it is reading and give the user handler an opportunity to cleanup.
  522. self.userHandlerTask?.cancel()
  523. let status: GRPCStatus
  524. let trailers: HPACKHeaders
  525. if isHandlerError {
  526. (status, trailers) = ServerErrorProcessor.processObserverError(
  527. error,
  528. headers: context.headers,
  529. trailers: context.trailers,
  530. delegate: self.context.errorDelegate
  531. )
  532. } else {
  533. (status, trailers) = ServerErrorProcessor.processLibraryError(
  534. error,
  535. delegate: self.context.errorDelegate
  536. )
  537. }
  538. // TODO: This doesn't go via the user handler task.
  539. self.interceptors.send(.end(status, trailers), promise: nil)
  540. case .completed:
  541. ()
  542. }
  543. }
  544. @inlinable
  545. internal func sendInterceptedPart(
  546. _ part: GRPCServerResponsePart<Response>,
  547. promise: EventLoopPromise<Void>?
  548. ) {
  549. switch part {
  550. case let .metadata(headers):
  551. self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
  552. case let .message(message, metadata):
  553. do {
  554. let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
  555. self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
  556. } catch {
  557. // Serialization failed: fail the promise and send end.
  558. promise?.fail(error)
  559. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  560. error,
  561. delegate: self.context.errorDelegate
  562. )
  563. // Loop back via the interceptors.
  564. self.interceptors.send(.end(status, trailers), promise: nil)
  565. }
  566. case let .end(status, trailers):
  567. self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
  568. }
  569. }
  570. }
  571. #endif