ClientStreamingServerHandler.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHPACK
  18. public final class ClientStreamingServerHandler<
  19. Serializer: MessageSerializer,
  20. Deserializer: MessageDeserializer
  21. >: GRPCServerHandlerProtocol {
  22. public typealias Request = Deserializer.Output
  23. public typealias Response = Serializer.Input
  24. /// A response serializer.
  25. @usableFromInline
  26. internal let serializer: Serializer
  27. /// A request deserializer.
  28. @usableFromInline
  29. internal let deserializer: Deserializer
  30. /// A pipeline of user provided interceptors.
  31. @usableFromInline
  32. internal var interceptors: ServerInterceptorPipeline<Request, Response>!
  33. /// Stream events which have arrived before the stream observer future has been resolved.
  34. @usableFromInline
  35. internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer()
  36. /// The context required in order create the function.
  37. @usableFromInline
  38. internal let context: CallHandlerContext
  39. /// A reference to a ``UserInfo``.
  40. @usableFromInline
  41. internal let userInfoRef: Ref<UserInfo>
  42. /// The user provided function to execute.
  43. @usableFromInline
  44. internal let handlerFactory:
  45. (UnaryResponseCallContext<Response>)
  46. -> EventLoopFuture<(StreamEvent<Request>) -> Void>
  47. /// The state of the handler.
  48. @usableFromInline
  49. internal var state: State = .idle
  50. @usableFromInline
  51. internal enum State {
  52. // Nothing has happened yet.
  53. case idle
  54. // Headers have been received, a context has been created and the user code has been called to
  55. // make an observer with. The observer future hasn't completed yet and, as such, the observer
  56. // is yet to see any events.
  57. case creatingObserver(UnaryResponseCallContext<Response>)
  58. // The observer future has succeeded, messages may have been delivered to it.
  59. case observing((StreamEvent<Request>) -> Void, UnaryResponseCallContext<Response>)
  60. // The observer has completed by completing the status promise.
  61. case completed
  62. }
  63. @inlinable
  64. public init(
  65. context: CallHandlerContext,
  66. requestDeserializer: Deserializer,
  67. responseSerializer: Serializer,
  68. interceptors: [ServerInterceptor<Request, Response>],
  69. observerFactory: @escaping (UnaryResponseCallContext<Response>)
  70. -> EventLoopFuture<(StreamEvent<Request>) -> Void>
  71. ) {
  72. self.serializer = responseSerializer
  73. self.deserializer = requestDeserializer
  74. self.context = context
  75. self.handlerFactory = observerFactory
  76. let userInfoRef = Ref(UserInfo())
  77. self.userInfoRef = userInfoRef
  78. self.interceptors = ServerInterceptorPipeline(
  79. logger: context.logger,
  80. eventLoop: context.eventLoop,
  81. path: context.path,
  82. callType: .clientStreaming,
  83. remoteAddress: context.remoteAddress,
  84. userInfoRef: userInfoRef,
  85. closeFuture: context.closeFuture,
  86. interceptors: interceptors,
  87. onRequestPart: self.receiveInterceptedPart(_:),
  88. onResponsePart: self.sendInterceptedPart(_:promise:)
  89. )
  90. }
  91. // MARK: Public API; gRPC to Handler
  92. @inlinable
  93. public func receiveMetadata(_ headers: HPACKHeaders) {
  94. self.interceptors.receive(.metadata(headers))
  95. }
  96. @inlinable
  97. public func receiveMessage(_ bytes: ByteBuffer) {
  98. do {
  99. let message = try self.deserializer.deserialize(byteBuffer: bytes)
  100. self.interceptors.receive(.message(message))
  101. } catch {
  102. self.handleError(error)
  103. }
  104. }
  105. @inlinable
  106. public func receiveEnd() {
  107. self.interceptors.receive(.end)
  108. }
  109. @inlinable
  110. public func receiveError(_ error: Error) {
  111. self.handleError(error)
  112. self.finish()
  113. }
  114. @inlinable
  115. public func finish() {
  116. switch self.state {
  117. case .idle:
  118. self.interceptors = nil
  119. self.state = .completed
  120. case let .creatingObserver(context),
  121. let .observing(_, context):
  122. context.responsePromise.fail(GRPCStatus(code: .unavailable, message: nil))
  123. self.context.eventLoop.execute {
  124. self.interceptors = nil
  125. }
  126. case .completed:
  127. self.interceptors = nil
  128. }
  129. }
  130. // MARK: Interceptors to User Function
  131. @inlinable
  132. internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
  133. switch part {
  134. case let .metadata(headers):
  135. self.receiveInterceptedMetadata(headers)
  136. case let .message(message):
  137. self.receiveInterceptedMessage(message)
  138. case .end:
  139. self.receiveInterceptedEnd()
  140. }
  141. }
  142. @inlinable
  143. internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
  144. switch self.state {
  145. case .idle:
  146. // Make a context to invoke the observer block factory with.
  147. let context = UnaryResponseCallContext<Response>(
  148. eventLoop: self.context.eventLoop,
  149. headers: headers,
  150. logger: self.context.logger,
  151. userInfoRef: self.userInfoRef,
  152. closeFuture: self.context.closeFuture
  153. )
  154. // Move to the next state.
  155. self.state = .creatingObserver(context)
  156. // Register a callback on the response future.
  157. context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
  158. // Make an observer block and register a completion block.
  159. self.handlerFactory(context).whenComplete(self.userFunctionResolved(_:))
  160. // Send response headers back via the interceptors.
  161. self.interceptors.send(.metadata([:]), promise: nil)
  162. case .creatingObserver, .observing:
  163. self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received"))
  164. case .completed:
  165. // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
  166. // to an error or otherwise) and an interceptor doing some async work later emitting headers.
  167. // Dropping them is fine.
  168. ()
  169. }
  170. }
  171. @inlinable
  172. internal func receiveInterceptedMessage(_ request: Request) {
  173. switch self.state {
  174. case .idle:
  175. self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
  176. case .creatingObserver:
  177. self.requestBuffer.append(.message(request))
  178. case let .observing(observer, _):
  179. observer(.message(request))
  180. case .completed:
  181. // We received a message but we're already done: this may happen if we terminate the RPC
  182. // due to a channel error, for example.
  183. ()
  184. }
  185. }
  186. @inlinable
  187. internal func receiveInterceptedEnd() {
  188. switch self.state {
  189. case .idle:
  190. self.handleError(GRPCError.ProtocolViolation("end received before headers"))
  191. case .creatingObserver:
  192. self.requestBuffer.append(.end)
  193. case let .observing(observer, _):
  194. observer(.end)
  195. case .completed:
  196. // We received a message but we're already done: this may happen if we terminate the RPC
  197. // due to a channel error, for example.
  198. ()
  199. }
  200. }
  201. // MARK: User Function to Interceptors
  202. @inlinable
  203. internal func userFunctionResolved(_ result: Result<(StreamEvent<Request>) -> Void, Error>) {
  204. switch self.state {
  205. case .idle, .observing:
  206. // The observer block can't resolve if it hasn't been created ('idle') and it can't be
  207. // resolved more than once ('created').
  208. preconditionFailure()
  209. case let .creatingObserver(context):
  210. switch result {
  211. case let .success(observer):
  212. // We have an observer block now; unbuffer any requests.
  213. self.state = .observing(observer, context)
  214. while let request = self.requestBuffer.popFirst() {
  215. observer(request)
  216. }
  217. case let .failure(error):
  218. self.handleError(error, thrownFromHandler: true)
  219. }
  220. case .completed:
  221. // We've already completed. That's fine.
  222. ()
  223. }
  224. }
  225. @inlinable
  226. internal func userFunctionCompletedWithResult(_ result: Result<Response, Error>) {
  227. switch self.state {
  228. case .idle:
  229. // Invalid state: the user function can only complete if it exists..
  230. preconditionFailure()
  231. case let .creatingObserver(context),
  232. let .observing(_, context):
  233. switch result {
  234. case let .success(response):
  235. // Complete when we send end.
  236. self.state = .completed
  237. // Compression depends on whether it's enabled on the server and the setting in the caller
  238. // context.
  239. let compress = self.context.encoding.isEnabled && context.compressionEnabled
  240. let metadata = MessageMetadata(compress: compress, flush: false)
  241. self.interceptors.send(.message(response, metadata), promise: nil)
  242. self.interceptors.send(.end(context.responseStatus, context.trailers), promise: nil)
  243. case let .failure(error):
  244. self.handleError(error, thrownFromHandler: true)
  245. }
  246. case .completed:
  247. // We've already completed. Ignore this.
  248. ()
  249. }
  250. }
  251. @inlinable
  252. internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
  253. switch self.state {
  254. case .idle:
  255. assert(!isHandlerError)
  256. self.state = .completed
  257. // We don't have a promise to fail. Just send back end.
  258. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  259. error,
  260. delegate: self.context.errorDelegate
  261. )
  262. self.interceptors.send(.end(status, trailers), promise: nil)
  263. case let .creatingObserver(context),
  264. let .observing(_, context):
  265. // We don't have a promise to fail. Just send back end.
  266. self.state = .completed
  267. let status: GRPCStatus
  268. let trailers: HPACKHeaders
  269. if isHandlerError {
  270. (status, trailers) = ServerErrorProcessor.processObserverError(
  271. error,
  272. headers: context.headers,
  273. trailers: context.trailers,
  274. delegate: self.context.errorDelegate
  275. )
  276. } else {
  277. (status, trailers) = ServerErrorProcessor.processLibraryError(
  278. error,
  279. delegate: self.context.errorDelegate
  280. )
  281. }
  282. self.interceptors.send(.end(status, trailers), promise: nil)
  283. // We're already in the 'completed' state so failing the promise will be a no-op in the
  284. // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the
  285. // promise.)
  286. context.responsePromise.fail(error)
  287. case .completed:
  288. ()
  289. }
  290. }
  291. // MARK: Interceptor Glue
  292. @inlinable
  293. internal func sendInterceptedPart(
  294. _ part: GRPCServerResponsePart<Response>,
  295. promise: EventLoopPromise<Void>?
  296. ) {
  297. switch part {
  298. case let .metadata(headers):
  299. self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
  300. case let .message(message, metadata):
  301. do {
  302. let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
  303. self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
  304. } catch {
  305. // Serialization failed: fail the promise and send end.
  306. promise?.fail(error)
  307. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  308. error,
  309. delegate: self.context.errorDelegate
  310. )
  311. // Loop back via the interceptors.
  312. self.interceptors.send(.end(status, trailers), promise: nil)
  313. }
  314. case let .end(status, trailers):
  315. self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
  316. }
  317. }
  318. }