2
0

ServerStreamingCallHandler.swift 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  /// Handles server-streaming calls. Calls the observer block with the request message.
  ///
  /// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
  /// - To close the call and send the status, complete the status future returned by the observer block.
  public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
    /// Block invoked with the single request message of the call. The status future it
    /// returns is cascaded into the call context's status promise.
    public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>

    /// The user-provided observer. Set to `nil` once it has been invoked, or once the
    /// status promise has completed.
    private var eventObserver: EventObserver?

    /// The call context handed to the observer factory. Set to `nil` once the status
    /// promise has completed.
    private var context: StreamingResponseCallContext<ResponseMessage>?

    /// Creates the call context for this call and asks `eventObserverFactory` for the
    /// observer that will receive the request message.
    ///
    /// - Parameters:
    ///   - channel: Passed through to the newly created call context.
    ///   - request: Passed through to the newly created call context.
    ///   - eventObserverFactory: Called once, synchronously, with the new call context;
    ///     returns the observer block to invoke with the request message.
    public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver) {
      super.init()

      let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
      self.context = context
      self.eventObserver = eventObserverFactory(context)

      // NOTE: `self` is captured strongly here on purpose-preserving grounds; the
      // references are dropped only when the status promise completes.
      context.statusPromise.futureResult.whenComplete {
        // When done, reset references to avoid retain cycles.
        self.eventObserver = nil
        self.context = nil
      }
    }

    /// Forwards the request message to the observer and ties the observer's status
    /// future to the call's status promise.
    ///
    /// The observer is cleared after its first invocation, so any further message
    /// (or a message arriving after the status promise has completed) is reported
    /// and dropped.
    ///
    /// - Parameter message: The request message received for this call.
    public override func processMessage(_ message: RequestMessage) {
      guard let eventObserver = self.eventObserver,
        let context = self.context else {
          //! FIXME: Better handle this error?
          print("multiple messages received on server-streaming call")
          return
      }

      // Fulfill the status promise with whatever status the framework user has provided.
      eventObserver(message).cascade(promise: context.statusPromise)

      // A server-streaming call carries exactly one request message; drop the observer
      // so that a second message takes the guard's failure path above.
      self.eventObserver = nil
    }
  }