ServerStreamingCallHandler.swift 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  /// Handles server-streaming calls. Calls the observer block with the request message.
  ///
  /// - The observer block is implemented by the framework user and calls `context.sendResponse` as
  ///   needed.
  /// - To close the call and send the status, complete the status future returned by the observer
  ///   block.
  public final class ServerStreamingCallHandler<
    RequestDeserializer: MessageDeserializer,
    ResponseSerializer: MessageSerializer
  >: _BaseCallHandler<RequestDeserializer, ResponseSerializer> {
    /// The call context handed to the user's observer factory.
    @usableFromInline
    internal typealias _Context = StreamingResponseCallContext<ResponsePayload>

    /// The user's observer: receives the single request message and returns the future status.
    @usableFromInline
    internal typealias _Observer = (RequestPayload) -> EventLoopFuture<GRPCStatus>

    /// The current state of this handler; every inbound and outbound event switches over it.
    @usableFromInline
    internal var _callHandlerState: _CallHandlerState

    // See 'UnaryCallHandler.State'.
    @usableFromInline
    internal enum _CallHandlerState {
      /// Initial state: holds the factory which creates the observer once headers are received.
      case requestIdleResponseIdle((_Context) -> _Observer)
      /// Headers have been received; the observer may or may not have been invoked yet.
      case requestOpenResponseOpen(_Context, ObserverState)
      /// The request stream has ended after the observer was invoked; the response is still open.
      case requestClosedResponseOpen(_Context)
      /// Terminal state: nothing more will be read or written.
      case requestClosedResponseClosed

      @usableFromInline
      enum ObserverState {
        /// The request message has not been received yet; holds the observer to invoke with it.
        case notObserved(_Observer)
        /// The observer has been invoked with the request message.
        case observed
      }
    }

    /// Creates a handler in the idle state.
    ///
    /// - Parameters:
    ///   - serializer: Forwarded to the base handler as the response serializer.
    ///   - deserializer: Forwarded to the base handler as the request deserializer.
    ///   - callHandlerContext: Forwarded to the base handler.
    ///   - interceptors: Forwarded to the base handler.
    ///   - eventObserverFactory: Invoked with the call context when request headers are observed;
    ///     the observer it returns is invoked with the request message.
    @inlinable
    internal init(
      serializer: ResponseSerializer,
      deserializer: RequestDeserializer,
      callHandlerContext: CallHandlerContext,
      interceptors: [ServerInterceptor<RequestDeserializer.Output, ResponseSerializer.Input>],
      eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>)
        -> (RequestPayload) -> EventLoopFuture<GRPCStatus>
    ) {
      self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory)
      super.init(
        callHandlerContext: callHandlerContext,
        requestDeserializer: deserializer,
        responseSerializer: serializer,
        callType: .serverStreaming,
        interceptors: interceptors
      )
    }

    /// Moves to the terminal state and fails the status promise if the call was still in flight.
    override public func channelInactive(context: ChannelHandlerContext) {
      super.channelInactive(context: context)

      // Fail any remaining promise.
      switch self._callHandlerState {
      case .requestIdleResponseIdle,
           .requestClosedResponseClosed:
        self._callHandlerState = .requestClosedResponseClosed

      case let .requestOpenResponseOpen(context, _),
           let .requestClosedResponseOpen(context):
        // Close first so the status promise's completion callback finds the terminal state and
        // does not attempt to write anything.
        self._callHandlerState = .requestClosedResponseClosed
        context.statusPromise.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Handle an error from the event observer, i.e. a failed status promise.
    ///
    /// The status promise can fail before the observer has been invoked: `observeEnd` fails it
    /// when the request stream ends without a message, and the observer factory holds the context
    /// and may fail it as well. That case is therefore handled like any other observer error (the
    /// RPC is ended with the processed status) rather than being treated as an invalid state.
    private func handleObserverError(_ error: Error) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request observer hasn't been created")

      case let .requestOpenResponseOpen(context, _),
           let .requestClosedResponseOpen(context):
        let (status, trailers) = self.processObserverError(
          error,
          headers: context.headers,
          trailers: context.trailers
        )
        // This will handle the response promise as well.
        self.sendEnd(status: status, trailers: trailers)

      case .requestClosedResponseClosed:
        // We hit an error, but we're already closed (because we hit a library error first).
        ()
      }
    }

    /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
    private func handleLibraryError(_ error: Error) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle,
           .requestOpenResponseOpen(_, .notObserved):
        // We'll never see a request message: send end.
        let (status, trailers) = self.processLibraryError(error)
        self.sendEnd(status: status, trailers: trailers)

      case .requestOpenResponseOpen(_, .observed),
           .requestClosedResponseOpen:
        // We've invoked the observer, we expect a response. We'll let this play out.
        ()

      case .requestClosedResponseClosed:
        // We're already closed, no need to do anything here.
        ()
      }
    }

    // MARK: - Inbound

    /// Forwards a library error to `handleLibraryError(_:)`.
    override func observeLibraryError(_ error: Error) {
      self.handleLibraryError(error)
    }

    /// Creates the call context and the observer, opens both streams and writes back the response
    /// headers.
    override internal func observeHeaders(_ headers: HPACKHeaders) {
      switch self._callHandlerState {
      case let .requestIdleResponseIdle(factory):
        let context = _StreamingResponseCallContext<RequestPayload, ResponsePayload>(
          eventLoop: self.eventLoop,
          headers: headers,
          logger: self.logger,
          userInfoRef: self._userInfoRef,
          sendResponse: self.sendResponse(_:metadata:promise:)
        )
        let observer = factory(context)

        // Fully open. We'll send the response headers back in a moment.
        self._callHandlerState = .requestOpenResponseOpen(context, .notObserved(observer))

        // Register callbacks for the status promise: success ends the RPC with the given status,
        // failure is routed through the observer-error path.
        context.statusPromise.futureResult.whenComplete { result in
          switch result {
          case let .success(status):
            self.sendEnd(status: status, trailers: context.trailers)
          case let .failure(error):
            self.handleObserverError(error)
          }
        }

        // Write back the response headers.
        self.sendResponsePartFromObserver(.metadata([:]), promise: nil)

      // The main state machine guards against this.
      case .requestOpenResponseOpen,
           .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state")
      }
    }

    /// Invokes the observer with the first request message; any further messages are ignored.
    override internal func observeRequest(_ message: RequestPayload) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request received before headers")

      case let .requestOpenResponseOpen(context, request):
        switch request {
        case .observed:
          // We've already observed the request message. The main state machine doesn't guard
          // against too many messages for unary streams. Assuming downstream handlers protect
          // against this then this must be an errant interceptor.
          ()

        case let .notObserved(observer):
          // Record that the observer has been invoked *before* invoking it, so that anything the
          // observer does synchronously (e.g. sending a response) sees the updated state.
          self._callHandlerState = .requestOpenResponseOpen(context, .observed)
          // Complete the status promise with the observer block.
          context.statusPromise.completeWith(observer(message))
        }

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: the request stream has already been closed")
      }
    }

    /// Closes the request stream, or fails the RPC if no request message was ever received.
    override internal func observeEnd() {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: no request headers received")

      case let .requestOpenResponseOpen(context, request):
        switch request {
        case .observed:
          // Close the request stream.
          self._callHandlerState = .requestClosedResponseOpen(context)

        case .notObserved:
          // We haven't received a request: this is an empty stream, the observer will never be
          // invoked. Fail the status promise; its completion callback routes the error through
          // `handleObserverError(_:)`, which ends the RPC and moves to the terminal state.
          context.statusPromise.fail(GRPCError.StreamCardinalityViolation.request)
        }

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: request stream is already closed")
      }
    }

    // MARK: - Outbound

    /// Sends a response message on behalf of the observer.
    ///
    /// - Parameters:
    ///   - message: The response message to send.
    ///   - metadata: Metadata accompanying the message.
    ///   - promise: Passed along with the write; failed with `GRPCError.AlreadyComplete` if the
    ///     call has already been closed.
    private func sendResponse(
      _ message: ResponsePayload,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: can't send response before receiving headers and request")

      case .requestOpenResponseOpen(_, .notObserved):
        preconditionFailure("Invalid state: can't send response before receiving request")

      case .requestOpenResponseOpen(_, .observed),
           .requestClosedResponseOpen:
        self.sendResponsePartFromObserver(.message(message, metadata), promise: promise)

      case .requestClosedResponseClosed:
        // We're already closed. This isn't a precondition failure because we may have encountered
        // an error before the observer block completed.
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Ends the RPC with the given status and trailers and moves to the terminal state.
    ///
    /// Does nothing if the call is already closed.
    private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle,
           .requestClosedResponseOpen:
        self._callHandlerState = .requestClosedResponseClosed
        self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)

      case let .requestOpenResponseOpen(context, _):
        self._callHandlerState = .requestClosedResponseClosed
        self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
        // Fail the promise. The state is already terminal, so the completion callback registered
        // in `observeHeaders(_:)` will not write anything further.
        context.statusPromise.fail(status)

      case .requestClosedResponseClosed:
        // Already closed, do nothing.
        ()
      }
    }
  }