GRPCAsyncServerHandler.swift 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import DequeModule
  18. import Logging
  19. import NIOCore
  20. import NIOHPACK
  21. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  22. public struct GRPCAsyncServerHandler<
  23. Serializer: MessageSerializer,
  24. Deserializer: MessageDeserializer,
  25. Request: Sendable,
  26. Response: Sendable
  27. >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  28. @usableFromInline
  29. internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>
  30. public func receiveMetadata(_ metadata: HPACKHeaders) {
  31. self._handler.receiveMetadata(metadata)
  32. }
  33. public func receiveMessage(_ bytes: ByteBuffer) {
  34. self._handler.receiveMessage(bytes)
  35. }
  36. public func receiveEnd() {
  37. self._handler.receiveEnd()
  38. }
  39. public func receiveError(_ error: Error) {
  40. self._handler.receiveError(error)
  41. }
  42. public func finish() {
  43. self._handler.finish()
  44. }
  45. }
  46. // MARK: - RPC Adapters
  47. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  48. extension GRPCAsyncServerHandler {
  49. public typealias Request = Deserializer.Output
  50. public typealias Response = Serializer.Input
  51. @inlinable
  52. public init(
  53. context: CallHandlerContext,
  54. requestDeserializer: Deserializer,
  55. responseSerializer: Serializer,
  56. interceptors: [ServerInterceptor<Request, Response>],
  57. wrapping unary: @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
  58. -> Response
  59. ) {
  60. self._handler = .init(
  61. context: context,
  62. requestDeserializer: requestDeserializer,
  63. responseSerializer: responseSerializer,
  64. callType: .unary,
  65. interceptors: interceptors,
  66. userHandler: { requestStream, responseStreamWriter, context in
  67. var iterator = requestStream.makeAsyncIterator()
  68. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  69. throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
  70. }
  71. let response = try await unary(request, context)
  72. try await responseStreamWriter.send(response)
  73. }
  74. )
  75. }
  76. @inlinable
  77. public init(
  78. context: CallHandlerContext,
  79. requestDeserializer: Deserializer,
  80. responseSerializer: Serializer,
  81. interceptors: [ServerInterceptor<Request, Response>],
  82. wrapping clientStreaming: @escaping @Sendable (
  83. GRPCAsyncRequestStream<Request>,
  84. GRPCAsyncServerCallContext
  85. ) async throws -> Response
  86. ) {
  87. self._handler = .init(
  88. context: context,
  89. requestDeserializer: requestDeserializer,
  90. responseSerializer: responseSerializer,
  91. callType: .clientStreaming,
  92. interceptors: interceptors,
  93. userHandler: { requestStream, responseStreamWriter, context in
  94. let response = try await clientStreaming(requestStream, context)
  95. try await responseStreamWriter.send(response)
  96. }
  97. )
  98. }
  99. @inlinable
  100. public init(
  101. context: CallHandlerContext,
  102. requestDeserializer: Deserializer,
  103. responseSerializer: Serializer,
  104. interceptors: [ServerInterceptor<Request, Response>],
  105. wrapping serverStreaming: @escaping @Sendable (
  106. Request,
  107. GRPCAsyncResponseStreamWriter<Response>,
  108. GRPCAsyncServerCallContext
  109. ) async throws -> Void
  110. ) {
  111. self._handler = .init(
  112. context: context,
  113. requestDeserializer: requestDeserializer,
  114. responseSerializer: responseSerializer,
  115. callType: .serverStreaming,
  116. interceptors: interceptors,
  117. userHandler: { requestStream, responseStreamWriter, context in
  118. var iterator = requestStream.makeAsyncIterator()
  119. guard let request = try await iterator.next(), try await iterator.next() == nil else {
  120. throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
  121. }
  122. try await serverStreaming(request, responseStreamWriter, context)
  123. }
  124. )
  125. }
  126. @inlinable
  127. public init(
  128. context: CallHandlerContext,
  129. requestDeserializer: Deserializer,
  130. responseSerializer: Serializer,
  131. interceptors: [ServerInterceptor<Request, Response>],
  132. wrapping bidirectional: @escaping @Sendable (
  133. GRPCAsyncRequestStream<Request>,
  134. GRPCAsyncResponseStreamWriter<Response>,
  135. GRPCAsyncServerCallContext
  136. ) async throws -> Void
  137. ) {
  138. self._handler = .init(
  139. context: context,
  140. requestDeserializer: requestDeserializer,
  141. responseSerializer: responseSerializer,
  142. callType: .bidirectionalStreaming,
  143. interceptors: interceptors,
  144. userHandler: bidirectional
  145. )
  146. }
  147. }
  148. // MARK: - Server Handler
  149. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  150. @usableFromInline
  151. internal final class AsyncServerHandler<
  152. Serializer: MessageSerializer,
  153. Deserializer: MessageDeserializer,
  154. Request: Sendable,
  155. Response: Sendable
  156. >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  157. /// A response serializer.
  158. @usableFromInline
  159. internal let serializer: Serializer
  160. /// A request deserializer.
  161. @usableFromInline
  162. internal let deserializer: Deserializer
  163. /// The event loop that this handler executes on.
  164. @usableFromInline
  165. internal let eventLoop: EventLoop
  166. /// A `ByteBuffer` allocator provided by the underlying `Channel`.
  167. @usableFromInline
  168. internal let allocator: ByteBufferAllocator
  169. /// A user-provided error delegate which, if provided, is used to transform errors and potentially
  170. /// pack errors into trailers.
  171. @usableFromInline
  172. internal let errorDelegate: ServerErrorDelegate?
  173. /// A logger.
  174. @usableFromInline
  175. internal let logger: Logger
  176. /// A reference to the user info. This is shared with the interceptor pipeline and may be accessed
  177. /// from the async call context. `UserInfo` is _not_ `Sendable` and must always be accessed from
  178. /// an appropriate event loop.
  179. @usableFromInline
  180. internal let userInfoRef: Ref<UserInfo>
  181. /// Whether compression is enabled on the server and an algorithm has been negotiated with
  182. /// the client
  183. @usableFromInline
  184. internal let compressionEnabledOnRPC: Bool
  185. /// Whether the RPC method would like to compress responses (if possible). Defaults to true.
  186. @usableFromInline
  187. internal var compressResponsesIfPossible: Bool
  188. /// A state machine for the interceptor pipeline.
  189. @usableFromInline
  190. internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine
  191. /// The interceptor pipeline.
  192. @usableFromInline
  193. internal private(set) var interceptors: Optional<ServerInterceptorPipeline<Request, Response>>
  194. /// An object for writing intercepted responses to the channel.
  195. @usableFromInline
  196. internal private(set) var responseWriter: Optional<GRPCServerResponseWriter>
  197. /// A state machine for the user implemented function.
  198. @usableFromInline
  199. internal private(set) var handlerStateMachine: ServerHandlerStateMachine
  200. /// A bag of components used by the user handler.
  201. @usableFromInline
  202. internal private(set) var handlerComponents: Optional<ServerHandlerComponents<
  203. Request,
  204. Response,
  205. GRPCAsyncWriterSinkDelegate<(Response, Compression)>
  206. >>
  207. /// The user provided function to execute.
  208. @usableFromInline
  209. internal let userHandler: @Sendable (
  210. GRPCAsyncRequestStream<Request>,
  211. GRPCAsyncResponseStreamWriter<Response>,
  212. GRPCAsyncServerCallContext
  213. ) async throws -> Void
  214. @usableFromInline
  215. internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
  216. Request,
  217. Error,
  218. NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
  219. GRPCAsyncSequenceProducerDelegate
  220. >
  221. @inlinable
  222. internal init(
  223. context: CallHandlerContext,
  224. requestDeserializer: Deserializer,
  225. responseSerializer: Serializer,
  226. callType: GRPCCallType,
  227. interceptors: [ServerInterceptor<Request, Response>],
  228. userHandler: @escaping @Sendable (
  229. GRPCAsyncRequestStream<Request>,
  230. GRPCAsyncResponseStreamWriter<Response>,
  231. GRPCAsyncServerCallContext
  232. ) async throws -> Void
  233. ) {
  234. self.serializer = responseSerializer
  235. self.deserializer = requestDeserializer
  236. self.eventLoop = context.eventLoop
  237. self.allocator = context.allocator
  238. self.responseWriter = context.responseWriter
  239. self.errorDelegate = context.errorDelegate
  240. self.compressionEnabledOnRPC = context.encoding.isEnabled
  241. self.compressResponsesIfPossible = true
  242. self.logger = context.logger
  243. self.userInfoRef = Ref(UserInfo())
  244. self.handlerStateMachine = .init()
  245. self.handlerComponents = nil
  246. self.userHandler = userHandler
  247. self.interceptorStateMachine = .init()
  248. self.interceptors = nil
  249. self.interceptors = ServerInterceptorPipeline(
  250. logger: context.logger,
  251. eventLoop: context.eventLoop,
  252. path: context.path,
  253. callType: callType,
  254. remoteAddress: context.remoteAddress,
  255. userInfoRef: self.userInfoRef,
  256. closeFuture: context.closeFuture,
  257. interceptors: interceptors,
  258. onRequestPart: self.receiveInterceptedPart(_:),
  259. onResponsePart: self.sendInterceptedPart(_:promise:)
  260. )
  261. }
  262. // MARK: - GRPCServerHandlerProtocol conformance
  263. @inlinable
  264. internal func receiveMetadata(_ headers: HPACKHeaders) {
  265. switch self.interceptorStateMachine.interceptRequestMetadata() {
  266. case .intercept:
  267. self.interceptors?.receive(.metadata(headers))
  268. case .cancel:
  269. self.cancel(error: nil)
  270. case .drop:
  271. ()
  272. }
  273. }
  274. @inlinable
  275. internal func receiveMessage(_ bytes: ByteBuffer) {
  276. let request: Request
  277. do {
  278. request = try self.deserializer.deserialize(byteBuffer: bytes)
  279. } catch {
  280. return self.cancel(error: error)
  281. }
  282. switch self.interceptorStateMachine.interceptRequestMessage() {
  283. case .intercept:
  284. self.interceptors?.receive(.message(request))
  285. case .cancel:
  286. self.cancel(error: nil)
  287. case .drop:
  288. ()
  289. }
  290. }
  291. @inlinable
  292. internal func receiveEnd() {
  293. switch self.interceptorStateMachine.interceptRequestEnd() {
  294. case .intercept:
  295. self.interceptors?.receive(.end)
  296. case .cancel:
  297. self.cancel(error: nil)
  298. case .drop:
  299. ()
  300. }
  301. }
  302. @inlinable
  303. internal func receiveError(_ error: Error) {
  304. self.cancel(error: error)
  305. }
  306. @inlinable
  307. internal func finish() {
  308. self.cancel(error: nil)
  309. }
  310. @usableFromInline
  311. internal func cancel(error: Error?) {
  312. self.eventLoop.assertInEventLoop()
  313. switch self.handlerStateMachine.cancel() {
  314. case .cancelAndNilOutHandlerComponents:
  315. // Cancel handler related things (task, response writer).
  316. self.handlerComponents?.cancel()
  317. self.handlerComponents = nil
  318. // We don't distinguish between having sent the status or not; we just tell the interceptor
  319. // state machine that we want to send a response status. It will inform us whether to
  320. // generate and send one or not.
  321. switch self.interceptorStateMachine.interceptedResponseStatus() {
  322. case .forward:
  323. let error = error ?? GRPCStatus.processingError
  324. let (status, trailers) = ServerErrorProcessor.processLibraryError(
  325. error,
  326. delegate: self.errorDelegate
  327. )
  328. self.responseWriter?.sendEnd(status: status, trailers: trailers, promise: nil)
  329. case .drop, .cancel:
  330. ()
  331. }
  332. case .none:
  333. ()
  334. }
  335. switch self.interceptorStateMachine.cancel() {
  336. case .sendStatusThenNilOutInterceptorPipeline:
  337. self.responseWriter?.sendEnd(status: .processingError, trailers: [:], promise: nil)
  338. fallthrough
  339. case .nilOutInterceptorPipeline:
  340. self.interceptors = nil
  341. self.responseWriter = nil
  342. case .none:
  343. ()
  344. }
  345. }
  346. // MARK: - Interceptors to User Function
  347. @inlinable
  348. internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
  349. switch part {
  350. case let .metadata(headers):
  351. self.receiveInterceptedMetadata(headers)
  352. case let .message(message):
  353. self.receiveInterceptedMessage(message)
  354. case .end:
  355. self.receiveInterceptedEnd()
  356. }
  357. }
  358. @inlinable
  359. internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
  360. switch self.interceptorStateMachine.interceptedRequestMetadata() {
  361. case .forward:
  362. () // continue
  363. case .cancel:
  364. return self.cancel(error: nil)
  365. case .drop:
  366. return
  367. }
  368. switch self.handlerStateMachine.handleMetadata() {
  369. case .invokeHandler:
  370. // We're going to invoke the handler. We need to create a handful of things in order to do
  371. // that:
  372. //
  373. // - A context which allows the handler to set response headers/trailers and provides them
  374. // with a logger amongst other things.
  375. // - A request source; we push request messages into this which the handler consumes via
  376. // an async sequence.
  377. // - An async writer and delegate. The delegate calls us back with responses. The writer is
  378. // passed to the handler.
  379. //
  380. // All of these components are held in a bundle ("handler components") outside of the state
  381. // machine. We release these when we eventually call cancel (either when we `self.cancel()`
  382. // as a result of an error or when `self.finish()` is called).
  383. let handlerContext = GRPCAsyncServerCallContext(
  384. headers: headers,
  385. logger: self.logger,
  386. contextProvider: self
  387. )
  388. let backpressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
  389. lowWatermark: 10,
  390. highWatermark: 50
  391. )
  392. let requestSequenceProducer = NIOThrowingAsyncSequenceProducer.makeSequence(
  393. elementType: Request.self,
  394. failureType: Error.self,
  395. backPressureStrategy: backpressureStrategy,
  396. delegate: GRPCAsyncSequenceProducerDelegate()
  397. )
  398. let responseWriter = NIOAsyncWriter.makeWriter(
  399. isWritable: true,
  400. delegate: GRPCAsyncWriterSinkDelegate<(Response, Compression)>(
  401. didYield: self.interceptResponseMessages,
  402. didTerminate: { error in
  403. self.interceptTermination(error)
  404. }
  405. )
  406. )
  407. // Update our state before invoke the handler.
  408. self.handlerStateMachine.handlerInvoked(requestHeaders: headers)
  409. self.handlerComponents = ServerHandlerComponents<
  410. Request,
  411. Response,
  412. GRPCAsyncWriterSinkDelegate<(Response, Compression)>
  413. >(
  414. requestSource: requestSequenceProducer.source,
  415. responseWriterSink: responseWriter.sink,
  416. task: Task {
  417. // We don't have a task cancellation handler here: we do it in `self.cancel()`.
  418. await self.invokeUserHandler(
  419. requestSequence: requestSequenceProducer,
  420. responseWriter: responseWriter.writer,
  421. callContext: handlerContext
  422. )
  423. }
  424. )
  425. case .cancel:
  426. self.cancel(error: nil)
  427. }
  428. }
  429. @Sendable
  430. @usableFromInline
  431. internal func invokeUserHandler(
  432. requestSequence: AsyncSequenceProducer.NewSequence,
  433. responseWriter: NIOAsyncWriter<
  434. (Response, Compression),
  435. GRPCAsyncWriterSinkDelegate<(Response, Compression)>
  436. >,
  437. callContext: GRPCAsyncServerCallContext
  438. ) async {
  439. defer {
  440. // It's possible the user handler completed before the end of the request stream. We
  441. // explicitly finish it to drop any unconsumed inbound messages.
  442. requestSequence.source.finish()
  443. }
  444. do {
  445. let grpcRequestStream = GRPCAsyncRequestStream(requestSequence.sequence)
  446. let grpcResponseStreamWriter = GRPCAsyncResponseStreamWriter(wrapping: responseWriter)
  447. try await self.userHandler(grpcRequestStream, grpcResponseStreamWriter, callContext)
  448. responseWriter.finish()
  449. } catch {
  450. responseWriter.finish(error: error)
  451. }
  452. }
  453. @inlinable
  454. internal func receiveInterceptedMessage(_ request: Request) {
  455. switch self.interceptorStateMachine.interceptedRequestMessage() {
  456. case .forward:
  457. switch self.handlerStateMachine.handleMessage() {
  458. case .forward:
  459. _ = self.handlerComponents?.requestSource.yield(request)
  460. case .cancel:
  461. self.cancel(error: nil)
  462. }
  463. case .cancel:
  464. self.cancel(error: nil)
  465. case .drop:
  466. ()
  467. }
  468. }
  469. @inlinable
  470. internal func receiveInterceptedEnd() {
  471. switch self.interceptorStateMachine.interceptedRequestEnd() {
  472. case .forward:
  473. switch self.handlerStateMachine.handleEnd() {
  474. case .forward:
  475. self.handlerComponents?.requestSource.finish()
  476. case .cancel:
  477. self.cancel(error: nil)
  478. }
  479. case .cancel:
  480. self.cancel(error: nil)
  481. case .drop:
  482. ()
  483. }
  484. }
  485. // MARK: - User Function To Interceptors
  486. @inlinable
  487. internal func _interceptResponseMessage(_ response: Response, compression: Compression) {
  488. self.eventLoop.assertInEventLoop()
  489. switch self.handlerStateMachine.sendMessage() {
  490. case let .intercept(.some(headers)):
  491. switch self.interceptorStateMachine.interceptResponseMetadata() {
  492. case .intercept:
  493. self.interceptors?.send(.metadata(headers), promise: nil)
  494. case .cancel:
  495. return self.cancel(error: nil)
  496. case .drop:
  497. ()
  498. }
  499. // Fall through to the next case to send the response message.
  500. fallthrough
  501. case .intercept(.none):
  502. switch self.interceptorStateMachine.interceptResponseMessage() {
  503. case .intercept:
  504. let senderWantsCompression = compression.isEnabled(
  505. callDefault: self.compressResponsesIfPossible
  506. )
  507. let compress = self.compressionEnabledOnRPC && senderWantsCompression
  508. let metadata = MessageMetadata(compress: compress, flush: true)
  509. self.interceptors?.send(.message(response, metadata), promise: nil)
  510. case .cancel:
  511. return self.cancel(error: nil)
  512. case .drop:
  513. ()
  514. }
  515. case .drop:
  516. ()
  517. }
  518. }
  519. @Sendable
  520. @inlinable
  521. internal func interceptResponseMessages(_ messages: Deque<(Response, Compression)>) {
  522. if self.eventLoop.inEventLoop {
  523. for message in messages {
  524. self._interceptResponseMessage(message.0, compression: message.1)
  525. }
  526. } else {
  527. self.eventLoop.execute {
  528. for message in messages {
  529. self._interceptResponseMessage(message.0, compression: message.1)
  530. }
  531. }
  532. }
  533. }
  534. @inlinable
  535. internal func _interceptTermination(_ error: Error?) {
  536. self.eventLoop.assertInEventLoop()
  537. let processedError: Error?
  538. if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
  539. processedError = GRPCStatus(
  540. code: .unknown,
  541. message: "Handler threw error with status code 'ok'."
  542. )
  543. } else {
  544. processedError = error
  545. }
  546. switch self.handlerStateMachine.sendStatus() {
  547. case let .intercept(requestHeaders, trailers):
  548. let status: GRPCStatus
  549. let processedTrailers: HPACKHeaders
  550. if let processedError = processedError {
  551. (status, processedTrailers) = ServerErrorProcessor.processObserverError(
  552. processedError,
  553. headers: requestHeaders,
  554. trailers: trailers,
  555. delegate: self.errorDelegate
  556. )
  557. } else {
  558. status = GRPCStatus.ok
  559. processedTrailers = trailers
  560. }
  561. switch self.interceptorStateMachine.interceptResponseStatus() {
  562. case .intercept:
  563. self.interceptors?.send(.end(status, processedTrailers), promise: nil)
  564. case .cancel:
  565. return self.cancel(error: nil)
  566. case .drop:
  567. ()
  568. }
  569. case .drop:
  570. ()
  571. }
  572. }
  573. @Sendable
  574. @inlinable
  575. internal func interceptTermination(_ status: Error?) {
  576. if self.eventLoop.inEventLoop {
  577. self._interceptTermination(status)
  578. } else {
  579. self.eventLoop.execute {
  580. self._interceptTermination(status)
  581. }
  582. }
  583. }
  584. @inlinable
  585. internal func sendInterceptedPart(
  586. _ part: GRPCServerResponsePart<Response>,
  587. promise: EventLoopPromise<Void>?
  588. ) {
  589. switch part {
  590. case let .metadata(headers):
  591. self.sendInterceptedMetadata(headers, promise: promise)
  592. case let .message(message, metadata):
  593. do {
  594. let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
  595. self.sendInterceptedResponse(bytes, metadata: metadata, promise: promise)
  596. } catch {
  597. promise?.fail(error)
  598. self.cancel(error: error)
  599. }
  600. case let .end(status, trailers):
  601. self.sendInterceptedStatus(status, metadata: trailers, promise: promise)
  602. }
  603. }
  604. @inlinable
  605. internal func sendInterceptedMetadata(
  606. _ metadata: HPACKHeaders,
  607. promise: EventLoopPromise<Void>?
  608. ) {
  609. switch self.interceptorStateMachine.interceptedResponseMetadata() {
  610. case .forward:
  611. if let responseWriter = self.responseWriter {
  612. responseWriter.sendMetadata(metadata, flush: false, promise: promise)
  613. } else if let promise = promise {
  614. promise.fail(GRPCStatus.processingError)
  615. }
  616. case .cancel:
  617. self.cancel(error: nil)
  618. case .drop:
  619. ()
  620. }
  621. }
  622. @inlinable
  623. internal func sendInterceptedResponse(
  624. _ bytes: ByteBuffer,
  625. metadata: MessageMetadata,
  626. promise: EventLoopPromise<Void>?
  627. ) {
  628. switch self.interceptorStateMachine.interceptedResponseMessage() {
  629. case .forward:
  630. if let responseWriter = self.responseWriter {
  631. responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
  632. } else if let promise = promise {
  633. promise.fail(GRPCStatus.processingError)
  634. }
  635. case .cancel:
  636. self.cancel(error: nil)
  637. case .drop:
  638. ()
  639. }
  640. }
  641. @inlinable
  642. internal func sendInterceptedStatus(
  643. _ status: GRPCStatus,
  644. metadata: HPACKHeaders,
  645. promise: EventLoopPromise<Void>?
  646. ) {
  647. switch self.interceptorStateMachine.interceptedResponseStatus() {
  648. case .forward:
  649. if let responseWriter = self.responseWriter {
  650. responseWriter.sendEnd(status: status, trailers: metadata, promise: promise)
  651. } else if let promise = promise {
  652. promise.fail(GRPCStatus.processingError)
  653. }
  654. case .cancel:
  655. self.cancel(error: nil)
  656. case .drop:
  657. ()
  658. }
  659. }
  660. }
  661. // Sendability is unchecked as all mutable state is accessed/modified from an appropriate event
  662. // loop.
  663. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  664. extension AsyncServerHandler: @unchecked Sendable {}
  665. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  666. extension AsyncServerHandler: AsyncServerCallContextProvider {
  667. @usableFromInline
  668. internal func setResponseHeaders(_ headers: HPACKHeaders) async throws {
  669. let completed = self.eventLoop.submit {
  670. self.handlerStateMachine.setResponseHeaders(headers)
  671. }
  672. try await completed.get()
  673. }
  674. @usableFromInline
  675. internal func setResponseTrailers(_ headers: HPACKHeaders) async throws {
  676. let completed = self.eventLoop.submit {
  677. self.handlerStateMachine.setResponseTrailers(headers)
  678. }
  679. try await completed.get()
  680. }
  681. @usableFromInline
  682. internal func setResponseCompression(_ enabled: Bool) async throws {
  683. let completed = self.eventLoop.submit {
  684. self.compressResponsesIfPossible = enabled
  685. }
  686. try await completed.get()
  687. }
  688. @usableFromInline
  689. func withUserInfo<Result: Sendable>(
  690. _ modify: @Sendable @escaping (UserInfo) throws -> Result
  691. ) async throws -> Result {
  692. let result = self.eventLoop.submit {
  693. try modify(self.userInfoRef.value)
  694. }
  695. return try await result.get()
  696. }
  697. @usableFromInline
  698. func withMutableUserInfo<Result: Sendable>(
  699. _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  700. ) async throws -> Result {
  701. let result = self.eventLoop.submit {
  702. try modify(&self.userInfoRef.value)
  703. }
  704. return try await result.get()
  705. }
  706. }
  707. /// This protocol exists so that the generic server handler can be erased from the
  708. /// `GRPCAsyncServerCallContext`.
  709. ///
  710. /// It provides methods which update context on the async handler by first executing onto the
  711. /// correct event loop.
  712. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  713. @usableFromInline
  714. protocol AsyncServerCallContextProvider: Sendable {
  715. func setResponseHeaders(_ headers: HPACKHeaders) async throws
  716. func setResponseTrailers(_ trailers: HPACKHeaders) async throws
  717. func setResponseCompression(_ enabled: Bool) async throws
  718. func withUserInfo<Result: Sendable>(
  719. _ modify: @Sendable @escaping (UserInfo) throws -> Result
  720. ) async throws -> Result
  721. func withMutableUserInfo<Result: Sendable>(
  722. _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  723. ) async throws -> Result
  724. }
  725. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  726. @usableFromInline
  727. internal struct ServerHandlerComponents<
  728. Request: Sendable,
  729. Response: Sendable,
  730. Delegate: NIOAsyncWriterSinkDelegate
  731. > where Delegate.Element == (Response, Compression) {
  732. @usableFromInline
  733. internal typealias AsyncWriterSink = NIOAsyncWriter<(Response, Compression), Delegate>.Sink
  734. @usableFromInline
  735. internal typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer<
  736. Request,
  737. Error,
  738. NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
  739. GRPCAsyncSequenceProducerDelegate
  740. >.Source
  741. @usableFromInline
  742. internal let task: Task<Void, Never>
  743. @usableFromInline
  744. internal let responseWriterSink: AsyncWriterSink
  745. @usableFromInline
  746. internal let requestSource: AsyncSequenceSource
  747. @inlinable
  748. init(
  749. requestSource: AsyncSequenceSource,
  750. responseWriterSink: AsyncWriterSink,
  751. task: Task<Void, Never>
  752. ) {
  753. self.task = task
  754. self.responseWriterSink = responseWriterSink
  755. self.requestSource = requestSource
  756. }
  757. func cancel() {
  758. // Cancel the request and response streams.
  759. //
  760. // The user handler is encouraged to check for cancellation, however, we should assume
  761. // they do not. Finishing the request source stops any more requests from being delivered
  762. // to the request stream, and finishing the writer sink will ensure no more responses are
  763. // written. This should reduce how long the user handler runs for as it can no longer do
  764. // anything useful.
  765. self.requestSource.finish()
  766. self.responseWriterSink.finish(error: CancellationError())
  767. self.task.cancel()
  768. }
  769. }
  770. #endif // compiler(>=5.6)