ClientStreamingServerHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHPACK
  18. public final class ClientStreamingServerHandler<
  19. Serializer: MessageSerializer,
  20. Deserializer: MessageDeserializer
  21. >: GRPCServerHandlerProtocol {
  22. public typealias Request = Deserializer.Output
  23. public typealias Response = Serializer.Input
  24. /// A response serializer.
  25. @usableFromInline
  26. internal let serializer: Serializer
  27. /// A request deserializer.
  28. @usableFromInline
  29. internal let deserializer: Deserializer
  30. /// A pipeline of user provided interceptors.
  31. @usableFromInline
  32. internal var interceptors: ServerInterceptorPipeline<Request, Response>!
  33. /// Stream events which have arrived before the stream observer future has been resolved.
  34. @usableFromInline
  35. internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer()
  36. /// The context required in order create the function.
  37. @usableFromInline
  38. internal let context: CallHandlerContext
  39. /// A reference to a `UserInfo`.
  40. @usableFromInline
  41. internal let userInfoRef: Ref<UserInfo>
  42. /// The user provided function to execute.
  43. @usableFromInline
  44. internal let handlerFactory: (UnaryResponseCallContext<Response>)
  45. -> EventLoopFuture<(StreamEvent<Request>) -> Void>
  46. /// The state of the handler.
  47. @usableFromInline
  48. internal var state: State = .idle
  49. @usableFromInline
  50. internal enum State {
  51. // Nothing has happened yet.
  52. case idle
  53. // Headers have been received, a context has been created and the user code has been called to
  54. // make an observer with. The observer future hasn't completed yet and, as such, the observer
  55. // is yet to see any events.
  56. case creatingObserver(UnaryResponseCallContext<Response>)
  57. // The observer future has succeeded, messages may have been delivered to it.
  58. case observing((StreamEvent<Request>) -> Void, UnaryResponseCallContext<Response>)
  59. // The observer has completed by completing the status promise.
  60. case completed
  61. }
  62. @inlinable
  63. public init(
  64. context: CallHandlerContext,
  65. requestDeserializer: Deserializer,
  66. responseSerializer: Serializer,
  67. interceptors: [ServerInterceptor<Request, Response>],
  68. observerFactory: @escaping (UnaryResponseCallContext<Response>)
  69. -> EventLoopFuture<(StreamEvent<Request>) -> Void>
  70. ) {
  71. self.serializer = responseSerializer
  72. self.deserializer = requestDeserializer
  73. self.context = context
  74. self.handlerFactory = observerFactory
  75. let userInfoRef = Ref(UserInfo())
  76. self.userInfoRef = userInfoRef
  77. self.interceptors = ServerInterceptorPipeline(
  78. logger: context.logger,
  79. eventLoop: context.eventLoop,
  80. path: context.path,
  81. callType: .clientStreaming,
  82. remoteAddress: context.remoteAddress,
  83. userInfoRef: userInfoRef,
  84. interceptors: interceptors,
  85. onRequestPart: self.receiveInterceptedPart(_:),
  86. onResponsePart: self.sendInterceptedPart(_:promise:)
  87. )
  88. }
  89. // MARK: Public API; gRPC to Handler
  90. @inlinable
  91. public func receiveMetadata(_ headers: HPACKHeaders) {
  92. self.interceptors.receive(.metadata(headers))
  93. }
  94. @inlinable
  95. public func receiveMessage(_ bytes: ByteBuffer) {
  96. do {
  97. let message = try self.deserializer.deserialize(byteBuffer: bytes)
  98. self.interceptors.receive(.message(message))
  99. } catch {
  100. self.handleError(error)
  101. }
  102. }
  103. @inlinable
  104. public func receiveEnd() {
  105. self.interceptors.receive(.end)
  106. }
  107. @inlinable
  108. public func receiveError(_ error: Error) {
  109. self._finish(error: error)
  110. }
  111. @inlinable
  112. public func finish() {
  113. self._finish(error: nil)
  114. }
  115. @inlinable
  116. internal func _finish(error: Error?) {
  117. switch self.state {
  118. case .idle:
  119. self.interceptors = nil
  120. self.state = .completed
  121. case let .creatingObserver(context),
  122. let .observing(_, context):
  123. let error = error ?? GRPCStatus(code: .unavailable, message: nil)
  124. context.responsePromise.fail(error)
  125. case .completed:
  126. self.interceptors = nil
  127. }
  128. }
  129. // MARK: Interceptors to User Function
  130. @inlinable
  131. internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
  132. switch part {
  133. case let .metadata(headers):
  134. self.receiveInterceptedMetadata(headers)
  135. case let .message(message):
  136. self.receiveInterceptedMessage(message)
  137. case .end:
  138. self.receiveInterceptedEnd()
  139. }
  140. }
  141. @inlinable
  142. internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
  143. switch self.state {
  144. case .idle:
  145. // Make a context to invoke the observer block factory with.
  146. let context = UnaryResponseCallContext<Response>(
  147. eventLoop: self.context.eventLoop,
  148. headers: headers,
  149. logger: self.context.logger,
  150. userInfoRef: self.userInfoRef
  151. )
  152. // Move to the next state.
  153. self.state = .creatingObserver(context)
  154. // Register a callback on the response future.
  155. context.responsePromise.futureResult.whenComplete(self.userFunctionCompleted(with:))
  156. // Make an observer block and register a completion block.
  157. self.handlerFactory(context).whenComplete(self.userFunctionResolved(_:))
  158. // Send response headers back via the interceptors.
  159. self.interceptors.send(.metadata([:]), promise: nil)
  160. case .creatingObserver, .observing:
  161. self.handleError(GRPCError.InvalidState("Protocol violation: already received headers"))
  162. case .completed:
  163. // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
  164. // to an error or otherwise) and an interceptor doing some async work later emitting headers.
  165. // Dropping them is fine.
  166. ()
  167. }
  168. }
  169. @inlinable
  170. internal func receiveInterceptedMessage(_ request: Request) {
  171. switch self.state {
  172. case .idle:
  173. self
  174. .handleError(GRPCError.InvalidState("Protocol violation: message received before headers"))
  175. case .creatingObserver:
  176. self.requestBuffer.append(.message(request))
  177. case let .observing(observer, _):
  178. observer(.message(request))
  179. case .completed:
  180. // We received a message but we're already done: this may happen if we terminate the RPC
  181. // due to a channel error, for example.
  182. ()
  183. }
  184. }
  185. @inlinable
  186. internal func receiveInterceptedEnd() {
  187. switch self.state {
  188. case .idle:
  189. self.handleError(GRPCError.InvalidState("Protocol violation: 'end received before headers'"))
  190. case .creatingObserver:
  191. self.requestBuffer.append(.end)
  192. case let .observing(observer, _):
  193. observer(.end)
  194. case .completed:
  195. // We received a message but we're already done: this may happen if we terminate the RPC
  196. // due to a channel error, for example.
  197. ()
  198. }
  199. }
  200. // MARK: User Function to Interceptors
  201. @inlinable
  202. internal func userFunctionResolved(_ result: Result<(StreamEvent<Request>) -> Void, Error>) {
  203. switch self.state {
  204. case .idle, .observing:
  205. // The observer block can't resolve if it hasn't been created ('idle') and it can't be
  206. // resolved more than once ('created').
  207. preconditionFailure()
  208. case let .creatingObserver(context):
  209. switch result {
  210. case let .success(observer):
  211. // We have an observer block now; unbuffer any requests.
  212. self.state = .observing(observer, context)
  213. while let request = self.requestBuffer.popFirst() {
  214. observer(request)
  215. }
  216. case let .failure(error):
  217. self.handleError(error)
  218. }
  219. case .completed:
  220. // We've already completed. That's fine.
  221. ()
  222. }
  223. }
  224. @inlinable
  225. internal func userFunctionCompleted(with result: Result<Response, Error>) {
  226. switch self.state {
  227. case .idle:
  228. // Invalid state: the user function can only complete if it exists..
  229. preconditionFailure()
  230. case let .creatingObserver(context),
  231. let .observing(_, context):
  232. self.state = .completed
  233. switch result {
  234. case let .success(response):
  235. let metadata = MessageMetadata(compress: false, flush: false)
  236. self.interceptors.send(.message(response, metadata), promise: nil)
  237. self.interceptors.send(.end(context.responseStatus, context.trailers), promise: nil)
  238. case let .failure(error):
  239. let (status, trailers) = ServerErrorProcessor.processObserverError(
  240. error,
  241. headers: context.headers,
  242. trailers: context.trailers,
  243. delegate: self.context.errorDelegate
  244. )
  245. self.interceptors.send(.end(status, trailers), promise: nil)
  246. }
  247. case .completed:
  248. // We've already completed. Ignore this.
  249. ()
  250. }
  251. }
  252. @inlinable
  253. internal func handleError(_ error: Error) {
  254. switch self.state {
  255. case .idle:
  256. // We don't have a promise to fail. Just send back end.
  257. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  258. error,
  259. delegate: self.context.errorDelegate
  260. )
  261. self.interceptors.send(.end(status, trailers), promise: nil)
  262. case let .creatingObserver(context),
  263. let .observing(_, context):
  264. context.responsePromise.fail(error)
  265. case .completed:
  266. ()
  267. }
  268. }
  269. // MARK: Interceptor Glue
  270. @inlinable
  271. internal func sendInterceptedPart(
  272. _ part: GRPCServerResponsePart<Response>,
  273. promise: EventLoopPromise<Void>?
  274. ) {
  275. switch part {
  276. case let .metadata(headers):
  277. self.context.responseWriter.sendMetadata(headers, promise: promise)
  278. case let .message(message, metadata):
  279. do {
  280. let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
  281. self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
  282. } catch {
  283. // Serialization failed: fail the promise and send end.
  284. promise?.fail(error)
  285. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  286. error,
  287. delegate: self.context.errorDelegate
  288. )
  289. // Loop back via the interceptors.
  290. self.interceptors.send(.end(status, trailers), promise: nil)
  291. }
  292. case let .end(status, trailers):
  293. self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
  294. }
  295. }
  296. }