ClientStreamingCallHandler.swift 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  /// Call handler for client-streaming RPCs. Each incoming request message, and the end-of-stream marker, is passed
  /// on to an observer closure as a `StreamEvent`.
  ///
  /// - The observer closure is implemented by the framework user, who fulfills `context.responsePromise` when done.
  public class ClientStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
    /// Closure type that receives the stream events of a single call.
    public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void

    /// Future resolving to the user's observer; reset to `nil` once the response promise completes.
    private var eventObserver: EventLoopFuture<EventObserver>?
    /// Call context exposing the response promise; reset to `nil` once that promise completes.
    private var context: UnaryResponseCallContext<ResponseMessage>?

    /// Creates the handler, builds the call context and asks `eventObserverFactory` for the observer.
    ///
    /// The factory returns a *future* observer rather than the observer itself so that the framework user can,
    /// for example, authenticate the call asynchronously. Failing that future fails the response promise, which
    /// terminates the call.
    ///
    /// - Parameters:
    ///   - channel: Channel passed on to the call context.
    ///   - request: Request head passed on to the call context.
    ///   - errorDelegate: Forwarded to the superclass initializer.
    ///   - eventObserverFactory: Called once, synchronously, with the freshly created call context.
    public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
      super.init(errorDelegate: errorDelegate)

      let callContext = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
      self.context = callContext

      let observerFuture = eventObserverFactory(callContext)
      self.eventObserver = observerFuture

      // No observer means no call: a failed observer future fails the response promise as well.
      observerFuture.cascadeFailure(promise: callContext.responsePromise)

      // Both stored references must already be set at this point, because this callback clears them.
      callContext.responsePromise.futureResult.whenComplete {
        // The call is finished; drop the references so that no retain cycle outlives it.
        self.eventObserver = nil
        self.context = nil
      }
    }

    /// Hands `message` to the observer as a `.message` event once the observer future has succeeded.
    ///
    /// Does nothing when the observer reference has already been cleared.
    public override func processMessage(_ message: RequestMessage) {
      guard let observerFuture = eventObserver else { return }
      observerFuture.whenSuccess { deliver in
        deliver(.message(message))
      }
    }

    /// Hands an `.end` event to the observer once the observer future has succeeded.
    ///
    /// Does nothing when the observer reference has already been cleared.
    public override func endOfStreamReceived() throws {
      guard let observerFuture = eventObserver else { return }
      observerFuture.whenSuccess { deliver in
        deliver(.end)
      }
    }

    /// Fails the response promise with `status`, provided the call context is still referenced.
    override func sendErrorStatus(_ status: GRPCStatus) {
      guard let callContext = context else { return }
      callContext.responsePromise.fail(error: status)
    }
  }