BidirectionalStreamingCallHandler.swift 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  /// Server-side handler for bidirectional-streaming calls.
  ///
  /// Each incoming request message, as well as the end-of-stream event, is handed to an observer
  /// closure once the future that provides that closure has succeeded.
  ///
  /// - The observer closure is implemented by the framework user and calls `context.sendResponse` as needed.
  ///   To return a call error (e.g. when authentication fails), the user can fail the future
  ///   that provides the observer.
  /// - To close the call and send the status, complete `context.statusPromise`.
  public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>:
    BaseCallHandler<RequestMessage, ResponseMessage> {
    /// Closure type that receives every stream event (messages and end-of-stream) for a call.
    public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void

    /// Future yielding the user's observer; cleared once the call's status future completes.
    private var observerFuture: EventLoopFuture<EventObserver>?
    /// Context used to respond to the call; cleared once the call's status future completes.
    private var callContext: StreamingResponseCallContext<ResponseMessage>?

    /// Creates a handler for a single bidirectional-streaming call.
    ///
    /// The factory returns a *future* of `EventObserver` rather than the observer itself so that
    /// the framework user can, for example, authenticate the call asynchronously. Failing that
    /// future causes the call to be terminated.
    ///
    /// - Parameters:
    ///   - channel: The channel passed to the newly created call context.
    ///   - request: The request head passed to the newly created call context.
    ///   - errorDelegate: Forwarded to the superclass and to the call context.
    ///   - eventObserverFactory: Invoked once, synchronously, with the new call context; returns
    ///     the future that provides the observer.
    public init(
      channel: Channel,
      request: HTTPRequestHead,
      errorDelegate: ServerErrorDelegate?,
      eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>
    ) {
      super.init(errorDelegate: errorDelegate)

      let responseContext = StreamingResponseCallContextImpl<ResponseMessage>(
        channel: channel,
        request: request,
        errorDelegate: errorDelegate
      )
      self.callContext = responseContext
      self.observerFuture = eventObserverFactory(responseContext)

      let statusFuture = responseContext.statusPromise.futureResult
      statusFuture.whenComplete { _ in
        // Once the status future has completed, drop the stored references so that
        // no retain cycle is left behind.
        self.observerFuture = nil
        self.callContext = nil
      }
    }

    /// Links a failure of the observer future to the call's status promise.
    ///
    /// This happens only _after_ the handler has been added to the channel, so that the
    /// `GRPCServerCodec` needed to translate outgoing `GRPCServerResponsePart<ResponseMessage>`
    /// messages is already in place. Otherwise this handler's `OutboundOut` type would not match
    /// the `OutboundIn` type of the next handler on the channel.
    public override func handlerAdded(context: ChannelHandlerContext) {
      if let observerFuture = self.observerFuture, let callContext = self.callContext {
        // A failed observer future terminates the call.
        observerFuture.cascadeFailure(to: callContext.statusPromise)
      }
    }

    /// Delivers a received request message to the observer as a `.message` event.
    public override func processMessage(_ message: RequestMessage) {
      self.forward(.message(message))
    }

    /// Delivers the `.end` event to the observer.
    public override func endOfStreamReceived() throws {
      self.forward(.end)
    }

    /// Fails the call's status promise with `status`; does nothing if the call context has
    /// already been released.
    override func sendErrorStatus(_ status: GRPCStatus) {
      guard let callContext = self.callContext else { return }
      callContext.statusPromise.fail(status)
    }

    /// Passes `event` to the observer once the observer future succeeds; does nothing if the
    /// observer future has already been released.
    private func forward(_ event: StreamEvent<RequestMessage>) {
      guard let observerFuture = self.observerFuture else { return }
      observerFuture.whenSuccess { observe in
        observe(event)
      }
    }
  }