UnaryCallHandler.swift 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  21. /// Handles unary calls. Calls the observer block with the request message.
  22. ///
  23. /// - The observer block is implemented by the framework user and returns a future containing the call result.
  24. /// - To return a response to the client, the framework user should complete that future
  25. /// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
  26. public final class UnaryCallHandler<
  27. RequestPayload: GRPCPayload,
  28. ResponsePayload: GRPCPayload
  29. >: _BaseCallHandler<RequestPayload, ResponsePayload> {
  30. public typealias EventObserver = (RequestPayload) -> EventLoopFuture<ResponsePayload>
  31. private var eventObserver: EventObserver?
  32. private var callContext: UnaryResponseCallContext<ResponsePayload>?
  33. private let eventObserverFactory: (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
  34. public init(
  35. callHandlerContext: CallHandlerContext,
  36. eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
  37. ) {
  38. self.eventObserverFactory = eventObserverFactory
  39. super.init(callHandlerContext: callHandlerContext)
  40. }
  41. internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
  42. let callContext = UnaryResponseCallContextImpl<ResponsePayload>(
  43. channel: context.channel,
  44. request: head,
  45. errorDelegate: self.errorDelegate,
  46. logger: self.logger
  47. )
  48. self.callContext = callContext
  49. self.eventObserver = self.eventObserverFactory(callContext)
  50. callContext.responsePromise.futureResult.whenComplete { _ in
  51. // When done, reset references to avoid retain cycles.
  52. self.eventObserver = nil
  53. self.callContext = nil
  54. }
  55. context.writeAndFlush(self.wrapOutboundOut(.headers([:])), promise: nil)
  56. }
  57. internal override func processMessage(_ message: RequestPayload) throws {
  58. guard let eventObserver = self.eventObserver,
  59. let context = self.callContext else {
  60. self.logger.error("processMessage(_:) called before the call started or after the call completed")
  61. throw GRPCError.StreamCardinalityViolation.request.captureContext()
  62. }
  63. let resultFuture = eventObserver(message)
  64. resultFuture
  65. // Fulfil the response promise with whatever response (or error) the framework user has provided.
  66. .cascade(to: context.responsePromise)
  67. self.eventObserver = nil
  68. }
  69. internal override func endOfStreamReceived() throws {
  70. if self.eventObserver != nil {
  71. throw GRPCError.StreamCardinalityViolation.request.captureContext()
  72. }
  73. }
  74. internal override func sendErrorStatus(_ status: GRPCStatus) {
  75. callContext?.responsePromise.fail(status)
  76. }
  77. }