GRPCAsyncServerHandler.swift 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  17. import Logging
  18. import NIOCore
  19. import NIOHPACK
/// The public server handler for async RPCs.
///
/// This type stores a single `AsyncServerHandler` and implements each `GRPCServerHandlerProtocol`
/// requirement by forwarding the call to it unchanged.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerHandler<
  Serializer: MessageSerializer,
  Deserializer: MessageDeserializer,
  Request: Sendable,
  Response: Sendable
>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  /// The wrapped handler every call is forwarded to.
  @usableFromInline
  internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>

  /// Forwards the request headers to the wrapped handler.
  public func receiveMetadata(_ metadata: HPACKHeaders) {
    self._handler.receiveMetadata(metadata)
  }

  /// Forwards the still-serialized request message to the wrapped handler.
  public func receiveMessage(_ bytes: ByteBuffer) {
    self._handler.receiveMessage(bytes)
  }

  /// Forwards the end of the request stream to the wrapped handler.
  public func receiveEnd() {
    self._handler.receiveEnd()
  }

  /// Forwards an error to the wrapped handler.
  public func receiveError(_ error: Error) {
    self._handler.receiveError(error)
  }

  /// Forwards `finish()` to the wrapped handler.
  public func finish() {
    self._handler.finish()
  }
}
  45. // MARK: - RPC Adapters
  46. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  47. extension GRPCAsyncServerHandler {
  48. public typealias Request = Deserializer.Output
  49. public typealias Response = Serializer.Input
  50. @inlinable
  51. public init(
  52. context: CallHandlerContext,
  53. requestDeserializer: Deserializer,
  54. responseSerializer: Serializer,
  55. interceptors: [ServerInterceptor<Request, Response>],
  56. wrapping unary: @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
  57. -> Response
  58. ) {
  59. self._handler = .init(
  60. context: context,
  61. requestDeserializer: requestDeserializer,
  62. responseSerializer: responseSerializer,
  63. callType: .unary,
  64. interceptors: interceptors,
  65. userHandler: { requestStream, responseStreamWriter, context in
  66. var iterator = requestStream.makeAsyncIterator()
  67. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  68. throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
  69. }
  70. let response = try await unary(request, context)
  71. try await responseStreamWriter.send(response)
  72. }
  73. )
  74. }
  75. @inlinable
  76. public init(
  77. context: CallHandlerContext,
  78. requestDeserializer: Deserializer,
  79. responseSerializer: Serializer,
  80. interceptors: [ServerInterceptor<Request, Response>],
  81. wrapping clientStreaming: @escaping @Sendable (
  82. GRPCAsyncRequestStream<Request>,
  83. GRPCAsyncServerCallContext
  84. ) async throws -> Response
  85. ) {
  86. self._handler = .init(
  87. context: context,
  88. requestDeserializer: requestDeserializer,
  89. responseSerializer: responseSerializer,
  90. callType: .clientStreaming,
  91. interceptors: interceptors,
  92. userHandler: { requestStream, responseStreamWriter, context in
  93. let response = try await clientStreaming(requestStream, context)
  94. try await responseStreamWriter.send(response)
  95. }
  96. )
  97. }
  98. @inlinable
  99. public init(
  100. context: CallHandlerContext,
  101. requestDeserializer: Deserializer,
  102. responseSerializer: Serializer,
  103. interceptors: [ServerInterceptor<Request, Response>],
  104. wrapping serverStreaming: @escaping @Sendable (
  105. Request,
  106. GRPCAsyncResponseStreamWriter<Response>,
  107. GRPCAsyncServerCallContext
  108. ) async throws -> Void
  109. ) {
  110. self._handler = .init(
  111. context: context,
  112. requestDeserializer: requestDeserializer,
  113. responseSerializer: responseSerializer,
  114. callType: .serverStreaming,
  115. interceptors: interceptors,
  116. userHandler: { requestStream, responseStreamWriter, context in
  117. var iterator = requestStream.makeAsyncIterator()
  118. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  119. throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
  120. }
  121. try await serverStreaming(request, responseStreamWriter, context)
  122. }
  123. )
  124. }
  125. @inlinable
  126. public init(
  127. context: CallHandlerContext,
  128. requestDeserializer: Deserializer,
  129. responseSerializer: Serializer,
  130. interceptors: [ServerInterceptor<Request, Response>],
  131. wrapping bidirectional: @escaping @Sendable (
  132. GRPCAsyncRequestStream<Request>,
  133. GRPCAsyncResponseStreamWriter<Response>,
  134. GRPCAsyncServerCallContext
  135. ) async throws -> Void
  136. ) {
  137. self._handler = .init(
  138. context: context,
  139. requestDeserializer: requestDeserializer,
  140. responseSerializer: responseSerializer,
  141. callType: .bidirectionalStreaming,
  142. interceptors: interceptors,
  143. userHandler: bidirectional
  144. )
  145. }
  146. }
  147. // MARK: - Server Handler
