BidirectionalStreamingCallHandler.swift 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  /// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to
  /// the observer block.
  ///
  /// - The observer block is implemented by the framework user and calls `context.sendResponse` as
  ///   needed. If the framework user wants to return a call error (e.g. in case of authentication
  ///   failure), they can fail the observer block future.
  /// - To close the call and send the status, complete `context.statusPromise`.
  public class BidirectionalStreamingCallHandler<
    RequestDeserializer: MessageDeserializer,
    ResponseSerializer: MessageSerializer
  >: _BaseCallHandler<RequestDeserializer, ResponseSerializer> {
    /// The call context handed to the user's event observer factory.
    @usableFromInline
    internal typealias _Context = StreamingResponseCallContext<ResponsePayload>

    /// A future of the user's event observer, i.e. the closure which receives request stream
    /// events.
    @usableFromInline
    internal typealias _Observer = EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>

    /// The current state of the request and response streams as seen by this handler.
    @usableFromInline
    internal var _callHandlerState: _CallHandlerState

    // See 'UnaryCallHandler.State'.
    @usableFromInline
    internal enum _CallHandlerState {
      /// No request headers have been observed yet. Holds the factory used to create the observer
      /// once they are.
      case requestIdleResponseIdle((_Context) -> _Observer)

      /// Request headers have been observed and the observer has been created. Both streams are
      /// open.
      case requestOpenResponseOpen(_Context, _Observer)

      /// The end of the request stream has been observed but the response stream has not been
      /// ended yet.
      case requestClosedResponseOpen(_Context)

      /// The response has been ended (or the channel became inactive); there is nothing left to do.
      case requestClosedResponseClosed
    }

    /// Creates a handler for a bidirectional streaming call.
    ///
    /// The factory returns a *future* of the event observer (rather than the observer itself) to
    /// allow the framework user to e.g. asynchronously authenticate a call. If authentication
    /// fails, they can simply fail the observer future, which causes the call to be terminated.
    ///
    /// - Parameters:
    ///   - serializer: Forwarded to the base handler as the response serializer.
    ///   - deserializer: Forwarded to the base handler as the request deserializer.
    ///   - callHandlerContext: Forwarded to the base handler.
    ///   - interceptors: Forwarded to the base handler.
    ///   - eventObserverFactory: Invoked with the call context once request headers have been
    ///     observed; returns the future of the observer which receives request stream events.
    @inlinable
    internal init(
      serializer: ResponseSerializer,
      deserializer: RequestDeserializer,
      callHandlerContext: CallHandlerContext,
      interceptors: [ServerInterceptor<RequestDeserializer.Output, ResponseSerializer.Input>],
      eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>)
        -> EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>
    ) {
      self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory)
      super.init(
        callHandlerContext: callHandlerContext,
        requestDeserializer: deserializer,
        responseSerializer: serializer,
        callType: .bidirectionalStreaming,
        interceptors: interceptors
      )
    }

    /// Moves the handler to its terminal state and fails the status promise of a call whose
    /// response is still open with `GRPCError.AlreadyComplete`.
    override public func channelInactive(context: ChannelHandlerContext) {
      super.channelInactive(context: context)

      // Fail any remaining promise.
      switch self._callHandlerState {
      case .requestIdleResponseIdle,
           .requestClosedResponseClosed:
        self._callHandlerState = .requestClosedResponseClosed

      // The bound call context is deliberately not named 'context' so that it does not shadow the
      // 'ChannelHandlerContext' parameter.
      case let .requestOpenResponseOpen(callContext, _),
           let .requestClosedResponseOpen(callContext):
        self._callHandlerState = .requestClosedResponseClosed
        callContext.statusPromise.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Handle an error from the event observer.
    ///
    /// The error is converted to a status and trailers via `processObserverError` and the response
    /// is ended with them. Must not be called before the observer has been created.
    private func handleObserverError(_ error: Error) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request observer hasn't been created")

      case let .requestOpenResponseOpen(context, _),
           let .requestClosedResponseOpen(context):
        let (status, trailers) = self.processObserverError(
          error,
          headers: context.headers,
          trailers: context.trailers
        )
        // This will handle the status promise as well.
        self.sendEnd(status: status, trailers: trailers)

      case .requestClosedResponseClosed:
        // We hit an error, but we're already closed (because we hit a library error first).
        ()
      }
    }

    /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
    private func handleLibraryError(_ error: Error) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle,
           .requestOpenResponseOpen:
        // We'll never see end of stream, we'll close.
        let (status, trailers) = self.processLibraryError(error)
        self.sendEnd(status: status, trailers: trailers)

      case .requestClosedResponseOpen:
        // We've invoked the observer and seen the end of the request stream. We'll let this one
        // play out.
        ()

      case .requestClosedResponseClosed:
        // We're already closed, no need to do anything here.
        ()
      }
    }

    // MARK: - Inbound

    /// Routes a library error to `handleLibraryError(_:)`.
    override func observeLibraryError(_ error: Error) {
      self.handleLibraryError(error)
    }

    /// Creates the call context and the user's observer from the request headers, wires up the
    /// failure and status callbacks, and writes the response headers back.
    ///
    /// Request headers must be observed exactly once, before any other request part.
    override internal func observeHeaders(_ headers: HPACKHeaders) {
      switch self._callHandlerState {
      case let .requestIdleResponseIdle(factory):
        let context = _StreamingResponseCallContext<RequestPayload, ResponsePayload>(
          eventLoop: self.eventLoop,
          headers: headers,
          logger: self.logger,
          userInfoRef: self._userInfoRef,
          sendResponse: self.sendResponse(_:metadata:promise:)
        )
        let observer = factory(context)

        // Fully open. We'll send the response headers back in a moment.
        self._callHandlerState = .requestOpenResponseOpen(context, observer)

        // Register a failure callback for the observer failing.
        observer.whenFailure(self.handleObserverError(_:))

        // Register actions on the status promise.
        context.statusPromise.futureResult.whenComplete { result in
          switch result {
          case let .success(status):
            self.sendEnd(status: status, trailers: context.trailers)
          case let .failure(error):
            self.handleObserverError(error)
          }
        }

        // Write back the response headers.
        self.sendResponsePartFromObserver(.metadata([:]), promise: nil)

      // The main state machine guards against this.
      case .requestOpenResponseOpen,
           .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: request headers already received")
      }
    }

    /// Forwards a request message to the observer once the observer future has succeeded.
    override internal func observeRequest(_ message: RequestPayload) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request received before headers")

      case let .requestOpenResponseOpen(_, observer):
        observer.whenSuccess {
          $0(.message(message))
        }

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: the request stream has already been closed")
      }
    }

    /// Marks the request stream as closed and forwards the end-of-stream event to the observer
    /// once the observer future has succeeded.
    override internal func observeEnd() {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: no request headers received")

      case let .requestOpenResponseOpen(context, observer):
        self._callHandlerState = .requestClosedResponseOpen(context)
        observer.whenSuccess {
          $0(.end)
        }

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: request stream is already closed")
      }
    }

    // MARK: - Outbound

    /// Sends a response message on behalf of the observer.
    ///
    /// - Parameters:
    ///   - message: The response message to send.
    ///   - metadata: Metadata associated with the message.
    ///   - promise: Passed along with the response part while the response is open; failed with
    ///     `GRPCError.AlreadyComplete` if the response has already been ended.
    private func sendResponse(
      _ message: ResponsePayload,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: can't send response before receiving headers and request")

      case .requestOpenResponseOpen,
           .requestClosedResponseOpen:
        self.sendResponsePartFromObserver(.message(message, metadata), promise: promise)

      case .requestClosedResponseClosed:
        // We're already closed. This isn't a precondition failure because we may have encountered
        // an error before the observer block completed.
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Ends the response with the given status and trailers and moves to the terminal state.
    ///
    /// If a call context exists, its status promise is failed with `status` so that it is never
    /// left uncompleted once the call has ended.
    ///
    /// - Parameters:
    ///   - status: The status to end the response with.
    ///   - trailers: The trailers to send alongside the status.
    private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        // No context (and therefore no status promise) has been created yet.
        self._callHandlerState = .requestClosedResponseClosed
        self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)

      case let .requestOpenResponseOpen(context, _),
           let .requestClosedResponseOpen(context):
        self._callHandlerState = .requestClosedResponseClosed
        self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
        // Fail the promise. This must also happen when the request stream has already been closed:
        // the observer future may fail after end-of-stream, in which case nothing else would ever
        // complete the status promise. When we got here via the status promise itself the promise
        // is already complete and this has no further effect.
        context.statusPromise.fail(status)

      case .requestClosedResponseClosed:
        // Already closed, do nothing.
        ()
      }
    }
  }