2
0

BidirectionalStreamingCallHandler.swift 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  /// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to
  /// the observer block.
  ///
  /// - The observer block is implemented by the framework user and calls `context.sendResponse` as
  ///   needed. If the framework user wants to return a call error (e.g. in case of authentication
  ///   failure), they can fail the observer block future.
  /// - To close the call and send the status, complete `context.statusPromise`.
  public class BidirectionalStreamingCallHandler<
    RequestPayload,
    ResponsePayload
  >: _BaseCallHandler<RequestPayload, ResponsePayload> {
    private typealias Context = StreamingResponseCallContext<ResponsePayload>
    private typealias Observer = EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>

    /// The current state of the request and response streams for this call.
    private var state: State

    // See 'UnaryCallHandler.State'.
    private enum State {
      /// No request headers have been received yet. Holds the factory which creates the user's
      /// event observer once the headers arrive.
      case requestIdleResponseIdle((Context) -> Observer)

      /// Request headers have been received and the observer has been created; both streams are
      /// open.
      case requestOpenResponseOpen(Context, Observer)

      /// The end of the request stream has been observed but the response stream is still open.
      case requestClosedResponseOpen(Context)

      /// The call is finished: nothing more will be received or sent.
      case requestClosedResponseClosed
    }

    /// Creates a handler for a bidirectional streaming call.
    ///
    /// We ask for a future of type `EventObserver` to allow the framework user to e.g.
    /// asynchronously authenticate a call. If authentication fails, they can simply fail the
    /// observer future, which causes the call to be terminated.
    ///
    /// - Parameters:
    ///   - serializer: Used, together with `deserializer`, to create the `GRPCServerCodecHandler`
    ///     passed to the base handler.
    ///   - deserializer: See `serializer`.
    ///   - callHandlerContext: Passed through to the base handler.
    ///   - interceptors: Passed through to the base handler.
    ///   - eventObserverFactory: Invoked with the call context when the request headers are
    ///     observed; returns a future for the closure which receives the stream events.
    internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      serializer: Serializer,
      deserializer: Deserializer,
      callHandlerContext: CallHandlerContext,
      interceptors: [ServerInterceptor<Deserializer.Output, Serializer.Input>],
      eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>)
        -> EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>
    ) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
      self.state = .requestIdleResponseIdle(eventObserverFactory)
      super.init(
        callHandlerContext: callHandlerContext,
        codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer),
        callType: .bidirectionalStreaming,
        interceptors: interceptors
      )
    }

    /// Marks the call as closed when the channel becomes inactive and fails the status promise if
    /// the response stream was still open.
    override public func channelInactive(context: ChannelHandlerContext) {
      super.channelInactive(context: context)

      // Fail any remaining promise.
      switch self.state {
      case .requestIdleResponseIdle,
           .requestClosedResponseClosed:
        self.state = .requestClosedResponseClosed

      // Bound as `callContext` so the `ChannelHandlerContext` parameter is not shadowed.
      case let .requestOpenResponseOpen(callContext, _),
           let .requestClosedResponseOpen(callContext):
        self.state = .requestClosedResponseClosed
        callContext.statusPromise.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Handle an error from the event observer.
    ///
    /// While the response stream is open the error is converted to a status and trailers via
    /// `processObserverError` and the call is ended with them. Once the call is closed the error
    /// is ignored.
    private func handleObserverError(_ error: Error) {
      switch self.state {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request observer hasn't been created")

      case let .requestOpenResponseOpen(context, _),
           let .requestClosedResponseOpen(context):
        let (status, trailers) = self.processObserverError(
          error,
          headers: context.headers,
          trailers: context.trailers
        )
        // This will handle the response promise as well.
        self.sendEnd(status: status, trailers: trailers)

      case .requestClosedResponseClosed:
        // We hit an error, but we're already closed (because we hit a library error first).
        ()
      }
    }

    /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
    ///
    /// The call is ended only if the request stream has not been closed yet.
    private func handleLibraryError(_ error: Error) {
      switch self.state {
      case .requestIdleResponseIdle,
           .requestOpenResponseOpen:
        // We'll never see end of stream, we'll close.
        let (status, trailers) = self.processLibraryError(error)
        self.sendEnd(status: status, trailers: trailers)

      case .requestClosedResponseOpen:
        // We've invoked the observer and seen the end of the request stream. We'll let this one
        // play out.
        ()

      case .requestClosedResponseClosed:
        // We're already closed, no need to do anything here.
        ()
      }
    }

    // MARK: - Inbound

    /// Forwards a library error to `handleLibraryError(_:)`.
    override func observeLibraryError(_ error: Error) {
      self.handleLibraryError(error)
    }

    /// Creates the call context and the user's event observer from the request headers, opens
    /// both streams and writes the response headers.
    ///
    /// Must only be called once, before any other request part is observed.
    override internal func observeHeaders(_ headers: HPACKHeaders) {
      switch self.state {
      case let .requestIdleResponseIdle(factory):
        let context = _StreamingResponseCallContext<RequestPayload, ResponsePayload>(
          eventLoop: self.eventLoop,
          headers: headers,
          logger: self.logger,
          userInfoRef: self.userInfoRef,
          sendResponse: self.sendResponse(_:metadata:promise:)
        )
        let observer = factory(context)

        // Fully open. We'll send the response headers back in a moment. The state is updated
        // before any callback is registered because `handleObserverError(_:)` traps while the
        // state is still idle.
        self.state = .requestOpenResponseOpen(context, observer)

        // Register a failure callback for the observer failing.
        observer.whenFailure(self.handleObserverError(_:))

        // Register actions on the status promise: success ends the call with the given status,
        // failure is treated like an observer error.
        context.statusPromise.futureResult.whenComplete { result in
          switch result {
          case let .success(status):
            self.sendEnd(status: status, trailers: context.trailers)
          case let .failure(error):
            self.handleObserverError(error)
          }
        }

        // Write back the response headers.
        self.sendResponsePartFromObserver(.metadata([:]), promise: nil)

      // The main state machine guards against this.
      case .requestOpenResponseOpen,
           .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: request headers already received")
      }
    }

    /// Delivers a request message to the user's observer once the observer future has succeeded.
    override internal func observeRequest(_ message: RequestPayload) {
      switch self.state {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request received before headers")

      case let .requestOpenResponseOpen(_, observer):
        observer.whenSuccess {
          $0(.message(message))
        }

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: the request stream has already been closed")
      }
    }

    /// Closes the request side of the call and delivers `.end` to the user's observer once the
    /// observer future has succeeded.
    override internal func observeEnd() {
      switch self.state {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: no request headers received")

      case let .requestOpenResponseOpen(context, observer):
        self.state = .requestClosedResponseOpen(context)
        observer.whenSuccess {
          $0(.end)
        }

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: request stream is already closed")
      }
    }

    // MARK: - Outbound

    /// Sends a response message while the response stream is open.
    ///
    /// - Parameters:
    ///   - message: The response message to send.
    ///   - metadata: Metadata sent alongside the message.
    ///   - promise: Passed along with the message while the response stream is open; failed with
    ///     `GRPCError.AlreadyComplete` if the call has already been closed.
    private func sendResponse(
      _ message: ResponsePayload,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.state {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: can't send response before receiving headers and request")

      case .requestOpenResponseOpen,
           .requestClosedResponseOpen:
        self.sendResponsePartFromObserver(.message(message, metadata), promise: promise)

      case .requestClosedResponseClosed:
        // We're already closed. This isn't a precondition failure because we may have encountered
        // an error before the observer block completed.
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Ends the call by sending `status` and `trailers`, unless the call has already been closed.
    ///
    /// - Parameters:
    ///   - status: The final status of the call.
    ///   - trailers: The trailers to send with the status.
    private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
      switch self.state {
      case .requestIdleResponseIdle:
        // No call context (and so no status promise) exists yet.
        self.state = .requestClosedResponseClosed
        self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)

      case let .requestOpenResponseOpen(context, _),
           let .requestClosedResponseOpen(context):
        self.state = .requestClosedResponseClosed
        self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
        // Fail the promise so that it is never left unfulfilled, e.g. when the observer future
        // fails after the end of the request stream has already been observed. The state is
        // closed by now, so the promise's failure callback does nothing further; when we got
        // here because the promise was succeeded, it is already complete.
        context.statusPromise.fail(status)

      case .requestClosedResponseClosed:
        // Already closed, do nothing.
        ()
      }
    }
  }