/// The reference-type implementation behind `GRPCAsyncServerHandler`.
///
/// Request parts are offered to an interceptor pipeline and, once intercepted, delivered to the
/// user's async function; response parts produced by that function travel through the pipeline
/// in the other direction before being written with the response writer. Two state machines
/// (one for the interceptors, one for the user handler) decide what to do with each part.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
internal final class AsyncServerHandler<
  Serializer: MessageSerializer,
  Deserializer: MessageDeserializer,
  Request: Sendable,
  Response: Sendable
>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  /// A response serializer.
  @usableFromInline
  internal let serializer: Serializer

  /// A request deserializer.
  @usableFromInline
  internal let deserializer: Deserializer

  /// The event loop that this handler executes on.
  @usableFromInline
  internal let eventLoop: EventLoop

  /// A `ByteBuffer` allocator provided by the underlying `Channel`.
  @usableFromInline
  internal let allocator: ByteBufferAllocator

  /// A user-provided error delegate which, if provided, is used to transform errors and potentially
  /// pack errors into trailers.
  @usableFromInline
  internal let errorDelegate: ServerErrorDelegate?

  /// A logger.
  @usableFromInline
  internal let logger: Logger

  /// A reference to the user info. This is shared with the interceptor pipeline and may be accessed
  /// from the async call context. `UserInfo` is _not_ `Sendable` and must always be accessed from
  /// an appropriate event loop.
  @usableFromInline
  internal let userInfoRef: Ref<UserInfo>

  /// Whether compression is enabled on the server and an algorithm has been negotiated with
  /// the client.
  @usableFromInline
  internal let compressionEnabledOnRPC: Bool

  /// Whether the RPC method would like to compress responses (if possible). Defaults to true.
  @usableFromInline
  internal var compressResponsesIfPossible: Bool

  /// The interceptor pipeline does not track flushing as a separate event. The flush decision is
  /// included with metadata alongside each message. For the status and trailers the flush is
  /// implicit. For headers we track whether to flush here.
  ///
  /// In most cases the flush will be delayed until the first message is flushed and this will
  /// remain unset. However, this may be set when the server handler
  /// uses ``GRPCAsyncServerCallContext/sendHeaders(_:)``.
  @usableFromInline
  internal var flushNextHeaders: Bool

  /// A state machine for the interceptor pipeline.
  @usableFromInline
  internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine

  /// The interceptor pipeline.
  ///
  /// Set to `nil` when the handler is cancelled.
  @usableFromInline
  internal private(set) var interceptors: Optional<ServerInterceptorPipeline<Request, Response>>

  /// An object for writing intercepted responses to the channel.
  ///
  /// Set to `nil` when the handler is cancelled.
  @usableFromInline
  internal private(set) var responseWriter: Optional<GRPCServerResponseWriter>

  /// A state machine for the user implemented function.
  @usableFromInline
  internal private(set) var handlerStateMachine: ServerHandlerStateMachine

  /// A bag of components used by the user handler.
  ///
  /// This is `nil` until the user handler is invoked and is set back to `nil` on cancellation.
  @usableFromInline
  internal private(set) var handlerComponents:
    Optional<
      ServerHandlerComponents<
        Request,
        Response,
        GRPCAsyncWriterSinkDelegate<(Response, Compression)>
      >
    >

  /// The user provided function to execute.
  @usableFromInline
  internal let userHandler:
    @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void

  /// The producer type used to deliver request messages to the user handler as an async
  /// sequence.
  @usableFromInline
  internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
    Request,
    Error,
    NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
    GRPCAsyncSequenceProducerDelegate
  >
  /// Creates a handler for a single RPC.
  ///
  /// - Parameters:
  ///   - context: The call context; the event loop, allocator, response writer, error delegate,
  ///     logger, negotiated encoding, path, remote address and close future are read from it.
  ///   - requestDeserializer: Deserializes incoming request messages.
  ///   - responseSerializer: Serializes outgoing response messages.
  ///   - callType: The type of RPC, passed on to the interceptor pipeline.
  ///   - interceptors: The interceptors to install in the pipeline.
  ///   - userHandler: The function to run for this RPC.
  @inlinable
  internal init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    callType: GRPCCallType,
    interceptors: [ServerInterceptor<Request, Response>],
    userHandler: @escaping @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void
  ) {
    // Message coding and the user's function.
    self.deserializer = requestDeserializer
    self.serializer = responseSerializer
    self.userHandler = userHandler

    // Values taken from the call context.
    self.eventLoop = context.eventLoop
    self.allocator = context.allocator
    self.logger = context.logger
    self.errorDelegate = context.errorDelegate
    self.responseWriter = context.responseWriter
    self.compressionEnabledOnRPC = context.encoding.isEnabled

    // Per-call mutable state, all starting from its initial value.
    self.compressResponsesIfPossible = true
    self.flushNextHeaders = false
    self.userInfoRef = Ref(UserInfo())
    self.handlerStateMachine = .init()
    self.handlerComponents = nil
    self.interceptorStateMachine = .init()

    // Building the pipeline refers to methods on `self`, which is only allowed once every stored
    // property has a value. Hence the placeholder `nil` before the real assignment.
    self.interceptors = nil
    self.interceptors = ServerInterceptorPipeline(
      logger: context.logger,
      eventLoop: context.eventLoop,
      path: context.path,
      callType: callType,
      remoteAddress: context.remoteAddress,
      userInfoRef: self.userInfoRef,
      closeFuture: context.closeFuture,
      interceptors: interceptors,
      onRequestPart: self.receiveInterceptedPart(_:),
      onResponsePart: self.sendInterceptedPart(_:promise:)
    )
  }
  275. // MARK: - GRPCServerHandlerProtocol conformance
  /// Offers the request headers to the interceptor pipeline.
  ///
  /// The interceptor state machine decides whether the headers enter the pipeline, are dropped,
  /// or cause the RPC to be cancelled.
  ///
  /// - Parameter headers: The request headers.
  @inlinable
  internal func receiveMetadata(_ headers: HPACKHeaders) {
    let action = self.interceptorStateMachine.interceptRequestMetadata()
    switch action {
    case .drop:
      break
    case .cancel:
      self.cancel(error: nil)
    case .intercept:
      self.interceptors?.receive(.metadata(headers))
    }
  }
  /// Deserializes a request message and offers it to the interceptor pipeline.
  ///
  /// Deserialization happens before the state machine is consulted; if it fails the RPC is
  /// cancelled with the thrown error. Otherwise the interceptor state machine decides whether
  /// the message enters the pipeline, is dropped, or causes the RPC to be cancelled.
  ///
  /// - Parameter bytes: The serialized request message.
  @inlinable
  internal func receiveMessage(_ bytes: ByteBuffer) {
    let message: Request
    do {
      message = try self.deserializer.deserialize(byteBuffer: bytes)
    } catch {
      self.cancel(error: error)
      return
    }

    let action = self.interceptorStateMachine.interceptRequestMessage()
    switch action {
    case .drop:
      break
    case .cancel:
      self.cancel(error: nil)
    case .intercept:
      self.interceptors?.receive(.message(message))
    }
  }
  /// Offers the end of the request stream to the interceptor pipeline.
  ///
  /// The interceptor state machine decides whether the end enters the pipeline, is dropped, or
  /// causes the RPC to be cancelled.
  @inlinable
  internal func receiveEnd() {
    let action = self.interceptorStateMachine.interceptRequestEnd()
    switch action {
    case .drop:
      break
    case .cancel:
      self.cancel(error: nil)
    case .intercept:
      self.interceptors?.receive(.end)
    }
  }
  /// Cancels the RPC, passing `error` on to `cancel(error:)`.
  ///
  /// - Parameter error: The error which was received.
  @inlinable
  internal func receiveError(_ error: Error) {
    self.cancel(error: error)
  }
  /// Finishes the handler by calling `cancel(error:)` with no error.
  @inlinable
  internal func finish() {
    self.cancel(error: nil)
  }
  /// Cancels the user handler and tears down the interceptor pipeline.
  ///
  /// Must be called on the handler's event loop. Both state machines are consulted, so calling
  /// this again after everything has been released takes the `.none` branches and does nothing.
  ///
  /// - Parameter error: The reason for cancelling, if there is one. When a status still needs to
  ///   be sent and `error` is `nil`, `GRPCStatus.processingError` is used in its place.
  @usableFromInline
  internal func cancel(error: Error?) {
    self.eventLoop.assertInEventLoop()

    // Step 1: the user handler.
    switch self.handlerStateMachine.cancel() {
    case .none:
      break

    case .cancelAndNilOutHandlerComponents:
      // Stop the task and the request/response streams, then release them.
      self.handlerComponents?.cancel()
      self.handlerComponents = nil

      // Ask the interceptor state machine whether a status should be written; it knows whether
      // one has been sent already.
      switch self.interceptorStateMachine.interceptedResponseStatus() {
      case .drop, .cancel:
        break
      case .forward:
        let reportedError: Error = error ?? GRPCStatus.processingError
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          reportedError,
          delegate: self.errorDelegate
        )
        self.responseWriter?.sendEnd(status: status, trailers: trailers, promise: nil)
      }
    }

    // Step 2: the interceptor pipeline and the response writer.
    switch self.interceptorStateMachine.cancel() {
    case .none:
      break

    case .nilOutInterceptorPipeline:
      self.interceptors = nil
      self.responseWriter = nil

    case .sendStatusThenNilOutInterceptorPipeline:
      self.responseWriter?.sendEnd(status: .processingError, trailers: [:], promise: nil)
      self.interceptors = nil
      self.responseWriter = nil
    }
  }
  359. // MARK: - Interceptors to User Function
  /// Dispatches a request part which has left the interceptor pipeline to the method handling
  /// that kind of part.
  ///
  /// - Parameter part: The intercepted request part.
  @inlinable
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
    switch part {
    case .end:
      self.receiveInterceptedEnd()
    case .message(let request):
      self.receiveInterceptedMessage(request)
    case .metadata(let requestHeaders):
      self.receiveInterceptedMetadata(requestHeaders)
    }
  }
  /// Handles request headers which have left the interceptor pipeline and, if both state
  /// machines allow it, starts the user handler.
  ///
  /// - Parameter headers: The intercepted request headers.
  @inlinable
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
    // The interceptor state machine goes first; only `.forward` continues.
    switch self.interceptorStateMachine.interceptedRequestMetadata() {
    case .forward:
      break
    case .cancel:
      self.cancel(error: nil)
      return
    case .drop:
      return
    }

    switch self.handlerStateMachine.handleMetadata() {
    case .cancel:
      self.cancel(error: nil)

    case .invokeHandler:
      // Invoking the handler needs three things:
      //
      // 1. A call context, giving the handler the request headers and a logger and letting it
      //    reach back into this object.
      // 2. A request sequence. Request messages are pushed into its source and the handler
      //    consumes them as an async sequence.
      // 3. An async writer with a delegate. The writer goes to the handler; the delegate calls
      //    back into this object with the responses and with termination.
      //
      // The source, the sink and the task are stored together as the "handler components",
      // outside of the state machine, and are released in `cancel(error:)`.
      let callContext = GRPCAsyncServerCallContext(
        headers: headers,
        logger: self.logger,
        contextProvider: self
      )

      let watermarks = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
        lowWatermark: 10,
        highWatermark: 50
      )
      let requests = NIOThrowingAsyncSequenceProducer.makeSequence(
        elementType: Request.self,
        failureType: Error.self,
        backPressureStrategy: watermarks,
        delegate: GRPCAsyncSequenceProducerDelegate()
      )

      let responses = NIOAsyncWriter.makeWriter(
        isWritable: true,
        delegate: GRPCAsyncWriterSinkDelegate<(Response, Compression)>(
          didYield: self.interceptResponseMessages,
          didTerminate: { self.interceptTermination($0) }
        )
      )

      // Record the invocation in the state machine before the handler starts running.
      self.handlerStateMachine.handlerInvoked(requestHeaders: headers)

      self.handlerComponents = ServerHandlerComponents<
        Request,
        Response,
        GRPCAsyncWriterSinkDelegate<(Response, Compression)>
      >(
        requestSource: requests.source,
        responseWriterSink: responses.sink,
        task: Task {
          // No task cancellation handler is installed here; `cancel(error:)` takes care of it.
          await self.invokeUserHandler(
            requestSequence: requests,
            responseWriter: responses.writer,
            callContext: callContext
          )
        }
      )
    }
  }
  /// Runs the user's function and finishes the response writer with its outcome.
  ///
  /// The writer is finished normally when the function returns and finished with the thrown
  /// error when it throws. The request source is finished on the way out in either case.
  ///
  /// - Parameters:
  ///   - requestSequence: The request sequence together with its source.
  ///   - responseWriter: The writer responses are yielded to.
  ///   - callContext: The context handed to the user's function.
  @Sendable
  @usableFromInline
  internal func invokeUserHandler(
    requestSequence: AsyncSequenceProducer.NewSequence,
    responseWriter: NIOAsyncWriter<
      (Response, Compression),
      GRPCAsyncWriterSinkDelegate<(Response, Compression)>
    >,
    callContext: GRPCAsyncServerCallContext
  ) async {
    // The user's function may return before reading every request. Finishing the source here
    // drops whatever was left unconsumed.
    defer { requestSequence.source.finish() }

    let requests = GRPCAsyncRequestStream(requestSequence.sequence)
    let responses = GRPCAsyncResponseStreamWriter(wrapping: responseWriter)

    do {
      try await self.userHandler(requests, responses, callContext)
    } catch {
      responseWriter.finish(error: error)
      return
    }

    responseWriter.finish()
  }
  /// Handles a request message which has left the interceptor pipeline.
  ///
  /// If both state machines agree, the message is yielded to the request source consumed by the
  /// user handler.
  ///
  /// - Parameter request: The intercepted request message.
  @inlinable
  internal func receiveInterceptedMessage(_ request: Request) {
    // The interceptor state machine goes first; only `.forward` continues.
    switch self.interceptorStateMachine.interceptedRequestMessage() {
    case .forward:
      break
    case .cancel:
      self.cancel(error: nil)
      return
    case .drop:
      return
    }

    switch self.handlerStateMachine.handleMessage() {
    case .cancel:
      self.cancel(error: nil)
    case .forward:
      _ = self.handlerComponents?.requestSource.yield(request)
    }
  }
  /// Handles the end of the request stream once it has left the interceptor pipeline.
  ///
  /// If both state machines agree, the request source consumed by the user handler is finished.
  @inlinable
  internal func receiveInterceptedEnd() {
    // The interceptor state machine goes first; only `.forward` continues.
    switch self.interceptorStateMachine.interceptedRequestEnd() {
    case .forward:
      break
    case .cancel:
      self.cancel(error: nil)
      return
    case .drop:
      return
    }

    switch self.handlerStateMachine.handleEnd() {
    case .cancel:
      self.cancel(error: nil)
    case .forward:
      self.handlerComponents?.requestSource.finish()
    }
  }
  498. // MARK: - User Function To Interceptors
  /// Sends one response message from the user handler into the interceptor pipeline, preceded
  /// by the response headers when the handler state machine hands them over.
  ///
  /// Must be called on the handler's event loop.
  ///
  /// - Parameters:
  ///   - response: The response message.
  ///   - compression: The compression setting requested for this message.
  @inlinable
  internal func _interceptResponseMessage(_ response: Response, compression: Compression) {
    self.eventLoop.assertInEventLoop()

    switch self.handlerStateMachine.sendMessage() {
    case .drop:
      return

    case let .intercept(unsentHeaders):
      // Headers, when present, go ahead of the message.
      if let headers = unsentHeaders {
        switch self.interceptorStateMachine.interceptResponseMetadata() {
        case .intercept:
          self.interceptors?.send(.metadata(headers), promise: nil)
        case .cancel:
          self.cancel(error: nil)
          return
        case .drop:
          break
        }
      }

      switch self.interceptorStateMachine.interceptResponseMessage() {
      case .intercept:
        // Compress only when it is enabled for the RPC and wanted for this message.
        let wantedBySender = compression.isEnabled(
          callDefault: self.compressResponsesIfPossible
        )
        let metadata = MessageMetadata(
          compress: self.compressionEnabledOnRPC && wantedBySender,
          flush: true
        )
        self.interceptors?.send(.message(response, metadata), promise: nil)
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        break
      }
    }
  }
  /// Sends a batch of response messages into the interceptor pipeline, in order.
  ///
  /// The work runs immediately when already on the handler's event loop and is otherwise
  /// submitted to it.
  ///
  /// - Parameter messages: The responses, each paired with its compression setting.
  @Sendable
  @inlinable
  internal func interceptResponseMessages(_ messages: Deque<(Response, Compression)>) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        for (response, compression) in messages {
          self._interceptResponseMessage(response, compression: compression)
        }
      }
      return
    }

    for (response, compression) in messages {
      self._interceptResponseMessage(response, compression: compression)
    }
  }
  /// Turns the outcome of the user handler into a status and trailers and sends them into the
  /// interceptor pipeline.
  ///
  /// Must be called on the handler's event loop.
  ///
  /// - Parameter error: The error the response writer was finished with, or `nil` if it was
  ///   finished without one.
  @inlinable
  internal func _interceptTermination(_ error: Error?) {
    self.eventLoop.assertInEventLoop()

    // A thrown status whose code is 'ok' is replaced with an 'unknown' status.
    var failure = error
    if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
      failure = GRPCStatus(
        code: .unknown,
        message: "Handler threw error with status code 'ok'."
      )
    }

    switch self.handlerStateMachine.sendStatus() {
    case .drop:
      return

    case let .intercept(requestHeaders, trailers):
      let status: GRPCStatus
      let statusTrailers: HPACKHeaders

      switch failure {
      case .none:
        // No error: 'ok' with the trailers as they are.
        status = GRPCStatus.ok
        statusTrailers = trailers
      case .some(let failure):
        (status, statusTrailers) = ServerErrorProcessor.processObserverError(
          failure,
          headers: requestHeaders,
          trailers: trailers,
          delegate: self.errorDelegate
        )
      }

      switch self.interceptorStateMachine.interceptResponseStatus() {
      case .intercept:
        self.interceptors?.send(.end(status, statusTrailers), promise: nil)
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        break
      }
    }
  }
  /// Handles termination of the response writer.
  ///
  /// The work runs immediately when already on the handler's event loop and is otherwise
  /// submitted to it.
  ///
  /// - Parameter status: The error the writer was finished with, if there was one.
  @Sendable
  @inlinable
  internal func interceptTermination(_ status: Error?) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._interceptTermination(status)
      }
      return
    }

    self._interceptTermination(status)
  }
  /// Forwards a response part which has left the interceptor pipeline towards the response
  /// writer.
  ///
  /// Headers and the final status are passed on as they are. Messages are serialized first; if
  /// serialization throws, `promise` is failed with the error and the RPC is cancelled with it.
  ///
  /// - Parameters:
  ///   - part: The intercepted response part.
  ///   - promise: A promise to complete once the part has been dealt with, if the caller
  ///     supplied one.
  @inlinable
  internal func sendInterceptedPart(
    _ part: GRPCServerResponsePart<Response>,
    promise: EventLoopPromise<Void>?
  ) {
    switch part {
    case let .metadata(headers):
      self.sendInterceptedMetadata(headers, promise: promise)

    case let .message(message, metadata):
      do {
        // Serialize with the allocator taken from the call context at init instead of
        // constructing a fresh `ByteBufferAllocator` for every message.
        let bytes = try self.serializer.serialize(message, allocator: self.allocator)
        self.sendInterceptedResponse(bytes, metadata: metadata, promise: promise)
      } catch {
        promise?.fail(error)
        self.cancel(error: error)
      }

    case let .end(status, trailers):
      self.sendInterceptedStatus(status, metadata: trailers, promise: promise)
    }
  }
  /// Writes intercepted response headers, if the interceptor state machine allows it.
  ///
  /// The pending flush request in `flushNextHeaders` is consumed (read and reset) when the
  /// headers are written. If the response writer has already been released the promise is
  /// failed with `GRPCStatus.processingError`.
  ///
  /// - Parameters:
  ///   - metadata: The response headers.
  ///   - promise: A promise to hand to the writer, if the caller supplied one.
  @inlinable
  internal func sendInterceptedMetadata(
    _ metadata: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.interceptorStateMachine.interceptedResponseMetadata() {
    case .drop:
      break

    case .cancel:
      self.cancel(error: nil)

    case .forward:
      guard let writer = self.responseWriter else {
        promise?.fail(GRPCStatus.processingError)
        return
      }
      let shouldFlush = self.flushNextHeaders
      self.flushNextHeaders = false
      writer.sendMetadata(metadata, flush: shouldFlush, promise: promise)
    }
  }
  /// Writes an intercepted, serialized response message, if the interceptor state machine
  /// allows it.
  ///
  /// If the response writer has already been released the promise is failed with
  /// `GRPCStatus.processingError`.
  ///
  /// - Parameters:
  ///   - bytes: The serialized response message.
  ///   - metadata: The compression and flush settings for the message.
  ///   - promise: A promise to hand to the writer, if the caller supplied one.
  @inlinable
  internal func sendInterceptedResponse(
    _ bytes: ByteBuffer,
    metadata: MessageMetadata,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.interceptorStateMachine.interceptedResponseMessage() {
    case .drop:
      break

    case .cancel:
      self.cancel(error: nil)

    case .forward:
      guard let writer = self.responseWriter else {
        promise?.fail(GRPCStatus.processingError)
        return
      }
      writer.sendMessage(bytes, metadata: metadata, promise: promise)
    }
  }
  /// Writes the intercepted status and trailers, if the interceptor state machine allows it.
  ///
  /// If the response writer has already been released the promise is failed with
  /// `GRPCStatus.processingError`.
  ///
  /// - Parameters:
  ///   - status: The final status of the RPC.
  ///   - metadata: The trailers to send with the status.
  ///   - promise: A promise to hand to the writer, if the caller supplied one.
  @inlinable
  internal func sendInterceptedStatus(
    _ status: GRPCStatus,
    metadata: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.interceptorStateMachine.interceptedResponseStatus() {
    case .drop:
      break

    case .cancel:
      self.cancel(error: nil)

    case .forward:
      guard let writer = self.responseWriter else {
        promise?.fail(GRPCStatus.processingError)
        return
      }
      writer.sendEnd(status: status, trailers: metadata, promise: promise)
    }
  }
  675. }
