ServerStreamingCallHandler.swift 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
/*
 * Copyright 2019, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// Handles server-streaming calls. Calls the observer block with the request message.
  23. ///
  24. /// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
  25. /// - To close the call and send the status, complete the status future returned by the observer block.
  26. public final class ServerStreamingCallHandler<
  27. RequestDeserializer: MessageDeserializer,
  28. ResponseSerializer: MessageSerializer
  29. >: _BaseCallHandler<RequestDeserializer, ResponseSerializer> {
  30. private typealias Context = StreamingResponseCallContext<ResponsePayload>
  31. private typealias Observer = (RequestPayload) -> EventLoopFuture<GRPCStatus>
  32. private var state: State
  33. // See 'UnaryCallHandler.State'.
  34. private enum State {
  35. case requestIdleResponseIdle((Context) -> Observer)
  36. case requestOpenResponseOpen(Context, ObserverState)
  37. case requestClosedResponseOpen(Context)
  38. case requestClosedResponseClosed
  39. enum ObserverState {
  40. case notObserved(Observer)
  41. case observed
  42. }
  43. }
  44. internal init(
  45. serializer: ResponseSerializer,
  46. deserializer: RequestDeserializer,
  47. callHandlerContext: CallHandlerContext,
  48. interceptors: [ServerInterceptor<RequestDeserializer.Output, ResponseSerializer.Input>],
  49. eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>)
  50. -> (RequestPayload) -> EventLoopFuture<GRPCStatus>
  51. ) {
  52. self.state = .requestIdleResponseIdle(eventObserverFactory)
  53. super.init(
  54. callHandlerContext: callHandlerContext,
  55. requestDeserializr: deserializer,
  56. responseSerializer: serializer,
  57. callType: .serverStreaming,
  58. interceptors: interceptors
  59. )
  60. }
  61. override public func channelInactive(context: ChannelHandlerContext) {
  62. super.channelInactive(context: context)
  63. // Fail any remaining promise.
  64. switch self.state {
  65. case .requestIdleResponseIdle,
  66. .requestClosedResponseClosed:
  67. self.state = .requestClosedResponseClosed
  68. case let .requestOpenResponseOpen(context, _),
  69. let .requestClosedResponseOpen(context):
  70. self.state = .requestClosedResponseClosed
  71. context.statusPromise.fail(GRPCError.AlreadyComplete())
  72. }
  73. }
  74. /// Handle an error from the event observer.
  75. private func handleObserverError(_ error: Error) {
  76. switch self.state {
  77. case .requestIdleResponseIdle:
  78. preconditionFailure("Invalid state: request observer hasn't been created")
  79. case .requestOpenResponseOpen(_, .notObserved):
  80. preconditionFailure("Invalid state: request observer hasn't been invoked")
  81. case let .requestOpenResponseOpen(context, .observed),
  82. let .requestClosedResponseOpen(context):
  83. let (status, trailers) = self.processObserverError(
  84. error,
  85. headers: context.headers,
  86. trailers: context.trailers
  87. )
  88. // This will handle the response promise as well.
  89. self.sendEnd(status: status, trailers: trailers)
  90. case .requestClosedResponseClosed:
  91. // We hit an error, but we're already closed (because we hit a library error first).
  92. ()
  93. }
  94. }
  95. /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
  96. private func handleLibraryError(_ error: Error) {
  97. switch self.state {
  98. case .requestIdleResponseIdle,
  99. .requestOpenResponseOpen(_, .notObserved):
  100. // We'll never see a request message: send end.
  101. let (status, trailers) = self.processLibraryError(error)
  102. self.sendEnd(status: status, trailers: trailers)
  103. case .requestOpenResponseOpen(_, .observed),
  104. .requestClosedResponseOpen:
  105. // We've invoked the observer, we expect a response. We'll let this play out.
  106. ()
  107. case .requestClosedResponseClosed:
  108. // We're already closed, no need to do anything here.
  109. ()
  110. }
  111. }
  112. // MARK: - Inbound
  113. override func observeLibraryError(_ error: Error) {
  114. self.handleLibraryError(error)
  115. }
  116. override internal func observeHeaders(_ headers: HPACKHeaders) {
  117. switch self.state {
  118. case let .requestIdleResponseIdle(factory):
  119. let context = _StreamingResponseCallContext<RequestPayload, ResponsePayload>(
  120. eventLoop: self.eventLoop,
  121. headers: headers,
  122. logger: self.logger,
  123. userInfoRef: self.userInfoRef,
  124. sendResponse: self.sendResponse(_:metadata:promise:)
  125. )
  126. let observer = factory(context)
  127. // Fully open. We'll send the response headers back in a moment.
  128. self.state = .requestOpenResponseOpen(context, .notObserved(observer))
  129. // Register callbacks for the status promise.
  130. context.statusPromise.futureResult.whenComplete { result in
  131. switch result {
  132. case let .success(status):
  133. self.sendEnd(status: status, trailers: context.trailers)
  134. case let .failure(error):
  135. self.handleObserverError(error)
  136. }
  137. }
  138. // Write back the response headers.
  139. self.sendResponsePartFromObserver(.metadata([:]), promise: nil)
  140. // The main state machine guards against this.
  141. case .requestOpenResponseOpen,
  142. .requestClosedResponseOpen,
  143. .requestClosedResponseClosed:
  144. preconditionFailure("Invalid state")
  145. }
  146. }
  147. override internal func observeRequest(_ message: RequestPayload) {
  148. switch self.state {
  149. case .requestIdleResponseIdle:
  150. preconditionFailure("Invalid state: request received before headers")
  151. case let .requestOpenResponseOpen(context, request):
  152. switch request {
  153. case .observed:
  154. // We've already observed the request message. The main state machine doesn't guard against
  155. // too many messages for unary streams. Assuming downstream handlers protect against this
  156. // then this must be an errant interceptor.
  157. ()
  158. case let .notObserved(observer):
  159. self.state = .requestOpenResponseOpen(context, .observed)
  160. // Complete the status promise with the observer block.
  161. context.statusPromise.completeWith(observer(message))
  162. }
  163. case .requestClosedResponseOpen,
  164. .requestClosedResponseClosed:
  165. preconditionFailure("Invalid state: the request stream has already been closed")
  166. }
  167. }
  168. override internal func observeEnd() {
  169. switch self.state {
  170. case .requestIdleResponseIdle:
  171. preconditionFailure("Invalid state: no request headers received")
  172. case let .requestOpenResponseOpen(context, request):
  173. switch request {
  174. case .observed:
  175. // Close the request stream.
  176. self.state = .requestClosedResponseOpen(context)
  177. case .notObserved:
  178. // We haven't received a request: this is an empty stream, the observer will never be
  179. // invoked. Fail the response promise (which will have no side effect).
  180. context.statusPromise.fail(GRPCError.StreamCardinalityViolation.request)
  181. }
  182. case .requestClosedResponseOpen,
  183. .requestClosedResponseClosed:
  184. preconditionFailure("Invalid state: request stream is already closed")
  185. }
  186. }
  187. // MARK: - Outbound
  188. private func sendResponse(
  189. _ message: ResponsePayload,
  190. metadata: MessageMetadata,
  191. promise: EventLoopPromise<Void>?
  192. ) {
  193. switch self.state {
  194. case .requestIdleResponseIdle:
  195. preconditionFailure("Invalid state: can't send response before receiving headers and request")
  196. case .requestOpenResponseOpen(_, .notObserved):
  197. preconditionFailure("Invalid state: can't send response before receiving request")
  198. case .requestOpenResponseOpen(_, .observed),
  199. .requestClosedResponseOpen:
  200. self.sendResponsePartFromObserver(.message(message, metadata), promise: promise)
  201. case .requestClosedResponseClosed:
  202. // We're already closed. This isn't a precondition failure because we may have encountered
  203. // an error before the observer block completed.
  204. promise?.fail(GRPCError.AlreadyComplete())
  205. }
  206. }
  207. private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
  208. switch self.state {
  209. case .requestIdleResponseIdle,
  210. .requestClosedResponseOpen:
  211. self.state = .requestClosedResponseClosed
  212. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  213. case let .requestOpenResponseOpen(context, _):
  214. self.state = .requestClosedResponseClosed
  215. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  216. // Fail the promise.
  217. context.statusPromise.fail(status)
  218. case .requestClosedResponseClosed:
  219. // Already closed, do nothing.
  220. ()
  221. }
  222. }
  223. }