ServerStreamingCallHandler.swift 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. /// Handles server-streaming calls. Calls the observer block with the request message.
  21. ///
  22. /// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
  23. /// - To close the call and send the status, complete the status future returned by the observer block.
  24. public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
  25. public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus>
  26. private var eventObserver: EventObserver?
  27. private var callContext: StreamingResponseCallContext<ResponseMessage>?
  28. public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver) {
  29. super.init(errorDelegate: errorDelegate)
  30. let callContext = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request, errorDelegate: errorDelegate)
  31. self.callContext = callContext
  32. self.eventObserver = eventObserverFactory(callContext)
  33. callContext.statusPromise.futureResult.whenComplete { _ in
  34. // When done, reset references to avoid retain cycles.
  35. self.eventObserver = nil
  36. self.callContext = nil
  37. }
  38. }
  39. public override func processMessage(_ message: RequestMessage) throws {
  40. guard let eventObserver = self.eventObserver,
  41. let callContext = self.callContext else {
  42. throw GRPCError.server(.tooManyRequests)
  43. }
  44. let resultFuture = eventObserver(message)
  45. resultFuture
  46. // Fulfill the status promise with whatever status the framework user has provided.
  47. .cascade(to: callContext.statusPromise)
  48. self.eventObserver = nil
  49. }
  50. public override func endOfStreamReceived() throws {
  51. if self.eventObserver != nil {
  52. throw GRPCError.server(.noRequestsButOneExpected)
  53. }
  54. }
  55. override func sendErrorStatus(_ status: GRPCStatus) {
  56. self.callContext?.statusPromise.fail(status)
  57. }
  58. }