ServerStreamingCallHandler.swift 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  /// Handles server-streaming calls by handing the request message to an observer block.
  ///
  /// - The observer block is written by the framework user; it calls `context.sendResponse` as needed.
  /// - Completing the status future returned by the observer block closes the call and sends the status.
  public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
    /// Block invoked with the request message; the future it returns carries the final call status.
    public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>

    /// Observer waiting for the request message. `nil` once a message has been processed
    /// or the status promise has completed.
    private var eventObserver: EventObserver?

    /// Call context handed to the observer factory. `nil` once the status promise has completed.
    private var context: StreamingResponseCallContext<ResponseMessage>?

    /// Creates the call context for `channel`/`request` and asks `eventObserverFactory` to build
    /// the observer for it.
    ///
    /// - Parameters:
    ///   - channel: Channel passed on to the call context.
    ///   - request: Request head passed on to the call context.
    ///   - errorDelegate: Forwarded to the superclass initializer.
    ///   - eventObserverFactory: Called once, with the new context, to obtain the observer block.
    public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver) {
      super.init(errorDelegate: errorDelegate)

      let callContext = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
      self.context = callContext
      self.eventObserver = eventObserverFactory(callContext)

      let statusFuture = callContext.statusPromise.futureResult
      statusFuture.whenComplete {
        // The call is over once the status is resolved: drop both references
        // so that no retain cycle is left behind.
        self.eventObserver = nil
        self.context = nil
      }
    }

    /// Passes `message` to the observer and forwards the status it produces to the call context.
    ///
    /// - Parameter message: The request message received for this call.
    /// - Throws: `GRPCError.server(.tooManyRequests)` if the observer or the context has already
    ///   been cleared (a message was already processed, or the status promise has completed).
    public override func processMessage(_ message: RequestMessage) throws {
      guard let observer = eventObserver, let callContext = context else {
        throw GRPCError.server(.tooManyRequests)
      }
      // Clear the observer on the way out so that any further message is rejected above.
      defer { eventObserver = nil }

      // Whatever status the framework user provides fulfills the status promise.
      observer(message).cascade(promise: callContext.statusPromise)
    }

    /// Checks, at end of the request stream, that the observer is no longer waiting for a message.
    ///
    /// - Throws: `GRPCError.server(.noRequestsButOneExpected)` if the observer is still set.
    public override func endOfStreamReceived() throws {
      guard eventObserver == nil else {
        throw GRPCError.server(.noRequestsButOneExpected)
      }
    }

    /// Fails the status promise with `status`; does nothing if the context has already been cleared.
    ///
    /// - Parameter status: The error status used to fail the promise.
    override func sendErrorStatus(_ status: GRPCStatus) {
      guard let callContext = context else { return }
      callContext.statusPromise.fail(error: status)
    }
  }