_BaseCallHandler.swift 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  21. /// Provides a means for decoding incoming gRPC messages into protobuf objects.
  22. ///
  23. /// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
  24. /// - Important: This is **NOT** part of the public API.
  25. public class _BaseCallHandler<
  26. RequestDeserializer: MessageDeserializer,
  27. ResponseSerializer: MessageSerializer
  28. >: GRPCCallHandler, ChannelInboundHandler {
  29. public typealias RequestPayload = RequestDeserializer.Output
  30. public typealias ResponsePayload = ResponseSerializer.Input
  31. public typealias InboundIn = GRPCServerRequestPart<ByteBuffer>
  32. public typealias OutboundOut = GRPCServerResponsePart<ByteBuffer>
  33. /// An interceptor pipeline.
  34. @usableFromInline
  35. internal var _pipeline: ServerInterceptorPipeline<RequestPayload, ResponsePayload>?
  36. /// Our current state.
  37. @usableFromInline
  38. internal var _state: State = .idle
  39. /// The type of this RPC, e.g. 'unary'.
  40. @usableFromInline
  41. internal let _callType: GRPCCallType
  42. /// Some context provided to us from the routing handler.
  43. @usableFromInline
  44. internal let _callHandlerContext: CallHandlerContext
  45. /// A request deserializer.
  46. @usableFromInline
  47. internal let _requestDeserializer: RequestDeserializer
  48. /// A response serializer.
  49. @usableFromInline
  50. internal let _responseSerializer: ResponseSerializer
  51. /// The `ChannelHandlerContext`.
  52. @usableFromInline
  53. internal var _context: ChannelHandlerContext?
  54. /// The event loop this call is being handled on.
  55. @usableFromInline
  56. internal var eventLoop: EventLoop {
  57. return self._callHandlerContext.eventLoop
  58. }
  59. /// An error delegate.
  60. @usableFromInline
  61. internal var errorDelegate: ServerErrorDelegate? {
  62. return self._callHandlerContext.errorDelegate
  63. }
  64. /// A logger.
  65. @usableFromInline
  66. internal var logger: Logger {
  67. return self._callHandlerContext.logger
  68. }
  69. /// A reference to `UserInfo`.
  70. @usableFromInline
  71. internal var _userInfoRef: Ref<UserInfo>
  72. @inlinable
  73. internal init(
  74. callHandlerContext: CallHandlerContext,
  75. requestDeserializer: RequestDeserializer,
  76. responseSerializer: ResponseSerializer,
  77. callType: GRPCCallType,
  78. interceptors: [ServerInterceptor<RequestPayload, ResponsePayload>]
  79. ) {
  80. let userInfoRef = Ref(UserInfo())
  81. self._requestDeserializer = requestDeserializer
  82. self._responseSerializer = responseSerializer
  83. self._callHandlerContext = callHandlerContext
  84. self._callType = callType
  85. self._userInfoRef = userInfoRef
  86. self._pipeline = ServerInterceptorPipeline(
  87. logger: callHandlerContext.logger,
  88. eventLoop: callHandlerContext.eventLoop,
  89. path: callHandlerContext.path,
  90. callType: callType,
  91. userInfoRef: userInfoRef,
  92. interceptors: interceptors,
  93. onRequestPart: self._receiveRequestPartFromInterceptors(_:),
  94. onResponsePart: self._sendResponsePartFromInterceptors(_:promise:)
  95. )
  96. }
  97. // MARK: - ChannelHandler
  98. public func handlerAdded(context: ChannelHandlerContext) {
  99. self._state.handlerAdded()
  100. self._context = context
  101. }
  102. public func handlerRemoved(context: ChannelHandlerContext) {
  103. self._pipeline = nil
  104. self._context = nil
  105. }
  106. public func channelInactive(context: ChannelHandlerContext) {
  107. self._pipeline = nil
  108. context.fireChannelInactive()
  109. }
  110. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  111. if self._state.errorCaught() {
  112. self.observeLibraryError(error)
  113. }
  114. }
  115. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  116. let part = self.unwrapInboundIn(data)
  117. switch part {
  118. case let .metadata(headers):
  119. if self._state.channelReadMetadata() {
  120. self._receiveRequestPartInInterceptors(.metadata(headers))
  121. }
  122. case let .message(buffer):
  123. if self._state.channelReadMessage() {
  124. do {
  125. let request = try self._requestDeserializer.deserialize(byteBuffer: buffer)
  126. self._receiveRequestPartInInterceptors(.message(request))
  127. } catch {
  128. self.errorCaught(context: context, error: error)
  129. }
  130. }
  131. case .end:
  132. if self._state.channelReadEnd() {
  133. self._receiveRequestPartInInterceptors(.end)
  134. }
  135. }
  136. // We're the last handler. We don't have anything to forward.
  137. }
  138. // MARK: - Event Observer
  139. @inlinable
  140. internal func observeHeaders(_ headers: HPACKHeaders) {
  141. fatalError("must be overridden by subclasses")
  142. }
  143. @inlinable
  144. internal func observeRequest(_ message: RequestPayload) {
  145. fatalError("must be overridden by subclasses")
  146. }
  147. @inlinable
  148. internal func observeEnd() {
  149. fatalError("must be overridden by subclasses")
  150. }
  151. @inlinable
  152. internal func observeLibraryError(_ error: Error) {
  153. fatalError("must be overridden by subclasses")
  154. }
  155. /// Send a response part to the interceptor pipeline. Called by an event observer.
  156. /// - Parameters:
  157. /// - part: The response part to send.
  158. /// - promise: A promise to complete once the response part has been written.
  159. @inlinable
  160. internal final func sendResponsePartFromObserver(
  161. _ part: GRPCServerResponsePart<ResponsePayload>,
  162. promise: EventLoopPromise<Void>?
  163. ) {
  164. let forward: Bool
  165. switch part {
  166. case .metadata:
  167. forward = self._state.sendResponsePartFromObserver(.metadata)
  168. case .message:
  169. forward = self._state.sendResponsePartFromObserver(.message)
  170. case .end:
  171. forward = self._state.sendResponsePartFromObserver(.end)
  172. }
  173. if forward {
  174. self._sendResponsePartToInterceptors(part, promise: promise)
  175. } else {
  176. promise?.fail(GRPCError.AlreadyComplete())
  177. }
  178. }
  179. /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client.
  180. /// - Parameter error: The error to process.
  181. /// - Returns: The status and trailers to send to the client.
  182. internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) {
  183. // Observe the error if we have a delegate.
  184. self.errorDelegate?.observeLibraryError(error)
  185. // What status are we terminating this RPC with?
  186. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  187. // them with any on the call context.
  188. // - If we don't have a delegate, then try to transform the error to a status.
  189. // - Fallback to a generic error.
  190. let status: GRPCStatus
  191. let trailers: HPACKHeaders
  192. if let transformed = self.errorDelegate?.transformLibraryError(error) {
  193. status = transformed.status
  194. trailers = transformed.trailers ?? [:]
  195. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  196. status = grpcStatusTransformable.makeGRPCStatus()
  197. trailers = [:]
  198. } else {
  199. // Eh... well, we don't what status to use. Use a generic one.
  200. status = .processingError
  201. trailers = [:]
  202. }
  203. return (status, trailers)
  204. }
  205. /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer.
  206. internal func processObserverError(
  207. _ error: Error,
  208. headers: HPACKHeaders,
  209. trailers: HPACKHeaders
  210. ) -> (GRPCStatus, HPACKHeaders) {
  211. // Observe the error if we have a delegate.
  212. self.errorDelegate?.observeRequestHandlerError(error, headers: headers)
  213. // What status are we terminating this RPC with?
  214. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  215. // them with any on the call context.
  216. // - If we don't have a delegate, then try to transform the error to a status.
  217. // - Fallback to a generic error.
  218. let status: GRPCStatus
  219. let mergedTrailers: HPACKHeaders
  220. if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) {
  221. status = transformed.status
  222. if var transformedTrailers = transformed.trailers {
  223. // The delegate returned trailers: merge in those from the context as well.
  224. transformedTrailers.add(contentsOf: trailers)
  225. mergedTrailers = transformedTrailers
  226. } else {
  227. mergedTrailers = trailers
  228. }
  229. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  230. status = grpcStatusTransformable.makeGRPCStatus()
  231. mergedTrailers = trailers
  232. } else {
  233. // Eh... well, we don't what status to use. Use a generic one.
  234. status = .processingError
  235. mergedTrailers = trailers
  236. }
  237. return (status, mergedTrailers)
  238. }
  239. }
  240. // MARK: - Interceptor API
  241. extension _BaseCallHandler {
  242. /// Receive a request part from the interceptors pipeline to forward to the event observer.
  243. /// - Parameter part: The request part to forward.
  244. @inlinable
  245. internal func _receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  246. let forward: Bool
  247. switch part {
  248. case .metadata:
  249. forward = self._state.receiveRequestPartFromInterceptors(.metadata)
  250. case .message:
  251. forward = self._state.receiveRequestPartFromInterceptors(.message)
  252. case .end:
  253. forward = self._state.receiveRequestPartFromInterceptors(.end)
  254. }
  255. if forward {
  256. self._receiveRequestPartInObserver(part)
  257. }
  258. }
  259. /// Send a response part via the `Channel`. Called once the response part has traversed the
  260. /// interceptor pipeline.
  261. /// - Parameters:
  262. /// - part: The response part to send.
  263. /// - promise: A promise to complete once the response part has been written.
  264. @inlinable
  265. internal func _sendResponsePartFromInterceptors(
  266. _ part: GRPCServerResponsePart<ResponsePayload>,
  267. promise: EventLoopPromise<Void>?
  268. ) {
  269. let forward: Bool
  270. switch part {
  271. case .metadata:
  272. forward = self._state.sendResponsePartFromInterceptors(.metadata)
  273. case .message:
  274. forward = self._state.sendResponsePartFromInterceptors(.message)
  275. case .end:
  276. forward = self._state.sendResponsePartFromInterceptors(.end)
  277. }
  278. if forward, let context = self._context {
  279. self._writeResponsePartToChannel(context: context, part: part, promise: promise)
  280. } else {
  281. promise?.fail(GRPCError.AlreadyComplete())
  282. }
  283. }
  284. }
  285. // MARK: - State
  286. @usableFromInline
  287. internal enum State {
  288. /// Idle. We're waiting to be added to a pipeline.
  289. case idle
  290. /// We're in a pipeline and receiving from the client.
  291. case active(ActiveState)
  292. /// We're done. This state is terminal, all actions are ignored.
  293. case closed
  294. }
  295. @usableFromInline
  296. internal enum RPCStreamPart {
  297. case metadata
  298. case message
  299. case end
  300. }
  301. extension State {
  302. /// The state of the request and response streams.
  303. ///
  304. /// We track the stream state twice: between the 'Channel' and interceptor pipeline, and between
  305. /// the interceptor pipeline and event observer.
  306. @usableFromInline
  307. enum StreamState {
  308. case requestIdleResponseIdle
  309. case requestOpenResponseIdle
  310. case requestOpenResponseOpen
  311. case requestClosedResponseIdle
  312. case requestClosedResponseOpen
  313. case requestClosedResponseClosed
  314. @inlinable
  315. mutating func receiveHeaders() -> Bool {
  316. switch self {
  317. case .requestIdleResponseIdle:
  318. self = .requestOpenResponseIdle
  319. return true
  320. case .requestOpenResponseIdle,
  321. .requestOpenResponseOpen,
  322. .requestClosedResponseIdle,
  323. .requestClosedResponseOpen,
  324. .requestClosedResponseClosed:
  325. return false
  326. }
  327. }
  328. @inlinable
  329. func receiveMessage() -> Bool {
  330. switch self {
  331. case .requestOpenResponseIdle,
  332. .requestOpenResponseOpen:
  333. return true
  334. case .requestIdleResponseIdle,
  335. .requestClosedResponseIdle,
  336. .requestClosedResponseOpen,
  337. .requestClosedResponseClosed:
  338. return false
  339. }
  340. }
  341. @inlinable
  342. mutating func receiveEnd() -> Bool {
  343. switch self {
  344. case .requestOpenResponseIdle:
  345. self = .requestClosedResponseIdle
  346. return true
  347. case .requestOpenResponseOpen:
  348. self = .requestClosedResponseOpen
  349. return true
  350. case .requestIdleResponseIdle,
  351. .requestClosedResponseIdle,
  352. .requestClosedResponseOpen,
  353. .requestClosedResponseClosed:
  354. return false
  355. }
  356. }
  357. @inlinable
  358. mutating func sendHeaders() -> Bool {
  359. switch self {
  360. case .requestOpenResponseIdle:
  361. self = .requestOpenResponseOpen
  362. return true
  363. case .requestClosedResponseIdle:
  364. self = .requestClosedResponseOpen
  365. return true
  366. case .requestIdleResponseIdle,
  367. .requestOpenResponseOpen,
  368. .requestClosedResponseOpen,
  369. .requestClosedResponseClosed:
  370. return false
  371. }
  372. }
  373. @inlinable
  374. func sendMessage() -> Bool {
  375. switch self {
  376. case .requestOpenResponseOpen,
  377. .requestClosedResponseOpen:
  378. return true
  379. case .requestIdleResponseIdle,
  380. .requestOpenResponseIdle,
  381. .requestClosedResponseIdle,
  382. .requestClosedResponseClosed:
  383. return false
  384. }
  385. }
  386. @inlinable
  387. mutating func sendEnd() -> Bool {
  388. switch self {
  389. case .requestIdleResponseIdle:
  390. return false
  391. case .requestOpenResponseIdle,
  392. .requestOpenResponseOpen,
  393. .requestClosedResponseIdle,
  394. .requestClosedResponseOpen:
  395. self = .requestClosedResponseClosed
  396. return true
  397. case .requestClosedResponseClosed:
  398. return false
  399. }
  400. }
  401. }
  402. @usableFromInline
  403. struct ActiveState {
  404. /// The stream state between the 'Channel' and interceptor pipeline.
  405. @usableFromInline
  406. var channelStreamState: StreamState
  407. /// The stream state between the interceptor pipeline and event observer.
  408. @usableFromInline
  409. var observerStreamState: StreamState
  410. @inlinable
  411. init() {
  412. self.channelStreamState = .requestIdleResponseIdle
  413. self.observerStreamState = .requestIdleResponseIdle
  414. }
  415. }
  416. }
  417. extension State {
  418. /// The handler was added to the `ChannelPipeline`: this is the only way to move from the `.idle`
  419. /// state. We only expect this to be called once.
  420. internal mutating func handlerAdded() {
  421. switch self {
  422. case .idle:
  423. // This is the only way we can become active.
  424. self = .active(.init())
  425. case .active:
  426. preconditionFailure("Invalid state: already active")
  427. case .closed:
  428. ()
  429. }
  430. }
  431. /// Received an error from the `Channel`.
  432. /// - Returns: True if the error should be forwarded to the error observer, or false if it should
  433. /// be dropped.
  434. internal func errorCaught() -> Bool {
  435. switch self {
  436. case .active:
  437. return true
  438. case .idle, .closed:
  439. return false
  440. }
  441. }
  442. /// Receive a metadata part from the `Channel`.
  443. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  444. internal mutating func channelReadMetadata() -> Bool {
  445. switch self {
  446. case .idle:
  447. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  448. case var .active(state):
  449. let allow = state.channelStreamState.receiveHeaders()
  450. self = .active(state)
  451. return allow
  452. case .closed:
  453. return false
  454. }
  455. }
  456. /// Receive a message part from the `Channel`.
  457. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  458. internal func channelReadMessage() -> Bool {
  459. switch self {
  460. case .idle:
  461. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  462. case let .active(state):
  463. return state.channelStreamState.receiveMessage()
  464. case .closed:
  465. return false
  466. }
  467. }
  468. /// Receive an end-stream part from the `Channel`.
  469. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  470. internal mutating func channelReadEnd() -> Bool {
  471. switch self {
  472. case .idle:
  473. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  474. case var .active(state):
  475. let allow = state.channelStreamState.receiveEnd()
  476. self = .active(state)
  477. return allow
  478. case .closed:
  479. return false
  480. }
  481. }
  482. /// Send a response part from the observer to the interceptors.
  483. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  484. @inlinable
  485. internal mutating func sendResponsePartFromObserver(_ part: RPCStreamPart) -> Bool {
  486. switch self {
  487. case .idle:
  488. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  489. case var .active(state):
  490. // Avoid CoW-ing 'state'.
  491. self = .idle
  492. let allow: Bool
  493. switch part {
  494. case .metadata:
  495. allow = state.observerStreamState.sendHeaders()
  496. case .message:
  497. allow = state.observerStreamState.sendMessage()
  498. case .end:
  499. allow = state.observerStreamState.sendEnd()
  500. }
  501. // Restore the state.
  502. self = .active(state)
  503. return allow
  504. case .closed:
  505. return false
  506. }
  507. }
  508. /// Send a response part from the interceptors to the `Channel`.
  509. /// - Returns: True if the part should be forwarded to the `Channel`, false otherwise.
  510. @inlinable
  511. internal mutating func sendResponsePartFromInterceptors(_ part: RPCStreamPart) -> Bool {
  512. switch self {
  513. case .idle:
  514. preconditionFailure("Invalid state: can't send response on idle call")
  515. case var .active(state):
  516. // Avoid CoW-ing 'state'.
  517. self = .idle
  518. let allow: Bool
  519. switch part {
  520. case .metadata:
  521. allow = state.channelStreamState.sendHeaders()
  522. self = .active(state)
  523. case .message:
  524. allow = state.channelStreamState.sendMessage()
  525. self = .active(state)
  526. case .end:
  527. allow = state.channelStreamState.sendEnd()
  528. // We're sending end, we're no longer active.
  529. self = .closed
  530. }
  531. return allow
  532. case .closed:
  533. // We're already closed.
  534. return false
  535. }
  536. }
  537. /// A request part has traversed the interceptor pipeline, now send it to the observer.
  538. /// - Returns: True if the part should be forwarded to the observer, false otherwise.
  539. @inlinable
  540. internal mutating func receiveRequestPartFromInterceptors(_ part: RPCStreamPart) -> Bool {
  541. switch self {
  542. case .idle:
  543. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  544. case var .active(state):
  545. // Avoid CoW-ing `state`.
  546. self = .idle
  547. let allow: Bool
  548. // Does the active state allow us to send this?
  549. switch part {
  550. case .metadata:
  551. allow = state.observerStreamState.receiveHeaders()
  552. case .message:
  553. allow = state.observerStreamState.receiveMessage()
  554. case .end:
  555. allow = state.observerStreamState.receiveEnd()
  556. }
  557. // Put `state` back.
  558. self = .active(state)
  559. return allow
  560. case .closed:
  561. // We're closed, just ignore this.
  562. return false
  563. }
  564. }
  565. }
  566. // MARK: State Actions
  567. extension _BaseCallHandler {
  568. /// Receives a request part in the interceptor pipeline.
  569. @inlinable
  570. internal func _receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  571. self._pipeline?.receive(part)
  572. }
  573. /// Observe a request part. This just farms out to the subclass implementation for the
  574. /// appropriate part.
  575. @inlinable
  576. internal func _receiveRequestPartInObserver(_ part: GRPCServerRequestPart<RequestPayload>) {
  577. switch part {
  578. case let .metadata(headers):
  579. self.observeHeaders(headers)
  580. case let .message(request):
  581. self.observeRequest(request)
  582. case .end:
  583. self.observeEnd()
  584. }
  585. }
  586. /// Sends a response part into the interceptor pipeline.
  587. @inlinable
  588. internal func _sendResponsePartToInterceptors(
  589. _ part: GRPCServerResponsePart<ResponsePayload>,
  590. promise: EventLoopPromise<Void>?
  591. ) {
  592. if let pipeline = self._pipeline {
  593. pipeline.send(part, promise: promise)
  594. } else {
  595. promise?.fail(GRPCError.AlreadyComplete())
  596. }
  597. }
  598. /// Writes a response part to the `Channel`.
  599. @inlinable
  600. internal func _writeResponsePartToChannel(
  601. context: ChannelHandlerContext,
  602. part: GRPCServerResponsePart<ResponsePayload>,
  603. promise: EventLoopPromise<Void>?
  604. ) {
  605. let flush: Bool
  606. switch part {
  607. case let .metadata(headers):
  608. // Only flush if we're not unary: if we're unary we'll wait for the response and end before
  609. // emitting the flush.
  610. flush = self._callType != .unary
  611. context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise)
  612. case let .message(message, metadata):
  613. do {
  614. let serializedResponse = try self._responseSerializer.serialize(
  615. message,
  616. allocator: context.channel.allocator
  617. )
  618. context.write(
  619. self.wrapOutboundOut(.message(serializedResponse, metadata)),
  620. promise: promise
  621. )
  622. // Flush if we've been told to flush.
  623. flush = metadata.flush
  624. } catch {
  625. self.errorCaught(context: context, error: error)
  626. promise?.fail(error)
  627. return
  628. }
  629. case let .end(status, trailers):
  630. context.write(self.wrapOutboundOut(.end(status, trailers)), promise: promise)
  631. // Always flush on end.
  632. flush = true
  633. }
  634. if flush {
  635. context.flush()
  636. }
  637. }
  638. }