ServerStreamingServerHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  /*
   * Copyright 2021, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import NIOCore
  17. import NIOHPACK
  /// A server-side handler for a server-streaming RPC.
  ///
  /// Request parts are deserialized and passed through the interceptor pipeline. Once the request
  /// headers and the single request message have come out of the pipeline, the user function is
  /// invoked with the message and a `StreamingResponseCallContext`. Responses sent via that context
  /// and the final status travel back through the pipeline before being serialized and written to
  /// `context.responseWriter`.
  ///
  /// The handler is driven by a small state machine (see `State`): `idle` → `createdContext` →
  /// `invokedFunction` → `completed`. Any error moves it straight to `completed`.
  public final class ServerStreamingServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer
  >: GRPCServerHandlerProtocol {
    public typealias Request = Deserializer.Output
    public typealias Response = Serializer.Input

    /// A response serializer.
    @usableFromInline
    internal let serializer: Serializer

    /// A request deserializer.
    @usableFromInline
    internal let deserializer: Deserializer

    /// A pipeline of user provided interceptors.
    ///
    /// This is set to `nil` by `finish()`. Entry points which are not guarded by `state` must
    /// therefore use optional chaining rather than relying on the implicit unwrap.
    @usableFromInline
    internal var interceptors: ServerInterceptorPipeline<Request, Response>!

    /// The context required in order to create the function.
    @usableFromInline
    internal let context: CallHandlerContext

    /// A reference to a `UserInfo`.
    @usableFromInline
    internal let userInfoRef: Ref<UserInfo>

    /// The user provided function to execute.
    @usableFromInline
    internal let userFunction:
      (Request, StreamingResponseCallContext<Response>)
        -> EventLoopFuture<GRPCStatus>

    /// The state of the handler.
    @usableFromInline
    internal var state: State = .idle

    @usableFromInline
    internal enum State {
      // Initial state. Nothing has happened yet.
      case idle
      // Headers have been received and now we're holding a context with which to invoke the user
      // function when we receive a message.
      case createdContext(_StreamingResponseCallContext<Request, Response>)
      // The user function has been invoked, we're waiting for the status promise to be completed.
      case invokedFunction(_StreamingResponseCallContext<Request, Response>)
      // The function has completed or we are no longer proceeding with execution (because of an
      // error or unexpected closure).
      case completed
    }

    /// Creates a handler for a server-streaming RPC.
    ///
    /// - Parameters:
    ///   - context: The context for the call (event loop, logger, path, response writer, etc.).
    ///   - requestDeserializer: Deserializes request bytes into `Request` messages.
    ///   - responseSerializer: Serializes `Response` messages into bytes.
    ///   - interceptors: The interceptors to place in the pipeline for this call.
    ///   - userFunction: The function to invoke with the request message and a response context.
    ///       The returned future provides the final status of the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      userFunction: @escaping (Request, StreamingResponseCallContext<Response>)
        -> EventLoopFuture<GRPCStatus>
    ) {
      self.serializer = responseSerializer
      self.deserializer = requestDeserializer
      self.context = context
      self.userFunction = userFunction

      let userInfoRef = Ref(UserInfo())
      self.userInfoRef = userInfoRef

      // The pipeline is handed references to methods on `self`; `finish()` drops the pipeline
      // again (presumably to break the resulting reference cycle).
      self.interceptors = ServerInterceptorPipeline(
        logger: context.logger,
        eventLoop: context.eventLoop,
        path: context.path,
        callType: .serverStreaming,
        remoteAddress: context.remoteAddress,
        userInfoRef: userInfoRef,
        closeFuture: context.closeFuture,
        interceptors: interceptors,
        onRequestPart: self.receiveInterceptedPart(_:),
        onResponsePart: self.sendInterceptedPart(_:promise:)
      )
    }

    // MARK: Public API; gRPC to Handler

    /// Forwards the request headers into the interceptor pipeline.
    ///
    /// Headers arriving after `finish()` has released the pipeline are dropped.
    @inlinable
    public func receiveMetadata(_ headers: HPACKHeaders) {
      self.interceptors?.receive(.metadata(headers))
    }

    /// Deserializes a request message and forwards it into the interceptor pipeline.
    ///
    /// A deserialization failure is routed to `handleError(_:thrownFromHandler:)`. Messages
    /// arriving after `finish()` has released the pipeline are dropped.
    @inlinable
    public func receiveMessage(_ bytes: ByteBuffer) {
      do {
        let message = try self.deserializer.deserialize(byteBuffer: bytes)
        self.interceptors?.receive(.message(message))
      } catch {
        self.handleError(error)
      }
    }

    /// Forwards the end of the request stream into the interceptor pipeline.
    ///
    /// An end arriving after `finish()` has released the pipeline is dropped.
    @inlinable
    public func receiveEnd() {
      self.interceptors?.receive(.end)
    }

    /// Handles an error from the transport: the error is processed as a library error and the
    /// handler is then finished.
    @inlinable
    public func receiveError(_ error: Error) {
      self.handleError(error)
      self.finish()
    }

    /// Tears down the handler and releases the interceptor pipeline.
    ///
    /// If a call context exists, its status promise is failed with an `.unavailable` status.
    @inlinable
    public func finish() {
      switch self.state {
      case .idle:
        self.interceptors = nil
        self.state = .completed

      case let .createdContext(context),
           let .invokedFunction(context):
        context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
        // Releasing the pipeline is deferred to the event loop rather than done inline;
        // presumably so that the status callback triggered by failing the promise above can still
        // send `.end` through the pipeline.
        self.context.eventLoop.execute {
          self.interceptors = nil
        }

      case .completed:
        self.interceptors = nil
      }
    }

    // MARK: - Interceptors to User Function

    /// Dispatches a request part which has passed through the interceptor pipeline.
    @inlinable
    internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case let .metadata(headers):
        self.receiveInterceptedMetadata(headers)
      case let .message(message):
        self.receiveInterceptedMessage(message)
      case .end:
        self.receiveInterceptedEnd()
      }
    }

    /// Handles request headers: creates the call context, registers the status callback and sends
    /// (empty) response headers back through the interceptors.
    @inlinable
    internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
      switch self.state {
      case .idle:
        // Make a context to invoke the observer block factory with.
        let context = _StreamingResponseCallContext<Request, Response>(
          eventLoop: self.context.eventLoop,
          headers: headers,
          logger: self.context.logger,
          userInfoRef: self.userInfoRef,
          compressionIsEnabled: self.context.encoding.isEnabled,
          closeFuture: self.context.closeFuture,
          sendResponse: self.interceptResponse(_:metadata:promise:)
        )

        // Move to the next state.
        self.state = .createdContext(context)

        // Register a callback on the status future.
        context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))

        // Send response headers back via the interceptors.
        self.interceptors.send(.metadata([:]), promise: nil)

      case .createdContext, .invokedFunction:
        self.handleError(GRPCError.InvalidState("Protocol violation: already received headers"))

      case .completed:
        // We may receive headers from the interceptor pipeline if we have already finished (i.e.
        // due to an error or otherwise) and an interceptor doing some async work later emitting
        // headers. Dropping them is fine.
        ()
      }
    }

    /// Handles the request message: invokes the user function and ties its outcome to the status
    /// promise. Only one message is permitted per RPC.
    @inlinable
    internal func receiveInterceptedMessage(_ request: Request) {
      switch self.state {
      case .idle:
        self.handleError(GRPCError.ProtocolViolation("Message received before headers"))

      case let .createdContext(context):
        // Update the state before invoking the function.
        self.state = .invokedFunction(context)
        // Complete the status promise with the function outcome.
        context.statusPromise.completeWith(self.userFunction(request, context))

      case .invokedFunction:
        let error = GRPCError.ProtocolViolation("Multiple messages received on server streaming RPC")
        self.handleError(error)

      case .completed:
        // We received a message but we're already done: this may happen if we terminate the RPC
        // due to a channel error, for example.
        ()
      }
    }

    /// Handles the end of the request stream. This is only valid once the user function has been
    /// invoked (or the RPC has already completed).
    @inlinable
    internal func receiveInterceptedEnd() {
      switch self.state {
      case .idle:
        self.handleError(GRPCError.ProtocolViolation("End received before headers"))

      case .createdContext:
        self.handleError(GRPCError.ProtocolViolation("End received before message"))

      case .invokedFunction, .completed:
        ()
      }
    }

    // MARK: - User Function To Interceptors

    /// Sends a response from the user function into the interceptor pipeline.
    ///
    /// If the RPC has already completed the promise is failed with `GRPCError.AlreadyComplete`.
    @inlinable
    internal func interceptResponse(
      _ response: Response,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.state {
      case .idle:
        // The observer block can't send responses if it doesn't exist.
        preconditionFailure("A response was sent before the call context was created")

      case .createdContext, .invokedFunction:
        // The user has access to the response context before returning a future observer,
        // so 'createdContext' is valid here (if a little strange).
        self.interceptors.send(.message(response, metadata), promise: promise)

      case .completed:
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Called when the status promise of the call context completes.
    ///
    /// On success the status and the context's trailers are sent as `.end`; on failure the error is
    /// handled as an error thrown from the user's handler.
    @inlinable
    internal func userFunctionCompletedWithResult(_ result: Result<GRPCStatus, Error>) {
      switch self.state {
      case .idle:
        // Invalid state: the user function can only complete if it was created.
        preconditionFailure("The status promise completed before the call context was created")

      case let .createdContext(context),
           let .invokedFunction(context):
        switch result {
        case let .success(status):
          // We're sending end back, we're done.
          self.state = .completed
          self.interceptors.send(.end(status, context.trailers), promise: nil)

        case let .failure(error):
          self.handleError(error, thrownFromHandler: true)
        }

      case .completed:
        // We've already completed. Ignore this.
        ()
      }
    }

    /// Writes a response part which has passed through the interceptor pipeline to the response
    /// writer, serializing messages on the way.
    @inlinable
    internal func sendInterceptedPart(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      switch part {
      case let .metadata(headers):
        self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

      case let .message(message, metadata):
        do {
          let bytes = try self.serializer.serialize(message, allocator: self.context.allocator)
          self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
        } catch {
          // Serialization failed: fail the promise and send end.
          promise?.fail(error)
          let (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
          // Loop back via the interceptors. This path is not guarded by `state`, so tolerate the
          // pipeline having been released by `finish()` in the meantime.
          self.interceptors?.send(.end(status, trailers), promise: nil)
        }

      case let .end(status, trailers):
        self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
      }
    }

    /// Terminates the RPC because of `error`.
    ///
    /// Moves to the `completed` state, converts the error into a status and trailers, and sends
    /// them as `.end` through the interceptors. If a call context exists its status promise is also
    /// failed. Does nothing if the RPC has already completed.
    ///
    /// - Parameters:
    ///   - error: The error which occurred.
    ///   - isHandlerError: Whether the error came from the user's handler (as opposed to the
    ///       library); this selects which `ServerErrorProcessor` function is used.
    @inlinable
    internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
      switch self.state {
      case .idle:
        assert(!isHandlerError, "The handler can't have thrown an error before it was invoked")
        self.state = .completed
        // We don't have a promise to fail. Just send back end.
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          error,
          delegate: self.context.errorDelegate
        )
        self.interceptors.send(.end(status, trailers), promise: nil)

      case let .createdContext(context),
           let .invokedFunction(context):
        // Move to 'completed' first: failing the status promise below triggers the callback to
        // 'userFunctionCompletedWithResult' which must observe that we're already done.
        self.state = .completed

        let status: GRPCStatus
        let trailers: HPACKHeaders

        if isHandlerError {
          (status, trailers) = ServerErrorProcessor.processObserverError(
            error,
            headers: context.headers,
            trailers: context.trailers,
            delegate: self.context.errorDelegate
          )
        } else {
          (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
        }

        self.interceptors.send(.end(status, trailers), promise: nil)
        // We're already in the 'completed' state so failing the promise will be a no-op in the
        // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the
        // promise.)
        context.statusPromise.fail(error)

      case .completed:
        ()
      }
    }
  }