_BaseCallHandler.swift 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  21. /// Provides a means for decoding incoming gRPC messages into protobuf objects.
  22. ///
  23. /// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
  24. /// - Important: This is **NOT** part of the public API.
  25. public class _BaseCallHandler<
  26. RequestDeserializer: MessageDeserializer,
  27. ResponseSerializer: MessageSerializer
  28. >: GRPCCallHandler, ChannelInboundHandler {
  29. public typealias RequestPayload = RequestDeserializer.Output
  30. public typealias ResponsePayload = ResponseSerializer.Input
  31. public typealias InboundIn = _RawGRPCServerRequestPart
  32. public typealias OutboundOut = _RawGRPCServerResponsePart
  33. /// An interceptor pipeline.
  34. private var pipeline: ServerInterceptorPipeline<RequestPayload, ResponsePayload>?
  35. /// Our current state.
  36. private var state: State = .idle
  37. /// The type of this RPC, e.g. 'unary'.
  38. private let callType: GRPCCallType
  39. /// Some context provided to us from the routing handler.
  40. private let callHandlerContext: CallHandlerContext
  41. /// A request deserializer.
  42. private let requestDeserializer: RequestDeserializer
  43. /// A response serializer.
  44. private let responseSerializer: ResponseSerializer
  45. /// The event loop this call is being handled on.
  46. internal var eventLoop: EventLoop {
  47. return self.callHandlerContext.eventLoop
  48. }
  49. /// An error delegate.
  50. internal var errorDelegate: ServerErrorDelegate? {
  51. return self.callHandlerContext.errorDelegate
  52. }
  53. /// A logger.
  54. internal var logger: Logger {
  55. return self.callHandlerContext.logger
  56. }
  57. /// A reference to `UserInfo`.
  58. internal var userInfoRef: Ref<UserInfo>
  59. internal init(
  60. callHandlerContext: CallHandlerContext,
  61. requestDeserializr: RequestDeserializer,
  62. responseSerializer: ResponseSerializer,
  63. callType: GRPCCallType,
  64. interceptors: [ServerInterceptor<RequestPayload, ResponsePayload>]
  65. ) {
  66. let userInfoRef = Ref(UserInfo())
  67. self.requestDeserializer = requestDeserializr
  68. self.responseSerializer = responseSerializer
  69. self.callHandlerContext = callHandlerContext
  70. self.callType = callType
  71. self.userInfoRef = userInfoRef
  72. self.pipeline = ServerInterceptorPipeline(
  73. logger: callHandlerContext.logger,
  74. eventLoop: callHandlerContext.eventLoop,
  75. path: callHandlerContext.path,
  76. callType: callType,
  77. userInfoRef: userInfoRef,
  78. interceptors: interceptors,
  79. onRequestPart: self.receiveRequestPartFromInterceptors(_:),
  80. onResponsePart: self.sendResponsePartFromInterceptors(_:promise:)
  81. )
  82. }
  83. // MARK: - ChannelHandler
  84. public func handlerAdded(context: ChannelHandlerContext) {
  85. self.act(on: self.state.handlerAdded(context: context))
  86. }
  87. public func handlerRemoved(context: ChannelHandlerContext) {
  88. self.pipeline = nil
  89. }
  90. public func channelInactive(context: ChannelHandlerContext) {
  91. self.pipeline = nil
  92. context.fireChannelInactive()
  93. }
  94. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  95. self.act(on: self.state.errorCaught(error))
  96. }
  97. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  98. let part = self.unwrapInboundIn(data)
  99. switch part {
  100. case let .headers(headers):
  101. self.act(on: self.state.channelRead(.headers(headers)))
  102. case let .message(buffer):
  103. do {
  104. let request = try self.requestDeserializer.deserialize(byteBuffer: buffer)
  105. self.act(on: self.state.channelRead(.message(request)))
  106. } catch {
  107. self.errorCaught(context: context, error: error)
  108. }
  109. case .end:
  110. self.act(on: self.state.channelRead(.end))
  111. }
  112. // We're the last handler. We don't have anything to forward.
  113. }
  114. // MARK: - Event Observer
  115. internal func observeHeaders(_ headers: HPACKHeaders) {
  116. fatalError("must be overridden by subclasses")
  117. }
  118. internal func observeRequest(_ message: RequestPayload) {
  119. fatalError("must be overridden by subclasses")
  120. }
  121. internal func observeEnd() {
  122. fatalError("must be overridden by subclasses")
  123. }
  124. internal func observeLibraryError(_ error: Error) {
  125. fatalError("must be overridden by subclasses")
  126. }
  127. /// Send a response part to the interceptor pipeline. Called by an event observer.
  128. /// - Parameters:
  129. /// - part: The response part to send.
  130. /// - promise: A promise to complete once the response part has been written.
  131. internal func sendResponsePartFromObserver(
  132. _ part: GRPCServerResponsePart<ResponsePayload>,
  133. promise: EventLoopPromise<Void>?
  134. ) {
  135. self.act(on: self.state.sendResponsePartFromObserver(part, promise: promise))
  136. }
  137. /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client.
  138. /// - Parameter error: The error to process.
  139. /// - Returns: The status and trailers to send to the client.
  140. internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) {
  141. // Observe the error if we have a delegate.
  142. self.errorDelegate?.observeLibraryError(error)
  143. // What status are we terminating this RPC with?
  144. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  145. // them with any on the call context.
  146. // - If we don't have a delegate, then try to transform the error to a status.
  147. // - Fallback to a generic error.
  148. let status: GRPCStatus
  149. let trailers: HPACKHeaders
  150. if let transformed = self.errorDelegate?.transformLibraryError(error) {
  151. status = transformed.status
  152. trailers = transformed.trailers ?? [:]
  153. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  154. status = grpcStatusTransformable.makeGRPCStatus()
  155. trailers = [:]
  156. } else {
  157. // Eh... well, we don't what status to use. Use a generic one.
  158. status = .processingError
  159. trailers = [:]
  160. }
  161. return (status, trailers)
  162. }
  163. /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer.
  164. internal func processObserverError(
  165. _ error: Error,
  166. headers: HPACKHeaders,
  167. trailers: HPACKHeaders
  168. ) -> (GRPCStatus, HPACKHeaders) {
  169. // Observe the error if we have a delegate.
  170. self.errorDelegate?.observeRequestHandlerError(error, headers: headers)
  171. // What status are we terminating this RPC with?
  172. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  173. // them with any on the call context.
  174. // - If we don't have a delegate, then try to transform the error to a status.
  175. // - Fallback to a generic error.
  176. let status: GRPCStatus
  177. let mergedTrailers: HPACKHeaders
  178. if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) {
  179. status = transformed.status
  180. if var transformedTrailers = transformed.trailers {
  181. // The delegate returned trailers: merge in those from the context as well.
  182. transformedTrailers.add(contentsOf: trailers)
  183. mergedTrailers = transformedTrailers
  184. } else {
  185. mergedTrailers = trailers
  186. }
  187. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  188. status = grpcStatusTransformable.makeGRPCStatus()
  189. mergedTrailers = trailers
  190. } else {
  191. // Eh... well, we don't what status to use. Use a generic one.
  192. status = .processingError
  193. mergedTrailers = trailers
  194. }
  195. return (status, mergedTrailers)
  196. }
  197. }
  198. // MARK: - Interceptor API
  199. extension _BaseCallHandler {
  200. /// Receive a request part from the interceptors pipeline to forward to the event observer.
  201. /// - Parameter part: The request part to forward.
  202. private func receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  203. self.act(on: self.state.receiveRequestPartFromInterceptors(part))
  204. }
  205. /// Send a response part via the `Channel`. Called once the response part has traversed the
  206. /// interceptor pipeline.
  207. /// - Parameters:
  208. /// - part: The response part to send.
  209. /// - promise: A promise to complete once the response part has been written.
  210. private func sendResponsePartFromInterceptors(
  211. _ part: GRPCServerResponsePart<ResponsePayload>,
  212. promise: EventLoopPromise<Void>?
  213. ) {
  214. self.act(on: self.state.sendResponsePartFromInterceptors(part, promise: promise))
  215. }
  216. }
  217. // MARK: - State
  218. extension _BaseCallHandler {
  219. fileprivate enum State {
  220. /// Idle. We're waiting to be added to a pipeline.
  221. case idle
  222. /// We're in a pipeline and receiving from the client.
  223. case active(ActiveState)
  224. /// We're done. This state is terminal, all actions are ignored.
  225. case closed
  226. }
  227. }
  228. extension _BaseCallHandler.State {
  229. /// The state of the request and response streams.
  230. ///
  231. /// We track the stream state twice: between the 'Channel' and interceptor pipeline, and between
  232. /// the interceptor pipeline and event observer.
  233. fileprivate enum StreamState {
  234. case requestIdleResponseIdle
  235. case requestOpenResponseIdle
  236. case requestOpenResponseOpen
  237. case requestClosedResponseIdle
  238. case requestClosedResponseOpen
  239. case requestClosedResponseClosed
  240. enum Filter {
  241. case allow
  242. case drop
  243. }
  244. mutating func receiveHeaders() -> Filter {
  245. switch self {
  246. case .requestIdleResponseIdle:
  247. self = .requestOpenResponseIdle
  248. return .allow
  249. case .requestOpenResponseIdle,
  250. .requestOpenResponseOpen,
  251. .requestClosedResponseIdle,
  252. .requestClosedResponseOpen,
  253. .requestClosedResponseClosed:
  254. return .drop
  255. }
  256. }
  257. func receiveMessage() -> Filter {
  258. switch self {
  259. case .requestOpenResponseIdle,
  260. .requestOpenResponseOpen:
  261. return .allow
  262. case .requestIdleResponseIdle,
  263. .requestClosedResponseIdle,
  264. .requestClosedResponseOpen,
  265. .requestClosedResponseClosed:
  266. return .drop
  267. }
  268. }
  269. mutating func receiveEnd() -> Filter {
  270. switch self {
  271. case .requestOpenResponseIdle:
  272. self = .requestClosedResponseIdle
  273. return .allow
  274. case .requestOpenResponseOpen:
  275. self = .requestClosedResponseOpen
  276. return .allow
  277. case .requestIdleResponseIdle,
  278. .requestClosedResponseIdle,
  279. .requestClosedResponseOpen,
  280. .requestClosedResponseClosed:
  281. return .drop
  282. }
  283. }
  284. mutating func sendHeaders() -> Filter {
  285. switch self {
  286. case .requestOpenResponseIdle:
  287. self = .requestOpenResponseOpen
  288. return .allow
  289. case .requestClosedResponseIdle:
  290. self = .requestClosedResponseOpen
  291. return .allow
  292. case .requestIdleResponseIdle,
  293. .requestOpenResponseOpen,
  294. .requestClosedResponseOpen,
  295. .requestClosedResponseClosed:
  296. return .drop
  297. }
  298. }
  299. func sendMessage() -> Filter {
  300. switch self {
  301. case .requestOpenResponseOpen,
  302. .requestClosedResponseOpen:
  303. return .allow
  304. case .requestIdleResponseIdle,
  305. .requestOpenResponseIdle,
  306. .requestClosedResponseIdle,
  307. .requestClosedResponseClosed:
  308. return .drop
  309. }
  310. }
  311. mutating func sendEnd() -> Filter {
  312. switch self {
  313. case .requestIdleResponseIdle:
  314. return .drop
  315. case .requestOpenResponseIdle,
  316. .requestOpenResponseOpen,
  317. .requestClosedResponseIdle,
  318. .requestClosedResponseOpen:
  319. self = .requestClosedResponseClosed
  320. return .allow
  321. case .requestClosedResponseClosed:
  322. return .drop
  323. }
  324. }
  325. }
  326. fileprivate struct ActiveState {
  327. var context: ChannelHandlerContext
  328. /// The stream state between the 'Channel' and interceptor pipeline.
  329. var channelStreamState: StreamState
  330. /// The stream state between the interceptor pipeline and event observer.
  331. var observerStreamState: StreamState
  332. init(context: ChannelHandlerContext) {
  333. self.context = context
  334. self.channelStreamState = .requestIdleResponseIdle
  335. self.observerStreamState = .requestIdleResponseIdle
  336. }
  337. }
  338. }
  339. extension _BaseCallHandler.State {
  340. fileprivate enum Action {
  341. /// Do nothing.
  342. case none
  343. /// Receive the request part in the interceptor pipeline.
  344. case receiveRequestPartInInterceptors(GRPCServerRequestPart<_BaseCallHandler.RequestPayload>)
  345. /// Receive the request part in the observer.
  346. case receiveRequestPartInObserver(GRPCServerRequestPart<_BaseCallHandler.RequestPayload>)
  347. /// Receive an error in the observer.
  348. case receiveLibraryErrorInObserver(Error)
  349. /// Send a response part to the interceptor pipeline.
  350. case sendResponsePartToInterceptors(
  351. GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
  352. EventLoopPromise<Void>?
  353. )
  354. /// Write the response part to the `Channel`.
  355. case writeResponsePartToChannel(
  356. ChannelHandlerContext,
  357. GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
  358. promise: EventLoopPromise<Void>?
  359. )
  360. /// Complete the promise with the result.
  361. case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
  362. /// Perform multiple actions.
  363. indirect case multiple([Action])
  364. }
  365. }
  366. extension _BaseCallHandler.State {
  367. /// The handler was added to the `ChannelPipeline`: this is the only way to move from the `.idle`
  368. /// state. We only expect this to be called once.
  369. internal mutating func handlerAdded(context: ChannelHandlerContext) -> Action {
  370. switch self {
  371. case .idle:
  372. // This is the only way we can become active.
  373. self = .active(.init(context: context))
  374. return .none
  375. case .active:
  376. preconditionFailure("Invalid state: already active")
  377. case .closed:
  378. return .none
  379. }
  380. }
  381. /// Received an error from the `Channel`.
  382. internal mutating func errorCaught(_ error: Error) -> Action {
  383. switch self {
  384. case .active:
  385. return .receiveLibraryErrorInObserver(error)
  386. case .idle, .closed:
  387. return .none
  388. }
  389. }
  390. /// Receive a request part from the `Channel`. If we're active we just forward these through the
  391. /// pipeline. We validate at the other end.
  392. internal mutating func channelRead(
  393. _ requestPart: _GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
  394. ) -> Action {
  395. switch self {
  396. case .idle:
  397. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  398. case var .active(state):
  399. // Avoid CoW-ing state.
  400. self = .idle
  401. let filter: StreamState.Filter
  402. let part: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
  403. switch requestPart {
  404. case let .headers(headers):
  405. filter = state.channelStreamState.receiveHeaders()
  406. part = .metadata(headers)
  407. case let .message(message):
  408. filter = state.channelStreamState.receiveMessage()
  409. part = .message(message)
  410. case .end:
  411. filter = state.channelStreamState.receiveEnd()
  412. part = .end
  413. }
  414. // Restore state.
  415. self = .active(state)
  416. switch filter {
  417. case .allow:
  418. return .receiveRequestPartInInterceptors(part)
  419. case .drop:
  420. return .none
  421. }
  422. case .closed:
  423. return .none
  424. }
  425. }
  426. /// Send a response part from the observer to the interceptors.
  427. internal mutating func sendResponsePartFromObserver(
  428. _ part: GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
  429. promise: EventLoopPromise<Void>?
  430. ) -> Action {
  431. switch self {
  432. case .idle:
  433. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  434. case var .active(state):
  435. // Avoid CoW-ing 'state'.
  436. self = .idle
  437. let filter: StreamState.Filter
  438. switch part {
  439. case .metadata:
  440. filter = state.observerStreamState.sendHeaders()
  441. case .message:
  442. filter = state.observerStreamState.sendMessage()
  443. case .end:
  444. filter = state.observerStreamState.sendEnd()
  445. }
  446. // Restore the state.
  447. self = .active(state)
  448. switch filter {
  449. case .allow:
  450. return .sendResponsePartToInterceptors(part, promise)
  451. case .drop:
  452. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  453. }
  454. case .closed:
  455. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  456. }
  457. }
  458. /// Send a response part from the interceptors to the `Channel`.
  459. internal mutating func sendResponsePartFromInterceptors(
  460. _ part: GRPCServerResponsePart<_BaseCallHandler.ResponsePayload>,
  461. promise: EventLoopPromise<Void>?
  462. ) -> Action {
  463. switch self {
  464. case .idle:
  465. preconditionFailure("Invalid state: can't send response on idle call")
  466. case var .active(state):
  467. // Avoid CoW-ing 'state'.
  468. self = .idle
  469. let filter: StreamState.Filter
  470. switch part {
  471. case .metadata:
  472. filter = state.channelStreamState.sendHeaders()
  473. self = .active(state)
  474. case .message:
  475. filter = state.channelStreamState.sendMessage()
  476. self = .active(state)
  477. case .end:
  478. filter = state.channelStreamState.sendEnd()
  479. // We're sending end, we're no longer active.
  480. self = .closed
  481. }
  482. switch filter {
  483. case .allow:
  484. return .writeResponsePartToChannel(state.context, part, promise: promise)
  485. case .drop:
  486. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  487. }
  488. case .closed:
  489. // We're already closed, fail any promise.
  490. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  491. }
  492. }
  493. /// A request part has traversed the interceptor pipeline, now send it to the observer.
  494. internal mutating func receiveRequestPartFromInterceptors(
  495. _ part: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
  496. ) -> Action {
  497. switch self {
  498. case .idle:
  499. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  500. case var .active(state):
  501. // Avoid CoW-ing `state`.
  502. self = .idle
  503. let filter: StreamState.Filter
  504. // Does the active state allow us to send this?
  505. switch part {
  506. case .metadata:
  507. filter = state.observerStreamState.receiveHeaders()
  508. case .message:
  509. filter = state.observerStreamState.receiveMessage()
  510. case .end:
  511. filter = state.observerStreamState.receiveEnd()
  512. }
  513. // Put `state` back.
  514. self = .active(state)
  515. switch filter {
  516. case .allow:
  517. return .receiveRequestPartInObserver(part)
  518. case .drop:
  519. return .none
  520. }
  521. case .closed:
  522. // We're closed, just ignore this.
  523. return .none
  524. }
  525. }
  526. }
  527. // MARK: State Actions
  528. extension _BaseCallHandler {
  529. private func act(on action: State.Action) {
  530. switch action {
  531. case .none:
  532. ()
  533. case let .receiveRequestPartInInterceptors(part):
  534. self.receiveRequestPartInInterceptors(part)
  535. case let .receiveRequestPartInObserver(part):
  536. self.receiveRequestPartInObserver(part)
  537. case let .receiveLibraryErrorInObserver(error):
  538. self.observeLibraryError(error)
  539. case let .sendResponsePartToInterceptors(part, promise):
  540. self.sendResponsePartToInterceptors(part, promise: promise)
  541. case let .writeResponsePartToChannel(context, part, promise):
  542. self.writeResponsePartToChannel(context: context, part: part, promise: promise)
  543. case let .completePromise(promise, result):
  544. promise?.completeWith(result)
  545. case let .multiple(actions):
  546. for action in actions {
  547. self.act(on: action)
  548. }
  549. }
  550. }
  551. /// Receives a request part in the interceptor pipeline.
  552. private func receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  553. self.pipeline?.receive(part)
  554. }
  555. /// Observe a request part. This just farms out to the subclass implementation for the
  556. /// appropriate part.
  557. private func receiveRequestPartInObserver(_ part: GRPCServerRequestPart<RequestPayload>) {
  558. switch part {
  559. case let .metadata(headers):
  560. self.observeHeaders(headers)
  561. case let .message(request):
  562. self.observeRequest(request)
  563. case .end:
  564. self.observeEnd()
  565. }
  566. }
  567. /// Sends a response part into the interceptor pipeline.
  568. private func sendResponsePartToInterceptors(
  569. _ part: GRPCServerResponsePart<ResponsePayload>,
  570. promise: EventLoopPromise<Void>?
  571. ) {
  572. if let pipeline = self.pipeline {
  573. pipeline.send(part, promise: promise)
  574. } else {
  575. promise?.fail(GRPCError.AlreadyComplete())
  576. }
  577. }
  578. /// Writes a response part to the `Channel`.
  579. private func writeResponsePartToChannel(
  580. context: ChannelHandlerContext,
  581. part: GRPCServerResponsePart<ResponsePayload>,
  582. promise: EventLoopPromise<Void>?
  583. ) {
  584. let flush: Bool
  585. switch part {
  586. case let .metadata(headers):
  587. // Only flush if we're streaming responses, if we're not streaming responses then we'll wait
  588. // for the response and end before emitting the flush.
  589. flush = self.callType.isStreamingResponses
  590. context.write(self.wrapOutboundOut(.headers(headers)), promise: promise)
  591. case let .message(message, metadata):
  592. do {
  593. let serializedResponse = try self.responseSerializer.serialize(
  594. message,
  595. allocator: context.channel.allocator
  596. )
  597. context.write(
  598. self.wrapOutboundOut(.message(.init(serializedResponse, compressed: metadata.compress))),
  599. promise: promise
  600. )
  601. // Flush if we've been told to flush.
  602. flush = metadata.flush
  603. } catch {
  604. self.errorCaught(context: context, error: error)
  605. promise?.fail(error)
  606. return
  607. }
  608. case let .end(status, trailers):
  609. context.write(self.wrapOutboundOut(.statusAndTrailers(status, trailers)), promise: promise)
  610. // Always flush on end.
  611. flush = true
  612. }
  613. if flush {
  614. context.flush()
  615. }
  616. }
  617. }