UnaryCallHandler.swift 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  5. /// Handles unary calls. Calls the observer block with the request message.
  6. ///
  7. /// - The observer block is implemented by the framework user and returns a future containing the call result.
  8. /// - To return a response to the client, the framework user should complete that future
  9. /// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
  10. public class UnaryCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
  11. public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage>
  12. private var eventObserver: EventObserver?
  13. private var context: UnaryResponseCallContext<ResponseMessage>?
  14. public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventObserver) {
  15. super.init(errorDelegate: errorDelegate)
  16. let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request)
  17. self.context = context
  18. self.eventObserver = eventObserverFactory(context)
  19. context.responsePromise.futureResult.whenComplete {
  20. // When done, reset references to avoid retain cycles.
  21. self.eventObserver = nil
  22. self.context = nil
  23. }
  24. }
  25. public override func processMessage(_ message: RequestMessage) throws {
  26. guard let eventObserver = self.eventObserver,
  27. let context = self.context else {
  28. throw GRPCError.server(.tooManyRequests)
  29. }
  30. let resultFuture = eventObserver(message)
  31. resultFuture
  32. // Fulfill the response promise with whatever response (or error) the framework user has provided.
  33. .cascade(promise: context.responsePromise)
  34. self.eventObserver = nil
  35. }
  36. public override func endOfStreamReceived() throws {
  37. if self.eventObserver != nil {
  38. throw GRPCError.server(.noRequestsButOneExpected)
  39. }
  40. }
  41. override func sendErrorStatus(_ status: GRPCStatus) {
  42. context?.responsePromise.fail(error: status)
  43. }
  44. }