ServerStreamingCallHandler.swift 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// Handles server-streaming calls. Calls the observer block with the request message.
  23. ///
  24. /// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
  25. /// - To close the call and send the status, complete the status future returned by the observer block.
  26. public final class ServerStreamingCallHandler<
  27. RequestPayload,
  28. ResponsePayload
  29. >: _BaseCallHandler<RequestPayload, ResponsePayload> {
  30. public typealias EventObserver = (RequestPayload) -> EventLoopFuture<GRPCStatus>
  31. private var eventObserver: EventObserver?
  32. private var callContext: StreamingResponseCallContext<ResponsePayload>?
  33. private let eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
  34. internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  35. serializer: Serializer,
  36. deserializer: Deserializer,
  37. callHandlerContext: CallHandlerContext,
  38. eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
  39. ) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
  40. self.eventObserverFactory = eventObserverFactory
  41. super.init(
  42. callHandlerContext: callHandlerContext,
  43. codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
  44. )
  45. }
  46. override internal func processHeaders(_ headers: HPACKHeaders, context: ChannelHandlerContext) {
  47. let callContext = StreamingResponseCallContextImpl<ResponsePayload>(
  48. channel: context.channel,
  49. headers: headers,
  50. errorDelegate: self.callHandlerContext.errorDelegate,
  51. logger: self.callHandlerContext.logger
  52. )
  53. self.callContext = callContext
  54. self.eventObserver = self.eventObserverFactory(callContext)
  55. callContext.statusPromise.futureResult.whenComplete { _ in
  56. // When done, reset references to avoid retain cycles.
  57. self.eventObserver = nil
  58. self.callContext = nil
  59. }
  60. context.writeAndFlush(self.wrapOutboundOut(.headers([:])), promise: nil)
  61. }
  62. override internal func processMessage(_ message: RequestPayload) throws {
  63. guard let eventObserver = self.eventObserver,
  64. let callContext = self.callContext else {
  65. self.logger.error(
  66. "processMessage(_:) called before the call started or after the call completed",
  67. source: "GRPC"
  68. )
  69. throw GRPCError.StreamCardinalityViolation.request.captureContext()
  70. }
  71. let resultFuture = eventObserver(message)
  72. resultFuture
  73. // Fulfil the status promise with whatever status the framework user has provided.
  74. .cascade(to: callContext.statusPromise)
  75. self.eventObserver = nil
  76. }
  77. override internal func endOfStreamReceived() throws {
  78. if self.eventObserver != nil {
  79. throw GRPCError.StreamCardinalityViolation.request.captureContext()
  80. }
  81. }
  82. override internal func sendErrorStatusAndMetadata(_ statusAndMetadata: GRPCStatusAndTrailers) {
  83. if let trailers = statusAndMetadata.trailers {
  84. self.callContext?.trailers.add(contentsOf: trailers)
  85. }
  86. self.callContext?.statusPromise.fail(statusAndMetadata.status)
  87. }
  88. }