_BaseCallHandler.swift 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  21. /// Provides a means for decoding incoming gRPC messages into protobuf objects.
  22. ///
  23. /// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
  24. /// - Important: This is **NOT** part of the public API.
  25. public class _BaseCallHandler<
  26. RequestDeserializer: MessageDeserializer,
  27. ResponseSerializer: MessageSerializer
  28. >: GRPCCallHandler, ChannelInboundHandler {
  29. public typealias RequestPayload = RequestDeserializer.Output
  30. public typealias ResponsePayload = ResponseSerializer.Input
  31. public typealias InboundIn = GRPCServerRequestPart<ByteBuffer>
  32. public typealias OutboundOut = GRPCServerResponsePart<ByteBuffer>
  33. /// An interceptor pipeline.
  34. @usableFromInline
  35. internal var _pipeline: ServerInterceptorPipeline<RequestPayload, ResponsePayload>?
  36. /// Our current state.
  37. @usableFromInline
  38. internal var _state: State = .idle
  39. /// The type of this RPC, e.g. 'unary'.
  40. @usableFromInline
  41. internal let _callType: GRPCCallType
  42. /// Some context provided to us from the routing handler.
  43. @usableFromInline
  44. internal let _callHandlerContext: CallHandlerContext
  45. /// A request deserializer.
  46. @usableFromInline
  47. internal let _requestDeserializer: RequestDeserializer
  48. /// A response serializer.
  49. @usableFromInline
  50. internal let _responseSerializer: ResponseSerializer
  51. /// The `ChannelHandlerContext`.
  52. @usableFromInline
  53. internal var _context: ChannelHandlerContext?
  54. /// The event loop this call is being handled on.
  55. @usableFromInline
  56. internal var eventLoop: EventLoop {
  57. return self._callHandlerContext.eventLoop
  58. }
  59. /// An error delegate.
  60. @usableFromInline
  61. internal var errorDelegate: ServerErrorDelegate? {
  62. return self._callHandlerContext.errorDelegate
  63. }
  64. /// A logger.
  65. @usableFromInline
  66. internal var logger: Logger {
  67. return self._callHandlerContext.logger
  68. }
  69. /// A reference to `UserInfo`.
  70. @usableFromInline
  71. internal var _userInfoRef: Ref<UserInfo>
  72. @inlinable
  73. internal init(
  74. callHandlerContext: CallHandlerContext,
  75. requestDeserializer: RequestDeserializer,
  76. responseSerializer: ResponseSerializer,
  77. callType: GRPCCallType,
  78. interceptors: [ServerInterceptor<RequestPayload, ResponsePayload>]
  79. ) {
  80. let userInfoRef = Ref(UserInfo())
  81. self._requestDeserializer = requestDeserializer
  82. self._responseSerializer = responseSerializer
  83. self._callHandlerContext = callHandlerContext
  84. self._callType = callType
  85. self._userInfoRef = userInfoRef
  86. self._pipeline = ServerInterceptorPipeline(
  87. logger: callHandlerContext.logger,
  88. eventLoop: callHandlerContext.eventLoop,
  89. path: callHandlerContext.path,
  90. callType: callType,
  91. remoteAddress: callHandlerContext.remoteAddress,
  92. userInfoRef: userInfoRef,
  93. interceptors: interceptors,
  94. onRequestPart: self._receiveRequestPartFromInterceptors(_:),
  95. onResponsePart: self._sendResponsePartFromInterceptors(_:promise:)
  96. )
  97. }
  98. // MARK: - ChannelHandler
  99. public func handlerAdded(context: ChannelHandlerContext) {
  100. self._state.handlerAdded()
  101. self._context = context
  102. }
  103. public func handlerRemoved(context: ChannelHandlerContext) {
  104. self._pipeline = nil
  105. self._context = nil
  106. }
  107. public func channelInactive(context: ChannelHandlerContext) {
  108. self._pipeline = nil
  109. context.fireChannelInactive()
  110. }
  111. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  112. if self._state.errorCaught() {
  113. self.observeLibraryError(error)
  114. }
  115. }
  116. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  117. let part = self.unwrapInboundIn(data)
  118. switch part {
  119. case let .metadata(headers):
  120. if self._state.channelReadMetadata() {
  121. self._receiveRequestPartInInterceptors(.metadata(headers))
  122. }
  123. case let .message(buffer):
  124. if self._state.channelReadMessage() {
  125. do {
  126. let request = try self._requestDeserializer.deserialize(byteBuffer: buffer)
  127. self._receiveRequestPartInInterceptors(.message(request))
  128. } catch {
  129. self.errorCaught(context: context, error: error)
  130. }
  131. }
  132. case .end:
  133. if self._state.channelReadEnd() {
  134. self._receiveRequestPartInInterceptors(.end)
  135. }
  136. }
  137. // We're the last handler. We don't have anything to forward.
  138. }
  139. // MARK: - Event Observer
  140. @inlinable
  141. internal func observeHeaders(_ headers: HPACKHeaders) {
  142. fatalError("must be overridden by subclasses")
  143. }
  144. @inlinable
  145. internal func observeRequest(_ message: RequestPayload) {
  146. fatalError("must be overridden by subclasses")
  147. }
  148. @inlinable
  149. internal func observeEnd() {
  150. fatalError("must be overridden by subclasses")
  151. }
  152. @inlinable
  153. internal func observeLibraryError(_ error: Error) {
  154. fatalError("must be overridden by subclasses")
  155. }
  156. /// Send a response part to the interceptor pipeline. Called by an event observer.
  157. /// - Parameters:
  158. /// - part: The response part to send.
  159. /// - promise: A promise to complete once the response part has been written.
  160. @inlinable
  161. internal final func sendResponsePartFromObserver(
  162. _ part: GRPCServerResponsePart<ResponsePayload>,
  163. promise: EventLoopPromise<Void>?
  164. ) {
  165. let forward: Bool
  166. switch part {
  167. case .metadata:
  168. forward = self._state.sendResponsePartFromObserver(.metadata)
  169. case .message:
  170. forward = self._state.sendResponsePartFromObserver(.message)
  171. case .end:
  172. forward = self._state.sendResponsePartFromObserver(.end)
  173. }
  174. if forward {
  175. self._sendResponsePartToInterceptors(part, promise: promise)
  176. } else {
  177. promise?.fail(GRPCError.AlreadyComplete())
  178. }
  179. }
  180. /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client.
  181. /// - Parameter error: The error to process.
  182. /// - Returns: The status and trailers to send to the client.
  183. internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) {
  184. // Observe the error if we have a delegate.
  185. self.errorDelegate?.observeLibraryError(error)
  186. // What status are we terminating this RPC with?
  187. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  188. // them with any on the call context.
  189. // - If we don't have a delegate, then try to transform the error to a status.
  190. // - Fallback to a generic error.
  191. let status: GRPCStatus
  192. let trailers: HPACKHeaders
  193. if let transformed = self.errorDelegate?.transformLibraryError(error) {
  194. status = transformed.status
  195. trailers = transformed.trailers ?? [:]
  196. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  197. status = grpcStatusTransformable.makeGRPCStatus()
  198. trailers = [:]
  199. } else {
  200. // Eh... well, we don't what status to use. Use a generic one.
  201. status = .processingError
  202. trailers = [:]
  203. }
  204. return (status, trailers)
  205. }
  206. /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer.
  207. internal func processObserverError(
  208. _ error: Error,
  209. headers: HPACKHeaders,
  210. trailers: HPACKHeaders
  211. ) -> (GRPCStatus, HPACKHeaders) {
  212. // Observe the error if we have a delegate.
  213. self.errorDelegate?.observeRequestHandlerError(error, headers: headers)
  214. // What status are we terminating this RPC with?
  215. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  216. // them with any on the call context.
  217. // - If we don't have a delegate, then try to transform the error to a status.
  218. // - Fallback to a generic error.
  219. let status: GRPCStatus
  220. let mergedTrailers: HPACKHeaders
  221. if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) {
  222. status = transformed.status
  223. if var transformedTrailers = transformed.trailers {
  224. // The delegate returned trailers: merge in those from the context as well.
  225. transformedTrailers.add(contentsOf: trailers)
  226. mergedTrailers = transformedTrailers
  227. } else {
  228. mergedTrailers = trailers
  229. }
  230. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  231. status = grpcStatusTransformable.makeGRPCStatus()
  232. mergedTrailers = trailers
  233. } else {
  234. // Eh... well, we don't what status to use. Use a generic one.
  235. status = .processingError
  236. mergedTrailers = trailers
  237. }
  238. return (status, mergedTrailers)
  239. }
  240. }
  241. // MARK: - Interceptor API
  242. extension _BaseCallHandler {
  243. /// Receive a request part from the interceptors pipeline to forward to the event observer.
  244. /// - Parameter part: The request part to forward.
  245. @inlinable
  246. internal func _receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  247. let forward: Bool
  248. switch part {
  249. case .metadata:
  250. forward = self._state.receiveRequestPartFromInterceptors(.metadata)
  251. case .message:
  252. forward = self._state.receiveRequestPartFromInterceptors(.message)
  253. case .end:
  254. forward = self._state.receiveRequestPartFromInterceptors(.end)
  255. }
  256. if forward {
  257. self._receiveRequestPartInObserver(part)
  258. }
  259. }
  260. /// Send a response part via the `Channel`. Called once the response part has traversed the
  261. /// interceptor pipeline.
  262. /// - Parameters:
  263. /// - part: The response part to send.
  264. /// - promise: A promise to complete once the response part has been written.
  265. @inlinable
  266. internal func _sendResponsePartFromInterceptors(
  267. _ part: GRPCServerResponsePart<ResponsePayload>,
  268. promise: EventLoopPromise<Void>?
  269. ) {
  270. let forward: Bool
  271. switch part {
  272. case .metadata:
  273. forward = self._state.sendResponsePartFromInterceptors(.metadata)
  274. case .message:
  275. forward = self._state.sendResponsePartFromInterceptors(.message)
  276. case .end:
  277. forward = self._state.sendResponsePartFromInterceptors(.end)
  278. }
  279. if forward, let context = self._context {
  280. self._writeResponsePartToChannel(context: context, part: part, promise: promise)
  281. } else {
  282. promise?.fail(GRPCError.AlreadyComplete())
  283. }
  284. }
  285. }
  286. // MARK: - State
  287. @usableFromInline
  288. internal enum State {
  289. /// Idle. We're waiting to be added to a pipeline.
  290. case idle
  291. /// We're in a pipeline and receiving from the client.
  292. case active(ActiveState)
  293. /// We're done. This state is terminal, all actions are ignored.
  294. case closed
  295. }
  296. @usableFromInline
  297. internal enum RPCStreamPart {
  298. case metadata
  299. case message
  300. case end
  301. }
  302. extension State {
  303. /// The state of the request and response streams.
  304. ///
  305. /// We track the stream state twice: between the 'Channel' and interceptor pipeline, and between
  306. /// the interceptor pipeline and event observer.
  307. @usableFromInline
  308. enum StreamState {
  309. case requestIdleResponseIdle
  310. case requestOpenResponseIdle
  311. case requestOpenResponseOpen
  312. case requestClosedResponseIdle
  313. case requestClosedResponseOpen
  314. case requestClosedResponseClosed
  315. @inlinable
  316. mutating func receiveHeaders() -> Bool {
  317. switch self {
  318. case .requestIdleResponseIdle:
  319. self = .requestOpenResponseIdle
  320. return true
  321. case .requestOpenResponseIdle,
  322. .requestOpenResponseOpen,
  323. .requestClosedResponseIdle,
  324. .requestClosedResponseOpen,
  325. .requestClosedResponseClosed:
  326. return false
  327. }
  328. }
  329. @inlinable
  330. func receiveMessage() -> Bool {
  331. switch self {
  332. case .requestOpenResponseIdle,
  333. .requestOpenResponseOpen:
  334. return true
  335. case .requestIdleResponseIdle,
  336. .requestClosedResponseIdle,
  337. .requestClosedResponseOpen,
  338. .requestClosedResponseClosed:
  339. return false
  340. }
  341. }
  342. @inlinable
  343. mutating func receiveEnd() -> Bool {
  344. switch self {
  345. case .requestOpenResponseIdle:
  346. self = .requestClosedResponseIdle
  347. return true
  348. case .requestOpenResponseOpen:
  349. self = .requestClosedResponseOpen
  350. return true
  351. case .requestIdleResponseIdle,
  352. .requestClosedResponseIdle,
  353. .requestClosedResponseOpen,
  354. .requestClosedResponseClosed:
  355. return false
  356. }
  357. }
  358. @inlinable
  359. mutating func sendHeaders() -> Bool {
  360. switch self {
  361. case .requestOpenResponseIdle:
  362. self = .requestOpenResponseOpen
  363. return true
  364. case .requestClosedResponseIdle:
  365. self = .requestClosedResponseOpen
  366. return true
  367. case .requestIdleResponseIdle,
  368. .requestOpenResponseOpen,
  369. .requestClosedResponseOpen,
  370. .requestClosedResponseClosed:
  371. return false
  372. }
  373. }
  374. @inlinable
  375. func sendMessage() -> Bool {
  376. switch self {
  377. case .requestOpenResponseOpen,
  378. .requestClosedResponseOpen:
  379. return true
  380. case .requestIdleResponseIdle,
  381. .requestOpenResponseIdle,
  382. .requestClosedResponseIdle,
  383. .requestClosedResponseClosed:
  384. return false
  385. }
  386. }
  387. @inlinable
  388. mutating func sendEnd() -> Bool {
  389. switch self {
  390. case .requestIdleResponseIdle:
  391. return false
  392. case .requestOpenResponseIdle,
  393. .requestOpenResponseOpen,
  394. .requestClosedResponseIdle,
  395. .requestClosedResponseOpen:
  396. self = .requestClosedResponseClosed
  397. return true
  398. case .requestClosedResponseClosed:
  399. return false
  400. }
  401. }
  402. }
  403. @usableFromInline
  404. struct ActiveState {
  405. /// The stream state between the 'Channel' and interceptor pipeline.
  406. @usableFromInline
  407. var channelStreamState: StreamState
  408. /// The stream state between the interceptor pipeline and event observer.
  409. @usableFromInline
  410. var observerStreamState: StreamState
  411. @inlinable
  412. init() {
  413. self.channelStreamState = .requestIdleResponseIdle
  414. self.observerStreamState = .requestIdleResponseIdle
  415. }
  416. }
  417. }
  418. extension State {
  419. /// The handler was added to the `ChannelPipeline`: this is the only way to move from the `.idle`
  420. /// state. We only expect this to be called once.
  421. internal mutating func handlerAdded() {
  422. switch self {
  423. case .idle:
  424. // This is the only way we can become active.
  425. self = .active(.init())
  426. case .active:
  427. preconditionFailure("Invalid state: already active")
  428. case .closed:
  429. ()
  430. }
  431. }
  432. /// Received an error from the `Channel`.
  433. /// - Returns: True if the error should be forwarded to the error observer, or false if it should
  434. /// be dropped.
  435. internal func errorCaught() -> Bool {
  436. switch self {
  437. case .active:
  438. return true
  439. case .idle, .closed:
  440. return false
  441. }
  442. }
  443. /// Receive a metadata part from the `Channel`.
  444. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  445. internal mutating func channelReadMetadata() -> Bool {
  446. switch self {
  447. case .idle:
  448. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  449. case var .active(state):
  450. let allow = state.channelStreamState.receiveHeaders()
  451. self = .active(state)
  452. return allow
  453. case .closed:
  454. return false
  455. }
  456. }
  457. /// Receive a message part from the `Channel`.
  458. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  459. internal func channelReadMessage() -> Bool {
  460. switch self {
  461. case .idle:
  462. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  463. case let .active(state):
  464. return state.channelStreamState.receiveMessage()
  465. case .closed:
  466. return false
  467. }
  468. }
  469. /// Receive an end-stream part from the `Channel`.
  470. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  471. internal mutating func channelReadEnd() -> Bool {
  472. switch self {
  473. case .idle:
  474. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  475. case var .active(state):
  476. let allow = state.channelStreamState.receiveEnd()
  477. self = .active(state)
  478. return allow
  479. case .closed:
  480. return false
  481. }
  482. }
  483. /// Send a response part from the observer to the interceptors.
  484. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  485. @inlinable
  486. internal mutating func sendResponsePartFromObserver(_ part: RPCStreamPart) -> Bool {
  487. switch self {
  488. case .idle:
  489. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  490. case var .active(state):
  491. // Avoid CoW-ing 'state'.
  492. self = .idle
  493. let allow: Bool
  494. switch part {
  495. case .metadata:
  496. allow = state.observerStreamState.sendHeaders()
  497. case .message:
  498. allow = state.observerStreamState.sendMessage()
  499. case .end:
  500. allow = state.observerStreamState.sendEnd()
  501. }
  502. // Restore the state.
  503. self = .active(state)
  504. return allow
  505. case .closed:
  506. return false
  507. }
  508. }
  509. /// Send a response part from the interceptors to the `Channel`.
  510. /// - Returns: True if the part should be forwarded to the `Channel`, false otherwise.
  511. @inlinable
  512. internal mutating func sendResponsePartFromInterceptors(_ part: RPCStreamPart) -> Bool {
  513. switch self {
  514. case .idle:
  515. preconditionFailure("Invalid state: can't send response on idle call")
  516. case var .active(state):
  517. // Avoid CoW-ing 'state'.
  518. self = .idle
  519. let allow: Bool
  520. switch part {
  521. case .metadata:
  522. allow = state.channelStreamState.sendHeaders()
  523. self = .active(state)
  524. case .message:
  525. allow = state.channelStreamState.sendMessage()
  526. self = .active(state)
  527. case .end:
  528. allow = state.channelStreamState.sendEnd()
  529. // We're sending end, we're no longer active.
  530. self = .closed
  531. }
  532. return allow
  533. case .closed:
  534. // We're already closed.
  535. return false
  536. }
  537. }
  538. /// A request part has traversed the interceptor pipeline, now send it to the observer.
  539. /// - Returns: True if the part should be forwarded to the observer, false otherwise.
  540. @inlinable
  541. internal mutating func receiveRequestPartFromInterceptors(_ part: RPCStreamPart) -> Bool {
  542. switch self {
  543. case .idle:
  544. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  545. case var .active(state):
  546. // Avoid CoW-ing `state`.
  547. self = .idle
  548. let allow: Bool
  549. // Does the active state allow us to send this?
  550. switch part {
  551. case .metadata:
  552. allow = state.observerStreamState.receiveHeaders()
  553. case .message:
  554. allow = state.observerStreamState.receiveMessage()
  555. case .end:
  556. allow = state.observerStreamState.receiveEnd()
  557. }
  558. // Put `state` back.
  559. self = .active(state)
  560. return allow
  561. case .closed:
  562. // We're closed, just ignore this.
  563. return false
  564. }
  565. }
  566. }
  567. // MARK: State Actions
  568. extension _BaseCallHandler {
  569. /// Receives a request part in the interceptor pipeline.
  570. @inlinable
  571. internal func _receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  572. self._pipeline?.receive(part)
  573. }
  574. /// Observe a request part. This just farms out to the subclass implementation for the
  575. /// appropriate part.
  576. @inlinable
  577. internal func _receiveRequestPartInObserver(_ part: GRPCServerRequestPart<RequestPayload>) {
  578. switch part {
  579. case let .metadata(headers):
  580. self.observeHeaders(headers)
  581. case let .message(request):
  582. self.observeRequest(request)
  583. case .end:
  584. self.observeEnd()
  585. }
  586. }
  587. /// Sends a response part into the interceptor pipeline.
  588. @inlinable
  589. internal func _sendResponsePartToInterceptors(
  590. _ part: GRPCServerResponsePart<ResponsePayload>,
  591. promise: EventLoopPromise<Void>?
  592. ) {
  593. if let pipeline = self._pipeline {
  594. pipeline.send(part, promise: promise)
  595. } else {
  596. promise?.fail(GRPCError.AlreadyComplete())
  597. }
  598. }
  599. /// Writes a response part to the `Channel`.
  600. @inlinable
  601. internal func _writeResponsePartToChannel(
  602. context: ChannelHandlerContext,
  603. part: GRPCServerResponsePart<ResponsePayload>,
  604. promise: EventLoopPromise<Void>?
  605. ) {
  606. let flush: Bool
  607. switch part {
  608. case let .metadata(headers):
  609. // Only flush if we're not unary: if we're unary we'll wait for the response and end before
  610. // emitting the flush.
  611. flush = self._callType != .unary
  612. context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise)
  613. case let .message(message, metadata):
  614. do {
  615. let serializedResponse = try self._responseSerializer.serialize(
  616. message,
  617. allocator: context.channel.allocator
  618. )
  619. context.write(
  620. self.wrapOutboundOut(.message(serializedResponse, metadata)),
  621. promise: promise
  622. )
  623. // Flush if we've been told to flush.
  624. flush = metadata.flush
  625. } catch {
  626. self.errorCaught(context: context, error: error)
  627. promise?.fail(error)
  628. return
  629. }
  630. case let .end(status, trailers):
  631. context.write(self.wrapOutboundOut(.end(status, trailers)), promise: promise)
  632. // Always flush on end.
  633. flush = true
  634. }
  635. if flush {
  636. context.flush()
  637. }
  638. }
  639. }