// Sendability is unchecked: the compiler does not verify it. The conformance relies on all
// mutable state being accessed/modified from an appropriate event loop.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncServerHandler: @unchecked Sendable {}
  680. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  681. extension AsyncServerHandler: AsyncServerCallContextProvider {
  682. @usableFromInline
  683. internal func setResponseHeaders(_ headers: HPACKHeaders) async throws {
  684. let completed = self.eventLoop.submit {
  685. if !self.handlerStateMachine.setResponseHeaders(headers) {
  686. throw GRPCStatus(
  687. code: .failedPrecondition,
  688. message: "Tried to send response headers in an invalid state"
  689. )
  690. }
  691. }
  692. try await completed.get()
  693. }
  694. @usableFromInline
  695. internal func acceptRPC(_ headers: HPACKHeaders) async {
  696. let completed = self.eventLoop.submit {
  697. guard self.handlerStateMachine.setResponseHeaders(headers) else { return }
  698. // Shh,it's a lie! We don't really have a message to send but the state machine doesn't know
  699. // (or care) about that. It will, however, tell us if we can send the headers or not.
  700. switch self.handlerStateMachine.sendMessage() {
  701. case let .intercept(.some(headers)):
  702. switch self.interceptorStateMachine.interceptResponseMetadata() {
  703. case .intercept:
  704. self.flushNextHeaders = true
  705. self.interceptors?.send(.metadata(headers), promise: nil)
  706. case .cancel:
  707. return self.cancel(error: nil)
  708. case .drop:
  709. ()
  710. }
  711. case .intercept(.none), .drop:
  712. // intercept(.none) means headers have already been sent; we should never hit this because
  713. // we guard on setting the response headers above.
  714. ()
  715. }
  716. }
  717. try? await completed.get()
  718. }
  719. @usableFromInline
  720. internal func setResponseTrailers(_ headers: HPACKHeaders) async throws {
  721. let completed = self.eventLoop.submit {
  722. self.handlerStateMachine.setResponseTrailers(headers)
  723. }
  724. try await completed.get()
  725. }
  726. @usableFromInline
  727. internal func setResponseCompression(_ enabled: Bool) async throws {
  728. let completed = self.eventLoop.submit {
  729. self.compressResponsesIfPossible = enabled
  730. }
  731. try await completed.get()
  732. }
  733. @usableFromInline
  734. func withUserInfo<Result: Sendable>(
  735. _ modify: @Sendable @escaping (UserInfo) throws -> Result
  736. ) async throws -> Result {
  737. let result = self.eventLoop.submit {
  738. try modify(self.userInfoRef.value)
  739. }
  740. return try await result.get()
  741. }
  742. @usableFromInline
  743. func withMutableUserInfo<Result: Sendable>(
  744. _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  745. ) async throws -> Result {
  746. let result = self.eventLoop.submit {
  747. try modify(&self.userInfoRef.value)
  748. }
  749. return try await result.get()
  750. }
  751. }
