BidirectionalStreamingServerHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHPACK
  /// A server-side handler for a single bidirectional-streaming RPC.
  ///
  /// Request parts handed to the `receive*` methods are passed through the interceptor pipeline
  /// and then delivered to a stream observer created by the user-provided `observerFactory`.
  /// Response parts produced via the call context travel back through the interceptors and are
  /// written using `context.responseWriter`.
  ///
  /// The handler is a small state machine (see `State`): `idle` → `creatingObserver` →
  /// `observing` → `completed`. Once `completed`, late request parts are dropped and late
  /// response writes are failed with `GRPCError.AlreadyComplete`.
  public final class BidirectionalStreamingServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer
  >: GRPCServerHandlerProtocol {
    public typealias Request = Deserializer.Output
    public typealias Response = Serializer.Input

    /// A response serializer.
    @usableFromInline
    internal let serializer: Serializer

    /// A request deserializer.
    @usableFromInline
    internal let deserializer: Deserializer

    /// A pipeline of user provided interceptors.
    ///
    /// Set to `nil` by `_finish(error:)` once the handler is being torn down.
    @usableFromInline
    internal var interceptors: ServerInterceptorPipeline<Request, Response>!

    /// Stream events which have arrived before the stream observer future has been resolved.
    @usableFromInline
    internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer()

    /// The context required in order to create the function.
    @usableFromInline
    internal let context: CallHandlerContext

    /// A reference to a `UserInfo`, shared between the interceptor pipeline and the call context.
    @usableFromInline
    internal let userInfoRef: Ref<UserInfo>

    /// The user provided function to execute. It returns a future stream observer.
    @usableFromInline
    internal let observerFactory: (_StreamingResponseCallContext<Request, Response>)
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>

    /// The state of the handler.
    @usableFromInline
    internal var state: State = .idle

    @usableFromInline
    internal enum State {
      /// No headers have been received.
      case idle
      /// Headers have been received, a context has been created and the user code has been called
      /// to make a stream observer with. The observer is yet to see any messages.
      case creatingObserver(_StreamingResponseCallContext<Request, Response>)
      /// The observer future has resolved and the observer may have seen messages.
      case observing((StreamEvent<Request>) -> Void, _StreamingResponseCallContext<Request, Response>)
      /// The RPC is over: either the status promise was completed, an error ended the RPC before
      /// headers were received, or the handler was finished while idle.
      case completed
    }

    /// Creates a handler for one bidirectional-streaming RPC.
    ///
    /// - Parameters:
    ///   - context: Call-level context (logger, event loop, path, remote address, response
    ///     writer and error delegate are read from it).
    ///   - requestDeserializer: Deserializes inbound request bytes into `Request` values.
    ///   - responseSerializer: Serializes outbound `Response` values into bytes.
    ///   - interceptors: User provided interceptors to install in the pipeline.
    ///   - observerFactory: User function invoked once request headers have been received; it
    ///     returns a future closure which observes the request stream.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      observerFactory: @escaping (StreamingResponseCallContext<Response>)
        -> EventLoopFuture<(StreamEvent<Request>) -> Void>
    ) {
      self.serializer = responseSerializer
      self.deserializer = requestDeserializer
      self.context = context
      self.observerFactory = observerFactory

      let userInfoRef = Ref(UserInfo())
      self.userInfoRef = userInfoRef

      // All stored properties are initialized by this point, so `self` may be referenced by the
      // pipeline callbacks below.
      self.interceptors = ServerInterceptorPipeline(
        logger: context.logger,
        eventLoop: context.eventLoop,
        path: context.path,
        callType: .bidirectionalStreaming,
        remoteAddress: context.remoteAddress,
        userInfoRef: userInfoRef,
        interceptors: interceptors,
        onRequestPart: self.receiveInterceptedPart(_:),
        onResponsePart: self.sendInterceptedPart(_:promise:)
      )
    }

    // MARK: - Public API: gRPC to Handler

    /// Forwards the request headers into the interceptor pipeline.
    @inlinable
    public func receiveMetadata(_ headers: HPACKHeaders) {
      self.interceptors.receive(.metadata(headers))
    }

    /// Deserializes a request message and forwards it into the interceptor pipeline.
    ///
    /// A deserialization failure is routed to `handleError(_:)`.
    @inlinable
    public func receiveMessage(_ bytes: ByteBuffer) {
      do {
        let message = try self.deserializer.deserialize(byteBuffer: bytes)
        self.interceptors.receive(.message(message))
      } catch {
        self.handleError(error)
      }
    }

    /// Forwards end-of-request-stream into the interceptor pipeline.
    @inlinable
    public func receiveEnd() {
      self.interceptors.receive(.end)
    }

    /// Finishes the handler because of `error`.
    @inlinable
    public func receiveError(_ error: Error) {
      self._finish(error: error)
    }

    /// Finishes the handler without a specific error.
    @inlinable
    public func finish() {
      self._finish(error: nil)
    }

    /// Tears the handler down.
    ///
    /// - Parameter error: The error to fail the status promise with if the user function is
    ///   active; when `nil` an `.unavailable` status is used instead.
    @inlinable
    internal func _finish(error: Error?) {
      switch self.state {
      case .idle:
        // Nothing was started: drop the pipeline and mark the RPC as done.
        // NOTE(review): dropping the pipeline presumably breaks the reference cycle created by
        // handing it callbacks bound to `self` in `init` - confirm.
        self.interceptors = nil
        self.state = .completed

      case let .creatingObserver(context),
           let .observing(_, context):
        // Failing the status promise triggers `userFunctionStatusResolved(_:)`, which sends end
        // and moves the handler to 'completed'.
        let error = error ?? GRPCStatus(code: .unavailable, message: nil)
        context.statusPromise.fail(error)

      case .completed:
        self.interceptors = nil
      }
    }

    // MARK: - Interceptors to User Function

    /// Dispatches a request part emitted by the interceptor pipeline.
    @inlinable
    internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case let .metadata(headers):
        self.receiveInterceptedMetadata(headers)
      case let .message(message):
        self.receiveInterceptedMessage(message)
      case .end:
        self.receiveInterceptedEnd()
      }
    }

    /// Handles request headers: creates the call context, sends response headers and invokes the
    /// user's observer factory.
    @inlinable
    internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
      switch self.state {
      case .idle:
        // Make a context to invoke the observer block factory with.
        let context = _StreamingResponseCallContext<Request, Response>(
          eventLoop: self.context.eventLoop,
          headers: headers,
          logger: self.context.logger,
          userInfoRef: self.userInfoRef,
          sendResponse: self.interceptResponse(_:metadata:promise:)
        )

        // Move to the next state.
        self.state = .creatingObserver(context)

        // Send response headers back via the interceptors.
        self.interceptors.send(.metadata([:]), promise: nil)

        // Register callbacks on the status future.
        context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))

        // Make an observer block and register a completion block.
        self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))

      case .creatingObserver, .observing:
        self.handleError(GRPCError.InvalidState("Protocol violation: already received headers"))

      case .completed:
        // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
        // to an error or otherwise) and an interceptor doing some async work later emitting headers.
        // Dropping them is fine.
        ()
      }
    }

    /// Delivers a request message to the observer, buffering it if the observer isn't ready yet.
    @inlinable
    internal func receiveInterceptedMessage(_ request: Request) {
      switch self.state {
      case .idle:
        self.handleError(
          GRPCError.InvalidState("Protocol violation: message received before headers")
        )

      case .creatingObserver:
        self.requestBuffer.append(.message(request))

      case let .observing(observer, _):
        observer(.message(request))

      case .completed:
        // We received a message but we're already done: this may happen if we terminate the RPC
        // due to a channel error, for example.
        ()
      }
    }

    /// Delivers end-of-stream to the observer, buffering it if the observer isn't ready yet.
    @inlinable
    internal func receiveInterceptedEnd() {
      switch self.state {
      case .idle:
        self.handleError(
          GRPCError.InvalidState("Protocol violation: end of stream received before headers")
        )

      case .creatingObserver:
        self.requestBuffer.append(.end)

      case let .observing(observer, _):
        observer(.end)

      case .completed:
        // We received end of stream but we're already done: this may happen if we terminate the
        // RPC due to a channel error, for example.
        ()
      }
    }

    // MARK: - User Function To Interceptors

    /// Called when the future returned by the user's observer factory completes.
    ///
    /// On success the observer is installed and any buffered request events are replayed to it
    /// in arrival order; on failure the error is routed to `handleError(_:)`.
    @inlinable
    internal func userFunctionResolvedWithResult(
      _ result: Result<(StreamEvent<Request>) -> Void, Error>
    ) {
      switch self.state {
      case .idle, .observing:
        // The observer block can't resolve if it hasn't been created ('idle') and it can't be
        // resolved more than once ('observing').
        preconditionFailure()

      case let .creatingObserver(context):
        switch result {
        case let .success(observer):
          // We have an observer block now; unbuffer any requests.
          self.state = .observing(observer, context)
          while let request = self.requestBuffer.popFirst() {
            observer(request)
          }

        case let .failure(error):
          self.handleError(error)
        }

      case .completed:
        // We've already completed (e.g. the status promise was completed before the observer
        // future resolved). That's fine.
        ()
      }
    }

    /// Sends a response message from the user function into the interceptor pipeline.
    ///
    /// - Parameters:
    ///   - response: The response message to send.
    ///   - metadata: Metadata associated with the message, passed through unchanged.
    ///   - promise: Forwarded with the message; failed with `GRPCError.AlreadyComplete` if the
    ///     RPC has already completed.
    @inlinable
    internal func interceptResponse(
      _ response: Response,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.state {
      case .idle:
        // The observer block can't send responses if it doesn't exist!
        preconditionFailure()

      case .creatingObserver, .observing:
        // The user has access to the response context before returning a future observer,
        // so 'creatingObserver' is valid here (if a little strange).
        self.interceptors.send(.message(response, metadata), promise: promise)

      case .completed:
        promise?.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Called when the user function's status promise completes; sends end and completes the RPC.
    @inlinable
    internal func userFunctionStatusResolved(_ result: Result<GRPCStatus, Error>) {
      switch self.state {
      case .idle:
        // The promise can't fail before we create it.
        preconditionFailure()

      // Completing while still 'creatingObserver' is possible: the user can complete the status
      // before returning a stream handler.
      case let .creatingObserver(context), let .observing(_, context):
        // We're about to send end back, so we're done. Transition *before* sending so that any
        // re-entrant or late calls observe 'completed': late request parts are dropped, late
        // responses are failed and end can't be sent twice.
        self.state = .completed

        switch result {
        case let .success(status):
          self.interceptors.send(.end(status, context.trailers), promise: nil)

        case let .failure(error):
          let (status, trailers) = ServerErrorProcessor.processObserverError(
            error,
            headers: context.headers,
            trailers: context.trailers,
            delegate: self.context.errorDelegate
          )
          self.interceptors.send(.end(status, trailers), promise: nil)
        }

      case .completed:
        ()
      }
    }

    /// Handles an error raised while processing the RPC.
    ///
    /// Before headers have been received the RPC is ended directly; afterwards the status
    /// promise is failed and `userFunctionStatusResolved(_:)` takes care of ending the RPC.
    @inlinable
    internal func handleError(_ error: Error) {
      switch self.state {
      case .idle:
        // We don't have a promise to fail. Just send back end. Mark the RPC as completed first
        // so that anything arriving afterwards (e.g. delayed headers) is dropped rather than
        // starting the user function on an RPC which has already been ended.
        self.state = .completed
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          error,
          delegate: self.context.errorDelegate
        )
        self.interceptors.send(.end(status, trailers), promise: nil)

      case let .creatingObserver(context),
           let .observing(_, context):
        // We don't change state here; that happens when the status promise is completed.
        context.statusPromise.fail(error)

      case .completed:
        ()
      }
    }

    /// Writes a response part emitted by the interceptor pipeline using the response writer.
    ///
    /// - Parameters:
    ///   - part: The response part to write; messages are serialized first.
    ///   - promise: Forwarded to the response writer; failed if serialization throws.
    @inlinable
    internal func sendInterceptedPart(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      switch part {
      case let .metadata(headers):
        self.context.responseWriter.sendMetadata(headers, promise: promise)

      case let .message(message, metadata):
        do {
          let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
          self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
        } catch {
          // Serialization failed: fail the promise and send end.
          promise?.fail(error)
          let (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
          // Loop back via the interceptors. The pipeline may already have been dropped by
          // `_finish(error:)`, in which case there is nothing left to send end through.
          // NOTE(review): this path neither completes the status promise nor updates `state`;
          // confirm whether it should be routed through `handleError(_:)` instead.
          self.interceptors?.send(.end(status, trailers), promise: nil)
        }

      case let .end(status, trailers):
        self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
      }
    }
  }