ClientStreamingCallHandler.swift 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
  23. ///
  24. /// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done.
  25. /// If the framework user wants to return a call error (e.g. in case of authentication failure),
  26. /// they can fail the observer block future.
  27. /// - To close the call and send the response, complete `context.responsePromise`.
  28. public final class ClientStreamingCallHandler<
  29. RequestDeserializer: MessageDeserializer,
  30. ResponseSerializer: MessageSerializer
  31. >: _BaseCallHandler<RequestDeserializer, ResponseSerializer> {
  32. private typealias Context = UnaryResponseCallContext<ResponsePayload>
  33. private typealias Observer = EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>
  34. private var state: State
  35. // See 'UnaryCallHandler.State'.
  36. private enum State {
  37. case requestIdleResponseIdle((Context) -> Observer)
  38. case requestOpenResponseOpen(Context, Observer)
  39. case requestClosedResponseOpen(Context)
  40. case requestClosedResponseClosed
  41. }
  42. // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
  43. // If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
  44. internal init(
  45. serializer: ResponseSerializer,
  46. deserializer: RequestDeserializer,
  47. callHandlerContext: CallHandlerContext,
  48. interceptors: [ServerInterceptor<RequestDeserializer.Output, ResponseSerializer.Input>],
  49. eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>)
  50. -> EventLoopFuture<(StreamEvent<RequestPayload>) -> Void>
  51. ) {
  52. self.state = .requestIdleResponseIdle(eventObserverFactory)
  53. super.init(
  54. callHandlerContext: callHandlerContext,
  55. requestDeserializr: deserializer,
  56. responseSerializer: serializer,
  57. callType: .clientStreaming,
  58. interceptors: interceptors
  59. )
  60. }
  61. override public func channelInactive(context: ChannelHandlerContext) {
  62. super.channelInactive(context: context)
  63. // Fail any remaining promise.
  64. switch self.state {
  65. case .requestIdleResponseIdle,
  66. .requestClosedResponseClosed:
  67. self.state = .requestClosedResponseClosed
  68. case let .requestOpenResponseOpen(context, _),
  69. let .requestClosedResponseOpen(context):
  70. self.state = .requestClosedResponseClosed
  71. context.responsePromise.fail(GRPCError.AlreadyComplete())
  72. }
  73. }
  74. /// Handle an error from the event observer.
  75. private func handleObserverError(_ error: Error) {
  76. switch self.state {
  77. case .requestIdleResponseIdle:
  78. preconditionFailure("Invalid state: request observer hasn't been created")
  79. case let .requestOpenResponseOpen(context, _),
  80. let .requestClosedResponseOpen(context):
  81. let (status, trailers) = self.processObserverError(
  82. error,
  83. headers: context.headers,
  84. trailers: context.trailers
  85. )
  86. // This will handle the response promise as well.
  87. self.sendEnd(status: status, trailers: trailers)
  88. case .requestClosedResponseClosed:
  89. // We hit an error, but we're already closed (because we hit a library error first).
  90. ()
  91. }
  92. }
  93. /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
  94. private func handleLibraryError(_ error: Error) {
  95. switch self.state {
  96. case .requestIdleResponseIdle,
  97. .requestOpenResponseOpen:
  98. // We'll never see a request message, so just send end.
  99. let (status, trailers) = self.processLibraryError(error)
  100. self.sendEnd(status: status, trailers: trailers)
  101. case .requestClosedResponseOpen:
  102. // We've invoked the observer and have seen the end of the request stream. We'll let that
  103. // play out.
  104. ()
  105. case .requestClosedResponseClosed:
  106. // We're already closed, no need to do anything here.
  107. ()
  108. }
  109. }
  110. // MARK: - Inbound
  111. override func observeLibraryError(_ error: Error) {
  112. self.handleLibraryError(error)
  113. }
  114. override internal func observeHeaders(_ headers: HPACKHeaders) {
  115. switch self.state {
  116. case let .requestIdleResponseIdle(factory):
  117. let context = UnaryResponseCallContext<ResponsePayload>(
  118. eventLoop: self.eventLoop,
  119. headers: headers,
  120. logger: self.logger,
  121. userInfoRef: self.userInfoRef
  122. )
  123. let observer = factory(context)
  124. // Fully open. We'll send the response headers back in a moment.
  125. self.state = .requestOpenResponseOpen(context, observer)
  126. // Register a failure callback for the observer failing.
  127. observer.whenFailure(self.handleObserverError(_:))
  128. // Register callbacks on the response promise.
  129. context.responsePromise.futureResult.whenComplete { result in
  130. switch result {
  131. case let .success(response):
  132. self.sendResponse(response)
  133. case let .failure(error):
  134. self.handleObserverError(error)
  135. }
  136. }
  137. // Write back the response headers.
  138. self.sendResponsePartFromObserver(.metadata([:]), promise: nil)
  139. // The main state machine guards against this.
  140. case .requestOpenResponseOpen,
  141. .requestClosedResponseOpen,
  142. .requestClosedResponseClosed:
  143. preconditionFailure("Invalid state: request headers already received")
  144. }
  145. }
  146. override internal func observeRequest(_ message: RequestPayload) {
  147. switch self.state {
  148. case .requestIdleResponseIdle:
  149. preconditionFailure("Invalid state: request received before headers")
  150. case let .requestOpenResponseOpen(_, observer):
  151. observer.whenSuccess {
  152. $0(.message(message))
  153. }
  154. case .requestClosedResponseOpen,
  155. .requestClosedResponseClosed:
  156. preconditionFailure("Invalid state: the request stream has already been closed")
  157. }
  158. }
  159. override internal func observeEnd() {
  160. switch self.state {
  161. case .requestIdleResponseIdle:
  162. preconditionFailure("Invalid state: no request headers received")
  163. case let .requestOpenResponseOpen(context, observer):
  164. self.state = .requestClosedResponseOpen(context)
  165. observer.whenSuccess {
  166. $0(.end)
  167. }
  168. case .requestClosedResponseOpen,
  169. .requestClosedResponseClosed:
  170. preconditionFailure("Invalid state: request stream is already closed")
  171. }
  172. }
  173. // MARK: - Outbound
  174. private func sendResponse(_ message: ResponsePayload) {
  175. switch self.state {
  176. case .requestIdleResponseIdle:
  177. preconditionFailure("Invalid state: can't send response before receiving headers and request")
  178. case let .requestOpenResponseOpen(context, _),
  179. let .requestClosedResponseOpen(context):
  180. self.state = .requestClosedResponseClosed
  181. self.sendResponsePartFromObserver(
  182. .message(message, .init(compress: context.compressionEnabled, flush: false)),
  183. promise: nil
  184. )
  185. self.sendResponsePartFromObserver(
  186. .end(context.responseStatus, context.trailers),
  187. promise: nil
  188. )
  189. case .requestClosedResponseClosed:
  190. // We're already closed. This isn't a precondition failure because we may have encountered
  191. // an error before the observer block completed.
  192. ()
  193. }
  194. }
  195. private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
  196. switch self.state {
  197. case .requestIdleResponseIdle,
  198. .requestClosedResponseOpen:
  199. self.state = .requestClosedResponseClosed
  200. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  201. case let .requestOpenResponseOpen(context, _):
  202. self.state = .requestClosedResponseClosed
  203. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  204. // Fail the promise.
  205. context.responsePromise.fail(status)
  206. case .requestClosedResponseClosed:
  207. // Already closed, do nothing.
  208. ()
  209. }
  210. }
  211. }