UnaryCallHandler.swift 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  21. /// Handles unary calls. Calls the observer block with the request message.
  22. ///
  23. /// - The observer block is implemented by the framework user and returns a future containing the call result.
  24. /// - To return a response to the client, the framework user should complete that future
  25. /// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
  26. public class UnaryCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
  27. public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage>
  28. private var eventObserver: EventObserver?
  29. private var callContext: UnaryResponseCallContext<ResponseMessage>?
  30. public init(callHandlerContext: CallHandlerContext, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventObserver) {
  31. super.init(callHandlerContext: callHandlerContext)
  32. let callContext = UnaryResponseCallContextImpl<ResponseMessage>(
  33. channel: self.callHandlerContext.channel,
  34. request: self.callHandlerContext.request,
  35. errorDelegate: self.callHandlerContext.errorDelegate,
  36. logger: self.callHandlerContext.logger
  37. )
  38. self.callContext = callContext
  39. self.eventObserver = eventObserverFactory(callContext)
  40. callContext.responsePromise.futureResult.whenComplete { _ in
  41. // When done, reset references to avoid retain cycles.
  42. self.eventObserver = nil
  43. self.callContext = nil
  44. }
  45. }
  46. public override func processMessage(_ message: RequestMessage) throws {
  47. guard let eventObserver = self.eventObserver,
  48. let context = self.callContext else {
  49. self.logger.error("processMessage(_:) called before the call started or after the call completed")
  50. throw GRPCError.server(.tooManyRequests)
  51. }
  52. let resultFuture = eventObserver(message)
  53. resultFuture
  54. // Fulfill the response promise with whatever response (or error) the framework user has provided.
  55. .cascade(to: context.responsePromise)
  56. self.eventObserver = nil
  57. }
  58. public override func endOfStreamReceived() throws {
  59. if self.eventObserver != nil {
  60. throw GRPCError.server(.noRequestsButOneExpected)
  61. }
  62. }
  63. override func sendErrorStatus(_ status: GRPCStatus) {
  64. callContext?.responsePromise.fail(status)
  65. }
  66. }