// GRPCAsyncServerHandler.swift
/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import DequeModule
  17. import Logging
  18. import NIOCore
  19. import NIOHPACK
  /// Public server handler for `async`/`await` based RPCs.
  ///
  /// The struct is a thin shell: it stores one `AsyncServerHandler` and passes every
  /// `GRPCServerHandlerProtocol` callback straight through to it.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct GRPCAsyncServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer,
    Request: Sendable,
    Response: Sendable
  >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
    /// The wrapped handler that every callback below is forwarded to.
    @usableFromInline
    internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>

    /// Forwards the request headers to the wrapped handler.
    public func receiveMetadata(_ metadata: HPACKHeaders) {
      self._handler.receiveMetadata(metadata)
    }

    /// Forwards the serialized bytes of one request message to the wrapped handler.
    public func receiveMessage(_ bytes: ByteBuffer) {
      self._handler.receiveMessage(bytes)
    }

    /// Forwards the end of the request stream to the wrapped handler.
    public func receiveEnd() {
      self._handler.receiveEnd()
    }

    /// Forwards an error to the wrapped handler.
    public func receiveError(_ error: Error) {
      self._handler.receiveError(error)
    }

    /// Forwards the `finish` callback to the wrapped handler.
    public func finish() {
      self._handler.finish()
    }
  }
  45. // MARK: - RPC Adapters
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCAsyncServerHandler {
    public typealias Request = Deserializer.Output
    public typealias Response = Serializer.Input

    /// Creates a handler for a unary RPC.
    ///
    /// The wrapped function is adapted to the streaming shape used internally: exactly one
    /// request is read from the request stream and the single response is written to the
    /// response writer. A stream with zero requests, or with more than one, results in a
    /// `GRPCError.ProtocolViolation` being thrown.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to run for this RPC.
    ///   - unary: The user function to invoke with the single request.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping unary: @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
        -> Response
    ) {
      self._handler = .init(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .unary,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          var requestIterator = requests.makeAsyncIterator()

          // There must be a first request ...
          guard let onlyRequest = try await requestIterator.next() else {
            throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
          }

          // ... and the stream must end right after it.
          guard try await requestIterator.next() == nil else {
            throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
          }

          let reply = try await unary(onlyRequest, callContext)
          try await writer.send(reply)
        }
      )
    }

    /// Creates a handler for a client-streaming RPC.
    ///
    /// The wrapped function receives the whole request stream; the response it returns is
    /// written to the response writer.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to run for this RPC.
    ///   - clientStreaming: The user function to invoke with the request stream.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping clientStreaming: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncServerCallContext
      ) async throws -> Response
    ) {
      self._handler = .init(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .clientStreaming,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          let reply = try await clientStreaming(requests, callContext)
          try await writer.send(reply)
        }
      )
    }

    /// Creates a handler for a server-streaming RPC.
    ///
    /// Exactly one request is read from the request stream before the wrapped function is
    /// invoked with it and the response writer. A stream with zero requests, or with more
    /// than one, results in a `GRPCError.ProtocolViolation` being thrown.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to run for this RPC.
    ///   - serverStreaming: The user function to invoke with the single request.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping serverStreaming: @escaping @Sendable (
        Request,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self._handler = .init(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .serverStreaming,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          var requestIterator = requests.makeAsyncIterator()

          // There must be a first request ...
          guard let onlyRequest = try await requestIterator.next() else {
            throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
          }

          // ... and the stream must end right after it.
          guard try await requestIterator.next() == nil else {
            throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
          }

          try await serverStreaming(onlyRequest, writer, callContext)
        }
      )
    }

    /// Creates a handler for a bidirectional-streaming RPC.
    ///
    /// The wrapped function already has the shape used internally, so it is passed on as-is.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to run for this RPC.
    ///   - bidirectional: The user function to invoke.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping bidirectional: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self._handler = .init(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .bidirectionalStreaming,
        interceptors: interceptors,
        userHandler: bidirectional
      )
    }
  }
  147. // MARK: - Server Handler
  148. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  149. @usableFromInline
  150. internal final class AsyncServerHandler<
  151. Serializer: MessageSerializer,
  152. Deserializer: MessageDeserializer,
  153. Request: Sendable,
  154. Response: Sendable
  155. >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  /// Serializes response messages.
  @usableFromInline
  internal let serializer: Serializer

  /// Deserializes request messages.
  @usableFromInline
  internal let deserializer: Deserializer

  /// The event loop this handler runs on.
  @usableFromInline
  internal let eventLoop: EventLoop

  /// The `ByteBuffer` allocator supplied by the underlying `Channel`.
  @usableFromInline
  internal let allocator: ByteBufferAllocator

  /// An optional, user-supplied error delegate. When present it is used to transform errors
  /// and may pack errors into trailers.
  @usableFromInline
  internal let errorDelegate: ServerErrorDelegate?

  /// The logger for this RPC.
  @usableFromInline
  internal let logger: Logger

  /// A reference to the user info, shared with the interceptor pipeline and reachable from the
  /// async call context. `UserInfo` is _not_ `Sendable`: only touch it from an appropriate
  /// event loop.
  @usableFromInline
  internal let userInfoRef: Ref<UserInfo>

  /// `true` when compression is enabled on the server and an algorithm has been negotiated
  /// with the client.
  @usableFromInline
  internal let compressionEnabledOnRPC: Bool

  /// Whether the RPC method wants its responses compressed when that is possible. Defaults to
  /// `true`.
  @usableFromInline
  internal var compressResponsesIfPossible: Bool

  /// Whether the next set of response headers should be flushed.
  ///
  /// The interceptor pipeline has no separate flush event: for messages the flush decision
  /// travels in the message metadata, and for the status and trailers the flush is implicit.
  /// Headers are the exception, so the decision is tracked here.
  ///
  /// Usually the flush is delayed until the first message is flushed and this stays unset. It
  /// may be set when the server handler uses ``GRPCAsyncServerCallContext/sendHeaders(_:)``.
  @usableFromInline
  internal var flushNextHeaders: Bool

  /// State machine for the interceptor pipeline.
  @usableFromInline
  internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine

  /// The interceptor pipeline.
  // NOTE(review): the long-form `Optional<…>` spelling on the properties below looks
  // deliberate (unlike `T?`, it gets no implicit `nil` default) — keep it as is.
  @usableFromInline
  internal private(set) var interceptors: Optional<ServerInterceptorPipeline<Request, Response>>

  /// Writes intercepted responses to the channel.
  @usableFromInline
  internal private(set) var responseWriter: Optional<GRPCServerResponseWriter>

  /// State machine for the user implemented function.
  @usableFromInline
  internal private(set) var handlerStateMachine: ServerHandlerStateMachine

  /// The bundle of components used by the user handler.
  @usableFromInline
  internal private(set) var handlerComponents: Optional<ServerHandlerComponents<
    Request,
    Response,
    GRPCAsyncWriterSinkDelegate<(Response, Compression)>
  >>

  /// The user-provided function to run for this RPC.
  @usableFromInline
  internal let userHandler: @Sendable (
    GRPCAsyncRequestStream<Request>,
    GRPCAsyncResponseStreamWriter<Response>,
    GRPCAsyncServerCallContext
  ) async throws -> Void

  /// The producer type used to feed request messages to the user handler.
  @usableFromInline
  internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
    Request,
    Error,
    NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
    GRPCAsyncSequenceProducerDelegate
  >
  /// Creates a handler for one RPC.
  ///
  /// - Parameters:
  ///   - context: Supplies the event loop, allocator, response writer, error delegate,
  ///     encoding, logger, path, remote address and close future used below.
  ///   - requestDeserializer: Deserializer for request messages.
  ///   - responseSerializer: Serializer for response messages.
  ///   - callType: The type of RPC, passed on to the interceptor pipeline.
  ///   - interceptors: Interceptors to install in the pipeline.
  ///   - userHandler: The user function to run once request metadata has been received.
  @inlinable
  internal init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    callType: GRPCCallType,
    interceptors: [ServerInterceptor<Request, Response>],
    userHandler: @escaping @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void
  ) {
    // Message coding and the user's function.
    self.deserializer = requestDeserializer
    self.serializer = responseSerializer
    self.userHandler = userHandler

    // Values taken from the call context.
    self.eventLoop = context.eventLoop
    self.allocator = context.allocator
    self.logger = context.logger
    self.errorDelegate = context.errorDelegate
    self.responseWriter = context.responseWriter
    self.compressionEnabledOnRPC = context.encoding.isEnabled

    // Per-RPC defaults.
    self.compressResponsesIfPossible = true
    self.flushNextHeaders = false
    self.userInfoRef = Ref(UserInfo())

    // Handler and interceptor state; no handler components exist until the handler is invoked.
    self.handlerStateMachine = .init()
    self.handlerComponents = nil
    self.interceptorStateMachine = .init()

    // The pipeline's callbacks refer to methods on `self`, so every stored property needs a
    // value first: start with `nil` and install the real pipeline afterwards.
    self.interceptors = nil
    self.interceptors = ServerInterceptorPipeline(
      logger: context.logger,
      eventLoop: context.eventLoop,
      path: context.path,
      callType: callType,
      remoteAddress: context.remoteAddress,
      userInfoRef: self.userInfoRef,
      closeFuture: context.closeFuture,
      interceptors: interceptors,
      onRequestPart: self.receiveInterceptedPart(_:),
      onResponsePart: self.sendInterceptedPart(_:promise:)
    )
  }
  271. // MARK: - GRPCServerHandlerProtocol conformance
  /// Handles request headers arriving from the transport.
  ///
  /// The interceptor state machine decides what happens: the headers are handed to the
  /// interceptor pipeline, the RPC is cancelled, or the headers are dropped.
  @inlinable
  internal func receiveMetadata(_ headers: HPACKHeaders) {
    let action = self.interceptorStateMachine.interceptRequestMetadata()

    switch action {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
    case .intercept:
      self.interceptors?.receive(.metadata(headers))
    }
  }
  /// Handles the bytes of one request message arriving from the transport.
  ///
  /// The bytes are deserialized first; a deserialization failure cancels the RPC with that
  /// error. Otherwise the interceptor state machine decides whether the message is handed to
  /// the interceptor pipeline, the RPC is cancelled, or the message is dropped.
  @inlinable
  internal func receiveMessage(_ bytes: ByteBuffer) {
    let message: Request
    do {
      message = try self.deserializer.deserialize(byteBuffer: bytes)
    } catch let deserializationError {
      self.cancel(error: deserializationError)
      return
    }

    let action = self.interceptorStateMachine.interceptRequestMessage()

    switch action {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
    case .intercept:
      self.interceptors?.receive(.message(message))
    }
  }
  /// Handles the end of the request stream arriving from the transport.
  ///
  /// The interceptor state machine decides whether the end is handed to the interceptor
  /// pipeline, the RPC is cancelled, or the event is dropped.
  @inlinable
  internal func receiveEnd() {
    let action = self.interceptorStateMachine.interceptRequestEnd()

    switch action {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
    case .intercept:
      self.interceptors?.receive(.end)
    }
  }
  /// Handles an error from the transport by cancelling the RPC with that error.
  @inlinable
  internal func receiveError(_ error: Error) {
    self.cancel(error: error)
  }
  /// Finishes the RPC. This is implemented as a cancellation without an error.
  @inlinable
  internal func finish() {
    self.cancel(error: nil)
  }
  /// Cancels the RPC, tearing down the user handler and then the interceptor pipeline.
  ///
  /// Must be called on the handler's event loop.
  ///
  /// - Parameter error: The reason for the cancellation, if there is one. When a status has to
  ///   be generated for the handler and no error was given, `GRPCStatus.processingError` is
  ///   used in its place.
  @usableFromInline
  internal func cancel(error: Error?) {
    self.eventLoop.assertInEventLoop()

    // Step 1: the user handler.
    let handlerAction = self.handlerStateMachine.cancel()
    if case .cancelAndNilOutHandlerComponents = handlerAction {
      // Cancel handler related things (task, response writer) and release them.
      self.handlerComponents?.cancel()
      self.handlerComponents = nil

      // Whether a status was already sent isn't tracked here. The interceptor state machine
      // is simply told that a response status is wanted, and it answers whether one should
      // be generated and sent.
      if case .forward = self.interceptorStateMachine.interceptedResponseStatus() {
        let reason = error ?? GRPCStatus.processingError
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          reason,
          delegate: self.errorDelegate
        )
        self.responseWriter?.sendEnd(status: status, trailers: trailers, promise: nil)
      }
    }

    // Step 2: the interceptor pipeline.
    let pipelineAction = self.interceptorStateMachine.cancel()
    switch pipelineAction {
    case .none:
      return
    case .sendStatusThenNilOutInterceptorPipeline:
      self.responseWriter?.sendEnd(status: .processingError, trailers: [:], promise: nil)
    case .nilOutInterceptorPipeline:
      break
    }

    // Reached for both "nil out" outcomes: release the pipeline and the response writer.
    self.interceptors = nil
    self.responseWriter = nil
  }
  355. // MARK: - Interceptors to User Function
  /// Receives a request part from the interceptor pipeline and routes it to the matching
  /// `receiveIntercepted…` method.
  @inlinable
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
    switch part {
    case .end:
      self.receiveInterceptedEnd()
    case let .message(request):
      self.receiveInterceptedMessage(request)
    case let .metadata(requestHeaders):
      self.receiveInterceptedMetadata(requestHeaders)
    }
  }
  /// Receives request headers from the interceptor pipeline and, when both state machines
  /// allow it, starts the user handler in a new `Task`.
  ///
  /// - Parameter headers: The intercepted request headers.
  @inlinable
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
    switch self.interceptorStateMachine.interceptedRequestMetadata() {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
      return
    case .forward:
      break // carry on below
    }

    switch self.handlerStateMachine.handleMetadata() {
    case .cancel:
      self.cancel(error: nil)

    case .invokeHandler:
      // Invoking the handler needs a handful of things:
      //
      // - A context, which lets the handler set response headers/trailers and gives it a
      //   logger, amongst other things.
      // - A request source: request messages are pushed into it and the handler consumes them
      //   as an async sequence.
      // - An async writer plus delegate: the writer goes to the handler and the delegate calls
      //   back into this object with the responses.
      //
      // These are kept in a bundle ("handler components") outside of the state machine and
      // are released when cancel is eventually called (either `self.cancel()` after an error
      // or via `self.finish()`).
      let callContext = GRPCAsyncServerCallContext(
        headers: headers,
        logger: self.logger,
        contextProvider: self
      )

      let watermarks = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
        lowWatermark: 10,
        highWatermark: 50
      )

      let requests = NIOThrowingAsyncSequenceProducer.makeSequence(
        elementType: Request.self,
        failureType: Error.self,
        backPressureStrategy: watermarks,
        delegate: GRPCAsyncSequenceProducerDelegate()
      )

      let responses = NIOAsyncWriter.makeWriter(
        isWritable: true,
        delegate: GRPCAsyncWriterSinkDelegate<(Response, Compression)>(
          didYield: self.interceptResponseMessages,
          didTerminate: { error in
            self.interceptTermination(error)
          }
        )
      )

      // Update our state before invoking the handler.
      self.handlerStateMachine.handlerInvoked(requestHeaders: headers)

      // There is no task cancellation handler here: that is done in `self.cancel()`.
      let handlerTask: Task<Void, Never> = Task {
        await self.invokeUserHandler(
          requestSequence: requests,
          responseWriter: responses.writer,
          callContext: callContext
        )
      }

      self.handlerComponents = ServerHandlerComponents<
        Request,
        Response,
        GRPCAsyncWriterSinkDelegate<(Response, Compression)>
      >(
        requestSource: requests.source,
        responseWriterSink: responses.sink,
        task: handlerTask
      )
    }
  }
  /// Runs the user handler and finishes the response writer with its outcome.
  ///
  /// - Parameters:
  ///   - requestSequence: The request sequence together with its source.
  ///   - responseWriter: The writer the handler's responses are sent through.
  ///   - callContext: The call context handed to the user handler.
  @Sendable
  @usableFromInline
  internal func invokeUserHandler(
    requestSequence: AsyncSequenceProducer.NewSequence,
    responseWriter: NIOAsyncWriter<
      (Response, Compression),
      GRPCAsyncWriterSinkDelegate<(Response, Compression)>
    >,
    callContext: GRPCAsyncServerCallContext
  ) async {
    // The user handler may return before the request stream has ended. Finishing the source
    // on the way out drops any inbound messages it did not consume.
    defer {
      requestSequence.source.finish()
    }

    let requests = GRPCAsyncRequestStream(requestSequence.sequence)
    let writer = GRPCAsyncResponseStreamWriter(wrapping: responseWriter)

    do {
      try await self.userHandler(requests, writer, callContext)
    } catch {
      // The handler threw: finish the writer with the thrown error.
      responseWriter.finish(error: error)
      return
    }

    // The handler returned normally.
    responseWriter.finish()
  }
  /// Receives a request message from the interceptor pipeline and, when both state machines
  /// allow it, yields the message to the user handler's request source.
  @inlinable
  internal func receiveInterceptedMessage(_ request: Request) {
    switch self.interceptorStateMachine.interceptedRequestMessage() {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
      return
    case .forward:
      break
    }

    switch self.handlerStateMachine.handleMessage() {
    case .cancel:
      self.cancel(error: nil)
    case .forward:
      // The result of the yield is deliberately discarded.
      _ = self.handlerComponents?.requestSource.yield(request)
    }
  }
  /// Receives the end of the request stream from the interceptor pipeline and, when both
  /// state machines allow it, finishes the user handler's request source.
  @inlinable
  internal func receiveInterceptedEnd() {
    switch self.interceptorStateMachine.interceptedRequestEnd() {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
      return
    case .forward:
      break
    }

    switch self.handlerStateMachine.handleEnd() {
    case .cancel:
      self.cancel(error: nil)
    case .forward:
      self.handlerComponents?.requestSource.finish()
    }
  }
  494. // MARK: - User Function To Interceptors
  /// Sends one response message from the user handler into the interceptor pipeline,
  /// preceded by the response headers when the handler state machine hands them over.
  ///
  /// Must be called on the handler's event loop.
  ///
  /// - Parameters:
  ///   - response: The response message.
  ///   - compression: The compression preference for this message.
  @inlinable
  internal func _interceptResponseMessage(_ response: Response, compression: Compression) {
    self.eventLoop.assertInEventLoop()

    // Ask the handler state machine whether the message may be sent and whether headers have
    // to go out first.
    let unsentHeaders: HPACKHeaders?
    switch self.handlerStateMachine.sendMessage() {
    case .drop:
      return
    case let .intercept(headers):
      unsentHeaders = headers
    }

    if let responseHeaders = unsentHeaders {
      switch self.interceptorStateMachine.interceptResponseMetadata() {
      case .intercept:
        self.interceptors?.send(.metadata(responseHeaders), promise: nil)
      case .cancel:
        self.cancel(error: nil)
        return
      case .drop:
        break
      }
    }

    // Headers are dealt with; now the response message itself.
    switch self.interceptorStateMachine.interceptResponseMessage() {
    case .intercept:
      let wantedBySender = compression.isEnabled(
        callDefault: self.compressResponsesIfPossible
      )
      let shouldCompress = self.compressionEnabledOnRPC && wantedBySender
      let messageMetadata = MessageMetadata(compress: shouldCompress, flush: true)
      self.interceptors?.send(.message(response, messageMetadata), promise: nil)
    case .cancel:
      self.cancel(error: nil)
    case .drop:
      break
    }
  }
  /// Sends a batch of response messages into the interceptor pipeline, in order.
  ///
  /// The work is done immediately when already on the handler's event loop and is otherwise
  /// scheduled onto it.
  @Sendable
  @inlinable
  internal func interceptResponseMessages(_ messages: Deque<(Response, Compression)>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        for (response, compression) in messages {
          self._interceptResponseMessage(response, compression: compression)
        }
      }
      return
    }

    for (response, compression) in messages {
      self._interceptResponseMessage(response, compression: compression)
    }
  }
  /// Turns the outcome of the user handler into a status and trailers and sends them into the
  /// interceptor pipeline.
  ///
  /// Must be called on the handler's event loop.
  ///
  /// - Parameter error: The error the handler finished with, or `nil` when it finished
  ///   without one.
  @inlinable
  internal func _interceptTermination(_ error: Error?) {
    self.eventLoop.assertInEventLoop()

    // A thrown `GRPCStatus` whose code is 'ok' is swapped for an 'unknown' status.
    var failure = error
    if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
      failure = GRPCStatus(
        code: .unknown,
        message: "Handler threw error with status code 'ok'."
      )
    }

    switch self.handlerStateMachine.sendStatus() {
    case .drop:
      return

    case let .intercept(requestHeaders, trailers):
      let status: GRPCStatus
      let trailersToSend: HPACKHeaders

      if let failure = failure {
        (status, trailersToSend) = ServerErrorProcessor.processObserverError(
          failure,
          headers: requestHeaders,
          trailers: trailers,
          delegate: self.errorDelegate
        )
      } else {
        status = GRPCStatus.ok
        trailersToSend = trailers
      }

      switch self.interceptorStateMachine.interceptResponseStatus() {
      case .intercept:
        self.interceptors?.send(.end(status, trailersToSend), promise: nil)
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        break
      }
    }
  }
  /// Handles termination of the response writer.
  ///
  /// The work is done immediately when already on the handler's event loop and is otherwise
  /// scheduled onto it.
  @Sendable
  @inlinable
  internal func interceptTermination(_ status: Error?) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._interceptTermination(status)
      }
      return
    }

    self._interceptTermination(status)
  }
  /// Receives a response part from the interceptor pipeline and routes it towards the
  /// response writer.
  ///
  /// Messages are serialized here. The buffer is allocated with `self.allocator` (the
  /// allocator taken from the call context) rather than a freshly constructed
  /// `ByteBufferAllocator`, so that response buffers come from the same allocator as the
  /// rest of the channel.
  ///
  /// - Parameters:
  ///   - part: The intercepted response part.
  ///   - promise: An optional promise passed on with the part. If serialization fails the
  ///     promise is failed with the serialization error and the RPC is cancelled with it.
  @inlinable
  internal func sendInterceptedPart(
    _ part: GRPCServerResponsePart<Response>,
    promise: EventLoopPromise<Void>?
  ) {
    switch part {
    case let .metadata(headers):
      self.sendInterceptedMetadata(headers, promise: promise)

    case let .message(message, metadata):
      do {
        let bytes = try self.serializer.serialize(message, allocator: self.allocator)
        self.sendInterceptedResponse(bytes, metadata: metadata, promise: promise)
      } catch {
        promise?.fail(error)
        self.cancel(error: error)
      }

    case let .end(status, trailers):
      self.sendInterceptedStatus(status, metadata: trailers, promise: promise)
    }
  }
  /// Writes intercepted response headers to the response writer when the interceptor state
  /// machine allows it.
  ///
  /// - Parameters:
  ///   - metadata: The response headers.
  ///   - promise: Passed on to the response writer. When the headers should be forwarded but
  ///     there is no response writer, the promise is failed with `GRPCStatus.processingError`.
  @inlinable
  internal func sendInterceptedMetadata(
    _ metadata: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.interceptorStateMachine.interceptedResponseMetadata() {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
      return
    case .forward:
      break
    }

    guard let writer = self.responseWriter else {
      promise?.fail(GRPCStatus.processingError)
      return
    }

    // Consume the pending flush request, if there is one.
    let shouldFlush = self.flushNextHeaders
    self.flushNextHeaders = false
    writer.sendMetadata(metadata, flush: shouldFlush, promise: promise)
  }
  /// Writes a serialized, intercepted response message to the response writer when the
  /// interceptor state machine allows it.
  ///
  /// - Parameters:
  ///   - bytes: The serialized response message.
  ///   - metadata: The metadata sent along with the message.
  ///   - promise: Passed on to the response writer. When the message should be forwarded but
  ///     there is no response writer, the promise is failed with `GRPCStatus.processingError`.
  @inlinable
  internal func sendInterceptedResponse(
    _ bytes: ByteBuffer,
    metadata: MessageMetadata,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.interceptorStateMachine.interceptedResponseMessage() {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
      return
    case .forward:
      break
    }

    guard let writer = self.responseWriter else {
      promise?.fail(GRPCStatus.processingError)
      return
    }

    writer.sendMessage(bytes, metadata: metadata, promise: promise)
  }
  /// Writes the intercepted status and trailers to the response writer when the interceptor
  /// state machine allows it.
  ///
  /// - Parameters:
  ///   - status: The status of the RPC.
  ///   - metadata: The trailers sent with the status.
  ///   - promise: Passed on to the response writer. When the status should be forwarded but
  ///     there is no response writer, the promise is failed with `GRPCStatus.processingError`.
  @inlinable
  internal func sendInterceptedStatus(
    _ status: GRPCStatus,
    metadata: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.interceptorStateMachine.interceptedResponseStatus() {
    case .drop:
      return
    case .cancel:
      self.cancel(error: nil)
      return
    case .forward:
      break
    }

    guard let writer = self.responseWriter else {
      promise?.fail(GRPCStatus.processingError)
      return
    }

    writer.sendEnd(status: status, trailers: metadata, promise: promise)
  }
  671. }
  // `Sendable` is declared unchecked: the compiler cannot verify it, and the conformance
  // relies on all mutable state being accessed and modified from an appropriate event loop.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncServerHandler: @unchecked Sendable {}
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncServerHandler: AsyncServerCallContextProvider {
    /// Sets the response headers on the handler's event loop.
    ///
    /// - Throws: A `GRPCStatus` with code `failedPrecondition` when the handler state machine
    ///   does not accept the headers.
    @usableFromInline
    internal func setResponseHeaders(_ headers: HPACKHeaders) async throws {
      let headersSet = self.eventLoop.submit {
        guard self.handlerStateMachine.setResponseHeaders(headers) else {
          throw GRPCStatus(
            code: .failedPrecondition,
            message: "Tried to send response headers in an invalid state"
          )
        }
      }
      try await headersSet.get()
    }

    /// Sets the response headers and sends them into the interceptor pipeline right away,
    /// marking them to be flushed. Does nothing when the handler state machine does not
    /// accept the headers. Failures are not reported to the caller.
    @usableFromInline
    internal func acceptRPC(_ headers: HPACKHeaders) async {
      let accepted = self.eventLoop.submit {
        guard self.handlerStateMachine.setResponseHeaders(headers) else { return }

        // Shh, it's a lie! There is no message to send, but the state machine doesn't know
        // (or care) about that. It will, however, say whether the headers can be sent.
        switch self.handlerStateMachine.sendMessage() {
        case .intercept(.none), .drop:
          // intercept(.none) means the headers were already sent; that should never happen
          // here because of the guard on setting the response headers above.
          return

        case let .intercept(.some(pendingHeaders)):
          switch self.interceptorStateMachine.interceptResponseMetadata() {
          case .intercept:
            self.flushNextHeaders = true
            self.interceptors?.send(.metadata(pendingHeaders), promise: nil)
          case .cancel:
            self.cancel(error: nil)
          case .drop:
            break
          }
        }
      }
      try? await accepted.get()
    }

    /// Sets the response trailers on the handler's event loop.
    @usableFromInline
    internal func setResponseTrailers(_ headers: HPACKHeaders) async throws {
      let trailersSet = self.eventLoop.submit {
        self.handlerStateMachine.setResponseTrailers(headers)
      }
      try await trailersSet.get()
    }

    /// Sets, on the handler's event loop, whether responses should be compressed if possible.
    @usableFromInline
    internal func setResponseCompression(_ enabled: Bool) async throws {
      let compressionSet = self.eventLoop.submit {
        self.compressResponsesIfPossible = enabled
      }
      try await compressionSet.get()
    }

    /// Runs `modify` with the current user info on the handler's event loop and returns its
    /// result.
    @usableFromInline
    func withUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (UserInfo) throws -> Result
    ) async throws -> Result {
      let outcome = self.eventLoop.submit {
        try modify(self.userInfoRef.value)
      }
      return try await outcome.get()
    }

    /// Runs `modify` with mutable access to the user info on the handler's event loop and
    /// returns its result.
    @usableFromInline
    func withMutableUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
    ) async throws -> Result {
      let outcome = self.eventLoop.submit {
        try modify(&self.userInfoRef.value)
      }
      return try await outcome.get()
    }
  }
  /// Lets `GRPCAsyncServerCallContext` talk to the server handler without knowing its generic
  /// parameters.
  ///
  /// Each method updates context on the async handler after first executing onto the correct
  /// event loop.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  protocol AsyncServerCallContextProvider: Sendable {
    /// Sets the response headers.
    func setResponseHeaders(_ headers: HPACKHeaders) async throws

    /// Accepts the RPC with the given response headers.
    func acceptRPC(_ headers: HPACKHeaders) async

    /// Sets the response trailers.
    func setResponseTrailers(_ trailers: HPACKHeaders) async throws

    /// Sets whether responses should be compressed.
    func setResponseCompression(_ enabled: Bool) async throws

    /// Calls `modify` with the user info and returns its result.
    func withUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (UserInfo) throws -> Result
    ) async throws -> Result

    /// Calls `modify` with mutable access to the user info and returns its result.
    func withMutableUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
    ) async throws -> Result
  }
  /// The pieces a running user handler is made of: its task, the sink of its response writer
  /// and the source of its request stream.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  internal struct ServerHandlerComponents<
    Request: Sendable,
    Response: Sendable,
    Delegate: NIOAsyncWriterSinkDelegate
  > where Delegate.Element == (Response, Compression) {
    @usableFromInline
    internal typealias AsyncWriterSink = NIOAsyncWriter<(Response, Compression), Delegate>.Sink

    @usableFromInline
    internal typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer<
      Request,
      Error,
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
      GRPCAsyncSequenceProducerDelegate
    >.Source

    /// The task the user handler runs in.
    @usableFromInline
    internal let task: Task<Void, Never>

    /// The sink side of the response writer given to the user handler.
    @usableFromInline
    internal let responseWriterSink: AsyncWriterSink

    /// The source that request messages are pushed into.
    @usableFromInline
    internal let requestSource: AsyncSequenceSource

    /// Bundles the three components together.
    @inlinable
    init(
      requestSource: AsyncSequenceSource,
      responseWriterSink: AsyncWriterSink,
      task: Task<Void, Never>
    ) {
      self.requestSource = requestSource
      self.responseWriterSink = responseWriterSink
      self.task = task
    }

    /// Shuts down the request and response streams, then cancels the task.
    func cancel() {
      // User handlers are encouraged to check for cancellation, but it has to be assumed that
      // they don't. With the request source finished no further requests reach the request
      // stream, and with the writer sink finished no further responses are written. The
      // handler can no longer do anything useful, which should shorten how long it keeps
      // running.
      self.requestSource.finish()
      self.responseWriterSink.finish(error: CancellationError())
      self.task.cancel()
    }
  }