2
0

BidirectionalStreamingCallHandler.swift 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  /// Call handler for bidirectional streaming calls. Every incoming message, as well as the end-of-stream event,
  /// is passed on to an observer block.
  ///
  /// - The observer block is supplied by the framework user, who calls `context.sendResponse` as needed.
  ///   To return a call error (e.g. because authentication failed), the framework user can fail the future
  ///   that provides the observer block.
  /// - Completing `context.statusPromise` closes the call and sends the status.
  public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
    /// Block that receives each stream event of the call (an incoming message or the end of the stream).
    public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void

    /// Future providing the observer block; reset to `nil` once the status promise has completed.
    private var eventObserver: EventLoopFuture<EventObserver>?
    /// Response context of this call; reset to `nil` once the status promise has completed.
    private var context: StreamingResponseCallContext<ResponseMessage>?

    /// Creates the call context for `channel` and `request` and asks `eventObserverFactory` for the observer.
    ///
    /// The factory hands back a *future* of `EventObserver` rather than the observer itself, so that the
    /// framework user can e.g. authenticate a call asynchronously. Failing that future causes the call to be
    /// terminated.
    ///
    /// - Parameters:
    ///   - channel: Channel passed on to the call context.
    ///   - request: HTTP request head passed on to the call context.
    ///   - errorDelegate: Forwarded unchanged to the superclass initializer.
    ///   - eventObserverFactory: Invoked exactly once, with the freshly created call context.
    public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
      super.init(errorDelegate: errorDelegate)

      let callContext = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
      self.context = callContext
      self.eventObserver = eventObserverFactory(callContext)

      // Once the status future has completed, drop both references again so that no retain cycle remains.
      let statusFuture = callContext.statusPromise.futureResult
      statusFuture.whenComplete {
        self.eventObserver = nil
        self.context = nil
      }
    }

    /// Links a failure of the observer future to the call's status promise.
    ///
    /// The link is deliberately set up only _after_ this handler has been added to the channel: by then the
    /// `GRPCServerCodec` needed to translate our outgoing `GRPCServerResponsePart<ResponseMessage>` messages is
    /// already present. Otherwise, our `OutboundOut` type would not match the `OutboundIn` type of the next
    /// handler on the channel.
    public override func handlerAdded(ctx: ChannelHandlerContext) {
      // Nothing to link if the references have already been reset.
      if let observerFuture = eventObserver, let callContext = context {
        observerFuture.cascadeFailure(promise: callContext.statusPromise)
      }
    }

    /// Delivers `message` to the observer block as a `.message` event once the observer future succeeds.
    public override func processMessage(_ message: RequestMessage) {
      guard let observerFuture = eventObserver else { return }
      observerFuture.whenSuccess { $0(.message(message)) }
    }

    /// Delivers the `.end` event to the observer block once the observer future succeeds.
    public override func endOfStreamReceived() throws {
      guard let observerFuture = eventObserver else { return }
      observerFuture.whenSuccess { $0(.end) }
    }

    /// Fails the call's status promise with `status`; does nothing if the context has already been reset.
    override func sendErrorStatus(_ status: GRPCStatus) {
      guard let callContext = context else { return }
      callContext.statusPromise.fail(error: status)
    }
  }