BidirectionalStreamingCallHandler.swift 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  5. /// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
  6. ///
  7. /// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
  8. /// - To close the call and send the status, fulfill `context.statusPromise`.
  9. public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
  10. public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
  11. private var eventObserver: EventLoopFuture<EventObserver>?
  12. private var context: StreamingResponseCallContext<ResponseMessage>?
  13. // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
  14. // If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
  15. public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
  16. super.init(errorDelegate: errorDelegate)
  17. let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
  18. self.context = context
  19. let eventObserver = eventObserverFactory(context)
  20. self.eventObserver = eventObserver
  21. // Terminate the call if no observer is provided.
  22. eventObserver.cascadeFailure(promise: context.statusPromise)
  23. context.statusPromise.futureResult.whenComplete {
  24. // When done, reset references to avoid retain cycles.
  25. self.eventObserver = nil
  26. self.context = nil
  27. }
  28. }
  29. public override func processMessage(_ message: RequestMessage) {
  30. eventObserver?.whenSuccess { observer in
  31. observer(.message(message))
  32. }
  33. }
  34. public override func endOfStreamReceived() throws {
  35. eventObserver?.whenSuccess { observer in
  36. observer(.end)
  37. }
  38. }
  39. override func sendErrorStatus(_ status: GRPCStatus) {
  40. context?.statusPromise.fail(error: status)
  41. }
  42. }