2
0

ClientStreamingCallHandler.swift 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
  23. ///
  24. /// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done.
  25. /// If the framework user wants to return a call error (e.g. in case of authentication failure),
  26. /// they can fail the observer block future.
  27. /// - To close the call and send the response, complete `context.responsePromise`.
  28. public final class ClientStreamingCallHandler<
  29. RequestDeserializer: MessageDeserializer,
  30. ResponseSerializer: MessageSerializer
  31. >: _BaseCallHandler<RequestDeserializer, ResponseSerializer> {
  32. @usableFromInline
  33. internal typealias _Context = UnaryResponseCallContext<ResponsePayload>
  34. @usableFromInline
  35. internal typealias _Observer = EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>
  36. @usableFromInline
  37. internal var _callHandlerState: _CallHandlerState
  38. // See 'UnaryCallHandler.State'.
  39. @usableFromInline
  40. internal enum _CallHandlerState {
  41. case requestIdleResponseIdle((_Context) -> _Observer)
  42. case requestOpenResponseOpen(_Context, _Observer)
  43. case requestClosedResponseOpen(_Context)
  44. case requestClosedResponseClosed
  45. }
  46. // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
  47. // If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
  48. @inlinable
  49. internal init(
  50. serializer: ResponseSerializer,
  51. deserializer: RequestDeserializer,
  52. callHandlerContext: CallHandlerContext,
  53. interceptors: [ServerInterceptor<RequestDeserializer.Output, ResponseSerializer.Input>],
  54. eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>)
  55. -> EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>
  56. ) {
  57. self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory)
  58. super.init(
  59. callHandlerContext: callHandlerContext,
  60. requestDeserializer: deserializer,
  61. responseSerializer: serializer,
  62. callType: .clientStreaming,
  63. interceptors: interceptors
  64. )
  65. }
  66. override public func channelInactive(context: ChannelHandlerContext) {
  67. super.channelInactive(context: context)
  68. // Fail any remaining promise.
  69. switch self._callHandlerState {
  70. case .requestIdleResponseIdle,
  71. .requestClosedResponseClosed:
  72. self._callHandlerState = .requestClosedResponseClosed
  73. case let .requestOpenResponseOpen(context, _),
  74. let .requestClosedResponseOpen(context):
  75. self._callHandlerState = .requestClosedResponseClosed
  76. context.responsePromise.fail(GRPCError.AlreadyComplete())
  77. }
  78. }
  79. /// Handle an error from the event observer.
  80. private func handleObserverError(_ error: Error) {
  81. switch self._callHandlerState {
  82. case .requestIdleResponseIdle:
  83. preconditionFailure("Invalid state: request observer hasn't been created")
  84. case let .requestOpenResponseOpen(context, _),
  85. let .requestClosedResponseOpen(context):
  86. let (status, trailers) = self.processObserverError(
  87. error,
  88. headers: context.headers,
  89. trailers: context.trailers
  90. )
  91. // This will handle the response promise as well.
  92. self.sendEnd(status: status, trailers: trailers)
  93. case .requestClosedResponseClosed:
  94. // We hit an error, but we're already closed (because we hit a library error first).
  95. ()
  96. }
  97. }
  98. /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
  99. private func handleLibraryError(_ error: Error) {
  100. switch self._callHandlerState {
  101. case .requestIdleResponseIdle,
  102. .requestOpenResponseOpen:
  103. // We'll never see a request message, so just send end.
  104. let (status, trailers) = self.processLibraryError(error)
  105. self.sendEnd(status: status, trailers: trailers)
  106. case .requestClosedResponseOpen:
  107. // We've invoked the observer and have seen the end of the request stream. We'll let that
  108. // play out.
  109. ()
  110. case .requestClosedResponseClosed:
  111. // We're already closed, no need to do anything here.
  112. ()
  113. }
  114. }
  115. // MARK: - Inbound
  116. override func observeLibraryError(_ error: Error) {
  117. self.handleLibraryError(error)
  118. }
  119. override internal func observeHeaders(_ headers: HPACKHeaders) {
  120. switch self._callHandlerState {
  121. case let .requestIdleResponseIdle(factory):
  122. let context = UnaryResponseCallContext<ResponsePayload>(
  123. eventLoop: self.eventLoop,
  124. headers: headers,
  125. logger: self.logger,
  126. userInfoRef: self._userInfoRef
  127. )
  128. let observer = factory(context)
  129. // Fully open. We'll send the response headers back in a moment.
  130. self._callHandlerState = .requestOpenResponseOpen(context, observer)
  131. // Register a failure callback for the observer failing.
  132. observer.whenFailure(self.handleObserverError(_:))
  133. // Register callbacks on the response promise.
  134. context.responsePromise.futureResult.whenComplete { result in
  135. switch result {
  136. case let .success(response):
  137. self.sendResponse(response)
  138. case let .failure(error):
  139. self.handleObserverError(error)
  140. }
  141. }
  142. // Write back the response headers.
  143. self.sendResponsePartFromObserver(.metadata([:]), promise: nil)
  144. // The main state machine guards against this.
  145. case .requestOpenResponseOpen,
  146. .requestClosedResponseOpen,
  147. .requestClosedResponseClosed:
  148. preconditionFailure("Invalid state: request headers already received")
  149. }
  150. }
  151. override internal func observeRequest(_ message: RequestPayload) {
  152. switch self._callHandlerState {
  153. case .requestIdleResponseIdle:
  154. preconditionFailure("Invalid state: request received before headers")
  155. case let .requestOpenResponseOpen(_, observer):
  156. observer.whenSuccess {
  157. $0(.message(message))
  158. }
  159. case .requestClosedResponseOpen,
  160. .requestClosedResponseClosed:
  161. preconditionFailure("Invalid state: the request stream has already been closed")
  162. }
  163. }
  164. override internal func observeEnd() {
  165. switch self._callHandlerState {
  166. case .requestIdleResponseIdle:
  167. preconditionFailure("Invalid state: no request headers received")
  168. case let .requestOpenResponseOpen(context, observer):
  169. self._callHandlerState = .requestClosedResponseOpen(context)
  170. observer.whenSuccess {
  171. $0(.end)
  172. }
  173. case .requestClosedResponseOpen,
  174. .requestClosedResponseClosed:
  175. preconditionFailure("Invalid state: request stream is already closed")
  176. }
  177. }
  178. // MARK: - Outbound
  179. private func sendResponse(_ message: ResponsePayload) {
  180. switch self._callHandlerState {
  181. case .requestIdleResponseIdle:
  182. preconditionFailure("Invalid state: can't send response before receiving headers and request")
  183. case let .requestOpenResponseOpen(context, _),
  184. let .requestClosedResponseOpen(context):
  185. self._callHandlerState = .requestClosedResponseClosed
  186. self.sendResponsePartFromObserver(
  187. .message(message, .init(compress: context.compressionEnabled, flush: false)),
  188. promise: nil
  189. )
  190. self.sendResponsePartFromObserver(
  191. .end(context.responseStatus, context.trailers),
  192. promise: nil
  193. )
  194. case .requestClosedResponseClosed:
  195. // We're already closed. This isn't a precondition failure because we may have encountered
  196. // an error before the observer block completed.
  197. ()
  198. }
  199. }
  200. private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
  201. switch self._callHandlerState {
  202. case .requestIdleResponseIdle,
  203. .requestClosedResponseOpen:
  204. self._callHandlerState = .requestClosedResponseClosed
  205. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  206. case let .requestOpenResponseOpen(context, _):
  207. self._callHandlerState = .requestClosedResponseClosed
  208. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  209. // Fail the promise.
  210. context.responsePromise.fail(status)
  211. case .requestClosedResponseClosed:
  212. // Already closed, do nothing.
  213. ()
  214. }
  215. }
  216. }