/// This protocol exists so that the generic server handler can be erased from the
/// `GRPCAsyncServerCallContext`.
///
/// It provides methods which update context on the async handler by first executing onto the
/// correct event loop.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
protocol AsyncServerCallContextProvider: Sendable {
  /// Sets the response headers for the RPC.
  func setResponseHeaders(_ headers: HPACKHeaders) async throws

  /// Sets the response headers and asks for them to be sent.
  func acceptRPC(_ headers: HPACKHeaders) async

  /// Sets the response trailers for the RPC.
  func setResponseTrailers(_ trailers: HPACKHeaders) async throws

  /// Sets whether responses should be compressed.
  func setResponseCompression(_ enabled: Bool) async throws

  /// Calls `modify` with read access to the user info and returns its result.
  func withUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (UserInfo) throws -> Result
  ) async throws -> Result

  /// Calls `modify` with mutable access to the user info and returns its result.
  func withMutableUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  ) async throws -> Result
}
  771. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  772. @usableFromInline
  773. internal struct ServerHandlerComponents<
  774. Request: Sendable,
  775. Response: Sendable,
  776. Delegate: NIOAsyncWriterSinkDelegate
  777. > where Delegate.Element == (Response, Compression) {
  778. @usableFromInline
  779. internal typealias AsyncWriterSink = NIOAsyncWriter<(Response, Compression), Delegate>.Sink
  780. @usableFromInline
  781. internal typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer<
  782. Request,
  783. Error,
  784. NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
  785. GRPCAsyncSequenceProducerDelegate
  786. >.Source
  787. @usableFromInline
  788. internal let task: Task<Void, Never>
  789. @usableFromInline
  790. internal let responseWriterSink: AsyncWriterSink
  791. @usableFromInline
  792. internal let requestSource: AsyncSequenceSource
  793. @inlinable
  794. init(
  795. requestSource: AsyncSequenceSource,
  796. responseWriterSink: AsyncWriterSink,
  797. task: Task<Void, Never>
  798. ) {
  799. self.task = task
  800. self.responseWriterSink = responseWriterSink
  801. self.requestSource = requestSource
  802. }
  803. func cancel() {
  804. // Cancel the request and response streams.
  805. //
  806. // The user handler is encouraged to check for cancellation, however, we should assume
  807. // they do not. Finishing the request source stops any more requests from being delivered
  808. // to the request stream, and finishing the writer sink will ensure no more responses are
  809. // written. This should reduce how long the user handler runs for as it can no longer do
  810. // anything useful.
  811. self.requestSource.finish()
  812. self.responseWriterSink.finish(error: CancellationError())
  813. self.task.cancel()
  814. }
  815. }