// GRPCAsyncServerHandler.swift
/*
 * Copyright 2021, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. #if compiler(>=5.6)
  17. import Logging
  18. import NIOCore
  19. import NIOHPACK
  /// A server handler for RPCs whose implementation is an `async` function.
  ///
  /// This value type is a thin facade: each `GRPCServerHandlerProtocol` requirement is forwarded,
  /// unchanged, to the reference-typed `AsyncServerHandler` held in `_handler`.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct GRPCAsyncServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer,
    Request: Sendable,
    Response: Sendable
  >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
    /// The handler that does the actual work; every call below is delegated to it.
    @usableFromInline
    internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>

    /// Forwards the request headers to the wrapped handler.
    public func receiveMetadata(_ metadata: HPACKHeaders) {
      _handler.receiveMetadata(metadata)
    }

    /// Forwards the serialized bytes of one request message to the wrapped handler.
    public func receiveMessage(_ bytes: ByteBuffer) {
      _handler.receiveMessage(bytes)
    }

    /// Tells the wrapped handler that the request stream has ended.
    public func receiveEnd() {
      _handler.receiveEnd()
    }

    /// Forwards an error to the wrapped handler.
    public func receiveError(_ error: Error) {
      _handler.receiveError(error)
    }

    /// Tells the wrapped handler to finish.
    public func finish() {
      _handler.finish()
    }
  }
  // MARK: - RPC Adapters

  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCAsyncServerHandler {
    public typealias Request = Deserializer.Output
    public typealias Response = Serializer.Input

    /// Creates a handler for a unary RPC.
    ///
    /// The wrapped function is adapted to the stream-in/stream-out shape used by
    /// `AsyncServerHandler`: exactly one request is read from the request stream, `unary` is
    /// called with it, and the single response is sent on the response stream writer.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to place in the call's interceptor pipeline.
    ///   - unary: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping unary: @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
        -> Response
    ) {
      self._handler = AsyncServerHandler(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .unary,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          var requestIterator = requests.makeAsyncIterator()
          // There must be a first request ...
          guard let onlyRequest = try await requestIterator.next() else {
            throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
          }
          // ... and nothing after it.
          guard try await requestIterator.next() == nil else {
            throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
          }
          let reply = try await unary(onlyRequest, callContext)
          try await writer.send(reply)
        }
      )
    }

    /// Creates a handler for a client-streaming RPC.
    ///
    /// The whole request stream is handed to `clientStreaming`; the single response it returns is
    /// sent on the response stream writer.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to place in the call's interceptor pipeline.
    ///   - clientStreaming: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping clientStreaming: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncServerCallContext
      ) async throws -> Response
    ) {
      self._handler = AsyncServerHandler(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .clientStreaming,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          let reply = try await clientStreaming(requests, callContext)
          try await writer.send(reply)
        }
      )
    }

    /// Creates a handler for a server-streaming RPC.
    ///
    /// Exactly one request is read from the request stream and passed, together with the response
    /// stream writer, to `serverStreaming`.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to place in the call's interceptor pipeline.
    ///   - serverStreaming: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping serverStreaming: @escaping @Sendable (
        Request,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self._handler = AsyncServerHandler(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .serverStreaming,
        interceptors: interceptors,
        userHandler: { requests, writer, callContext in
          var requestIterator = requests.makeAsyncIterator()
          // There must be a first request ...
          guard let onlyRequest = try await requestIterator.next() else {
            throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
          }
          // ... and nothing after it.
          guard try await requestIterator.next() == nil else {
            throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
          }
          try await serverStreaming(onlyRequest, writer, callContext)
        }
      )
    }

    /// Creates a handler for a bidirectional-streaming RPC.
    ///
    /// `bidirectional` already has the shape `AsyncServerHandler` expects, so it is used as the
    /// user handler directly.
    ///
    /// - Parameters:
    ///   - context: The context of the call being handled.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - interceptors: Interceptors to place in the call's interceptor pipeline.
    ///   - bidirectional: The user function implementing the RPC.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      wrapping bidirectional: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self._handler = AsyncServerHandler(
        context: context,
        requestDeserializer: requestDeserializer,
        responseSerializer: responseSerializer,
        callType: .bidirectionalStreaming,
        interceptors: interceptors,
        userHandler: bidirectional
      )
    }
  }
  // MARK: - Server Handler

  /// Handles a single RPC whose implementation is an `async` function.
  ///
  /// Inbound parts (metadata, messages, end) are offered to the interceptor pipeline and, once
  /// intercepted, to the user handler via a request stream. Responses produced by the user handler
  /// travel the other way: through the interceptor pipeline and then to the response writer.
  /// Two state machines (`interceptorStateMachine` and `handlerStateMachine`) decide, for every
  /// event, whether it is forwarded, dropped, or causes the RPC to be cancelled.
  ///
  /// Mutable state is only touched from `eventLoop`; entry points which may be called from the
  /// user's task hop onto the event loop first.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  internal final class AsyncServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer,
    Request: Sendable,
    Response: Sendable
  >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
    /// A response serializer.
    @usableFromInline
    internal let serializer: Serializer

    /// A request deserializer.
    @usableFromInline
    internal let deserializer: Deserializer

    /// The event loop that this handler executes on.
    @usableFromInline
    internal let eventLoop: EventLoop

    /// A `ByteBuffer` allocator provided by the underlying `Channel`.
    @usableFromInline
    internal let allocator: ByteBufferAllocator

    /// A user-provided error delegate which, if provided, is used to transform errors and
    /// potentially pack errors into trailers.
    @usableFromInline
    internal let errorDelegate: ServerErrorDelegate?

    /// A logger.
    @usableFromInline
    internal let logger: Logger

    /// A reference to the user info. This is shared with the interceptor pipeline and may be
    /// accessed from the async call context. `UserInfo` is _not_ `Sendable` and must always be
    /// accessed from an appropriate event loop.
    @usableFromInline
    internal let userInfoRef: Ref<UserInfo>

    /// Whether compression is enabled on the server and an algorithm has been negotiated with
    /// the client.
    @usableFromInline
    internal let compressionEnabledOnRPC: Bool

    /// Whether the RPC method would like to compress responses (if possible). Defaults to true.
    @usableFromInline
    internal var compressResponsesIfPossible: Bool

    /// A state machine for the interceptor pipeline.
    @usableFromInline
    internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine

    /// The interceptor pipeline. Set to `nil` when the RPC is cancelled.
    @usableFromInline
    internal private(set) var interceptors: Optional<ServerInterceptorPipeline<Request, Response>>

    /// An object for writing intercepted responses to the channel. Set to `nil` when the RPC is
    /// cancelled.
    @usableFromInline
    internal private(set) var responseWriter: Optional<GRPCServerResponseWriter>

    /// A state machine for the user implemented function.
    @usableFromInline
    internal private(set) var handlerStateMachine: ServerHandlerStateMachine

    /// A bag of components used by the user handler. `nil` until the handler is invoked and again
    /// after the RPC is cancelled.
    @usableFromInline
    internal private(set) var handlerComponents: Optional<ServerHandlerComponents<
      Request,
      AsyncResponseStreamWriterDelegate<Response>
    >>

    /// The user provided function to execute.
    @usableFromInline
    internal let userHandler: @Sendable (
      GRPCAsyncRequestStream<Request>,
      GRPCAsyncResponseStreamWriter<Response>,
      GRPCAsyncServerCallContext
    ) async throws -> Void

    /// Creates a handler for one RPC.
    ///
    /// - Parameters:
    ///   - context: The call context; supplies the event loop, allocator, response writer, error
    ///     delegate, logger, negotiated encoding, path and remote address.
    ///   - requestDeserializer: Deserializer for request messages.
    ///   - responseSerializer: Serializer for response messages.
    ///   - callType: The type of RPC, passed on to the interceptor pipeline.
    ///   - interceptors: Interceptors to place in the interceptor pipeline.
    ///   - userHandler: The user function implementing the RPC.
    @inlinable
    internal init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      callType: GRPCCallType,
      interceptors: [ServerInterceptor<Request, Response>],
      userHandler: @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
    ) {
      self.serializer = responseSerializer
      self.deserializer = requestDeserializer
      self.eventLoop = context.eventLoop
      self.allocator = context.allocator
      self.responseWriter = context.responseWriter
      self.errorDelegate = context.errorDelegate
      self.compressionEnabledOnRPC = context.encoding.isEnabled
      self.compressResponsesIfPossible = true
      self.logger = context.logger

      self.userInfoRef = Ref(UserInfo())
      self.handlerStateMachine = .init()
      self.handlerComponents = nil

      self.userHandler = userHandler

      self.interceptorStateMachine = .init()
      // Every stored property must be initialized before methods of `self` can be handed to the
      // pipeline as callbacks, so the pipeline is first set to `nil` and then replaced.
      self.interceptors = nil
      self.interceptors = ServerInterceptorPipeline(
        logger: context.logger,
        eventLoop: context.eventLoop,
        path: context.path,
        callType: callType,
        remoteAddress: context.remoteAddress,
        userInfoRef: self.userInfoRef,
        interceptors: interceptors,
        onRequestPart: self.receiveInterceptedPart(_:),
        onResponsePart: self.sendInterceptedPart(_:promise:)
      )
    }

    // MARK: - GRPCServerHandlerProtocol conformance

    /// Receives the request headers and, if the interceptor state machine allows it, passes them
    /// into the interceptor pipeline.
    @inlinable
    internal func receiveMetadata(_ headers: HPACKHeaders) {
      switch self.interceptorStateMachine.interceptRequestMetadata() {
      case .intercept:
        self.interceptors?.receive(.metadata(headers))
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }

    /// Receives the bytes of one request message, deserializes them and, if the interceptor state
    /// machine allows it, passes the message into the interceptor pipeline.
    ///
    /// A deserialization failure cancels the RPC with the thrown error.
    @inlinable
    internal func receiveMessage(_ bytes: ByteBuffer) {
      let request: Request

      do {
        request = try self.deserializer.deserialize(byteBuffer: bytes)
      } catch {
        return self.cancel(error: error)
      }

      switch self.interceptorStateMachine.interceptRequestMessage() {
      case .intercept:
        self.interceptors?.receive(.message(request))
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }

    /// Receives the end of the request stream and, if the interceptor state machine allows it,
    /// passes it into the interceptor pipeline.
    @inlinable
    internal func receiveEnd() {
      switch self.interceptorStateMachine.interceptRequestEnd() {
      case .intercept:
        self.interceptors?.receive(.end)
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }

    /// Receives an error; the RPC is cancelled with that error.
    @inlinable
    internal func receiveError(_ error: Error) {
      self.cancel(error: error)
    }

    /// Finishes the handler; implemented as a cancellation without an error.
    @inlinable
    internal func finish() {
      self.cancel(error: nil)
    }

    /// Cancels the RPC: tears down the user handler components and the interceptor pipeline and,
    /// where the state machines ask for it, sends a final status.
    ///
    /// Must be called on `eventLoop` (asserted).
    ///
    /// - Parameter error: The reason for cancelling, if any. When `nil` and a status has to be
    ///   generated, `GRPCStatus.processingError` is used.
    @usableFromInline
    internal func cancel(error: Error?) {
      self.eventLoop.assertInEventLoop()

      switch self.handlerStateMachine.cancel() {
      case .cancelAndNilOutHandlerComponents:
        // Cancel handler related things (task, response writer).
        self.handlerComponents?.cancel()
        self.handlerComponents = nil

        // We don't distinguish between having sent the status or not; we just tell the interceptor
        // state machine that we want to send a response status. It will inform us whether to
        // generate and send one or not.
        switch self.interceptorStateMachine.interceptedResponseStatus() {
        case .forward:
          let error = error ?? GRPCStatus.processingError
          let (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.errorDelegate
          )
          self.responseWriter?.sendEnd(status: status, trailers: trailers, promise: nil)
        case .drop, .cancel:
          ()
        }

      case .none:
        ()
      }

      switch self.interceptorStateMachine.cancel() {
      case .sendStatusThenNilOutInterceptorPipeline:
        self.responseWriter?.sendEnd(status: .processingError, trailers: [:], promise: nil)
        fallthrough
      case .nilOutInterceptorPipeline:
        // Dropping the pipeline also drops the callbacks into this handler which were given to
        // it in `init`.
        self.interceptors = nil
        self.responseWriter = nil
      case .none:
        ()
      }
    }

    // MARK: - Interceptors to User Function

    /// Receives a request part which has passed through the interceptor pipeline and dispatches
    /// it to the matching `receiveIntercepted...` method.
    @inlinable
    internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case let .metadata(headers):
        self.receiveInterceptedMetadata(headers)
      case let .message(message):
        self.receiveInterceptedMessage(message)
      case .end:
        self.receiveInterceptedEnd()
      }
    }

    /// Receives intercepted request headers and, if both state machines allow it, creates the
    /// components needed by the user handler and invokes it in a new task.
    @inlinable
    internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
      switch self.interceptorStateMachine.interceptedRequestMetadata() {
      case .forward:
        () // continue
      case .cancel:
        return self.cancel(error: nil)
      case .drop:
        return
      }

      switch self.handlerStateMachine.handleMetadata() {
      case .invokeHandler:
        // We're going to invoke the handler. We need to create a handful of things in order to do
        // that:
        //
        // - A context which allows the handler to set response headers/trailers and provides them
        //   with a logger amongst other things.
        // - A request source; we push request messages into this which the handler consumes via
        //   an async sequence.
        // - An async writer and delegate. The delegate calls us back with responses. The writer is
        //   passed to the handler.
        //
        // All of these components are held in a bundle ("handler components") outside of the state
        // machine. We release these when we eventually call cancel (either when we `self.cancel()`
        // as a result of an error or when `self.finish()` is called).
        let handlerContext = GRPCAsyncServerCallContext(
          headers: headers,
          logger: self.logger,
          contextProvider: self
        )

        let requestSource = PassthroughMessageSource<Request, Error>()

        let writerDelegate = AsyncResponseStreamWriterDelegate(
          send: self.interceptResponseMessage(_:compression:),
          finish: self.interceptResponseStatus(_:)
        )
        let writer = AsyncWriter(delegate: writerDelegate)

        // The user handler has two exit modes:
        // 1. It completes successfully (the async user function completes without throwing), or
        // 2. It throws an error.
        //
        // On the happy path the 'ok' status is queued up on the async writer. On the error path
        // the writer queue is drained and promise below is completed. When the promise is failed
        // it processes the error (possibly via a delegate) and sends back an appropriate status.
        // We require separate paths as the failure path needs to execute on the event loop to
        // process the error.
        let promise = self.eventLoop.makePromise(of: Void.self)
        // The success path is taken care of by the Task.
        promise.futureResult.whenFailure { error in
          self.userHandlerThrewError(error)
        }

        // Update our state before invoking the handler.
        self.handlerStateMachine.handlerInvoked(requestHeaders: headers)
        self.handlerComponents = ServerHandlerComponents(
          requestSource: requestSource,
          responseWriter: writer,
          task: promise.completeWithTask {
            // We don't have a task cancellation handler here: we do it in `self.cancel()`.
            try await self.invokeUserHandler(
              requestStreamSource: requestSource,
              responseStreamWriter: writer,
              callContext: handlerContext
            )
          }
        )

      case .cancel:
        self.cancel(error: nil)
      }
    }

    /// Runs the user handler.
    ///
    /// On success an 'ok' status is queued on `responseStreamWriter`. On failure the writer is
    /// cancelled and the error is rethrown, except that a thrown `GRPCStatus` which is 'ok' is
    /// replaced with a status whose code is `.unknown`. In both cases the request source is
    /// finished on exit.
    ///
    /// - Parameters:
    ///   - requestStreamSource: The source backing the request stream given to the user handler.
    ///   - responseStreamWriter: The writer backing the response stream writer given to the user
    ///     handler.
    ///   - callContext: The call context given to the user handler.
    /// - Throws: The error thrown by the user handler or by finishing the writer, subject to the
    ///   substitution described above.
    @Sendable
    @usableFromInline
    internal func invokeUserHandler(
      requestStreamSource: PassthroughMessageSource<Request, Error>,
      responseStreamWriter: AsyncWriter<AsyncResponseStreamWriterDelegate<Response>>,
      callContext: GRPCAsyncServerCallContext
    ) async throws {
      defer {
        // It's possible the user handler completed before the end of the request stream. We
        // explicitly finish it to drop any unconsumed inbound messages.
        requestStreamSource.finish()
      }

      do {
        let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
        let responseStream = GRPCAsyncResponseStreamWriter(wrapping: responseStreamWriter)
        try await self.userHandler(requestStream, responseStream, callContext)

        // Done successfully. Queue up and send back an 'ok' status.
        try await responseStreamWriter.finish(.ok)
      } catch {
        // Drop pending writes as we're on the error path.
        await responseStreamWriter.cancel()

        if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
          throw GRPCStatus(code: .unknown, message: "Handler threw error with status code 'ok'.")
        } else {
          throw error
        }
      }
    }

    /// Called when the user handler's task failed. Converts the error into a status and trailers
    /// and, if both state machines allow it, sends them through the interceptor pipeline.
    ///
    /// Must be called on `eventLoop` (asserted).
    @usableFromInline
    internal func userHandlerThrewError(_ error: Error) {
      self.eventLoop.assertInEventLoop()

      switch self.handlerStateMachine.sendStatus() {
      case let .intercept(requestHeaders, trailers):
        let (status, processedTrailers) = ServerErrorProcessor.processObserverError(
          error,
          headers: requestHeaders,
          trailers: trailers,
          delegate: self.errorDelegate
        )

        switch self.interceptorStateMachine.interceptResponseStatus() {
        case .intercept:
          self.interceptors?.send(.end(status, processedTrailers), promise: nil)
        case .cancel:
          self.cancel(error: nil)
        case .drop:
          ()
        }

      case .drop:
        ()
      }
    }

    /// Receives an intercepted request message and, if both state machines allow it, yields it
    /// to the request source consumed by the user handler.
    @inlinable
    internal func receiveInterceptedMessage(_ request: Request) {
      switch self.interceptorStateMachine.interceptedRequestMessage() {
      case .forward:
        switch self.handlerStateMachine.handleMessage() {
        case .forward:
          self.handlerComponents?.requestSource.yield(request)
        case .cancel:
          self.cancel(error: nil)
        }

      case .cancel:
        self.cancel(error: nil)

      case .drop:
        ()
      }
    }

    /// Receives the intercepted end of the request stream and, if both state machines allow it,
    /// finishes the request source consumed by the user handler.
    @inlinable
    internal func receiveInterceptedEnd() {
      switch self.interceptorStateMachine.interceptedRequestEnd() {
      case .forward:
        switch self.handlerStateMachine.handleEnd() {
        case .forward:
          self.handlerComponents?.requestSource.finish()
        case .cancel:
          self.cancel(error: nil)
        }
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }

    // MARK: - User Function To Interceptors

    /// Sends a response message from the user handler into the interceptor pipeline, preceded by
    /// the response headers when the handler state machine supplies them.
    ///
    /// Must be called on `eventLoop` (asserted).
    ///
    /// - Parameters:
    ///   - response: The response message.
    ///   - compression: The compression preference for this message; the message is only marked
    ///     for compression if compression is also enabled on the RPC.
    @inlinable
    internal func _interceptResponseMessage(_ response: Response, compression: Compression) {
      self.eventLoop.assertInEventLoop()

      switch self.handlerStateMachine.sendMessage() {
      case let .intercept(.some(headers)):
        switch self.interceptorStateMachine.interceptResponseMetadata() {
        case .intercept:
          self.interceptors?.send(.metadata(headers), promise: nil)
        case .cancel:
          return self.cancel(error: nil)
        case .drop:
          ()
        }
        // Fall through to the next case to send the response message.
        fallthrough

      case .intercept(.none):
        switch self.interceptorStateMachine.interceptResponseMessage() {
        case .intercept:
          let senderWantsCompression = compression.isEnabled(
            callDefault: self.compressResponsesIfPossible
          )

          let compress = self.compressionEnabledOnRPC && senderWantsCompression

          let metadata = MessageMetadata(compress: compress, flush: true)
          self.interceptors?.send(.message(response, metadata), promise: nil)
        case .cancel:
          return self.cancel(error: nil)
        case .drop:
          ()
        }

      case .drop:
        ()
      }
    }

    /// Sends a response message from the user handler into the interceptor pipeline, hopping
    /// onto `eventLoop` first if the caller is not already on it.
    @Sendable
    @inlinable
    internal func interceptResponseMessage(_ response: Response, compression: Compression) {
      if self.eventLoop.inEventLoop {
        self._interceptResponseMessage(response, compression: compression)
      } else {
        self.eventLoop.execute {
          self._interceptResponseMessage(response, compression: compression)
        }
      }
    }

    /// Sends the final status, with the trailers supplied by the handler state machine, into the
    /// interceptor pipeline if both state machines allow it.
    ///
    /// Must be called on `eventLoop` (asserted).
    @inlinable
    internal func _interceptResponseStatus(_ status: GRPCStatus) {
      self.eventLoop.assertInEventLoop()

      switch self.handlerStateMachine.sendStatus() {
      case let .intercept(_, trailers):
        switch self.interceptorStateMachine.interceptResponseStatus() {
        case .intercept:
          self.interceptors?.send(.end(status, trailers), promise: nil)
        case .cancel:
          return self.cancel(error: nil)
        case .drop:
          ()
        }

      case .drop:
        ()
      }
    }

    /// Sends the final status into the interceptor pipeline, hopping onto `eventLoop` first if
    /// the caller is not already on it.
    @Sendable
    @inlinable
    internal func interceptResponseStatus(_ status: GRPCStatus) {
      if self.eventLoop.inEventLoop {
        self._interceptResponseStatus(status)
      } else {
        self.eventLoop.execute {
          self._interceptResponseStatus(status)
        }
      }
    }

    /// Receives a response part which has passed through the interceptor pipeline and writes it
    /// out. Messages are serialized first; a serialization failure fails `promise` and cancels
    /// the RPC with the thrown error.
    ///
    /// - Parameters:
    ///   - part: The intercepted response part.
    ///   - promise: A promise to complete with the outcome of the write, if any.
    @inlinable
    internal func sendInterceptedPart(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      switch part {
      case let .metadata(headers):
        self.sendInterceptedMetadata(headers, promise: promise)

      case let .message(message, metadata):
        do {
          // Serialize with the allocator taken from the call context rather than a freshly
          // constructed default allocator.
          let bytes = try self.serializer.serialize(message, allocator: self.allocator)
          self.sendInterceptedResponse(bytes, metadata: metadata, promise: promise)
        } catch {
          promise?.fail(error)
          self.cancel(error: error)
        }

      case let .end(status, trailers):
        self.sendInterceptedStatus(status, metadata: trailers, promise: promise)
      }
    }

    /// Writes intercepted response headers to the response writer if the interceptor state
    /// machine allows it. If the writer has already been released, `promise` is failed with
    /// `GRPCStatus.processingError`.
    @inlinable
    internal func sendInterceptedMetadata(
      _ metadata: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.interceptorStateMachine.interceptedResponseMetadata() {
      case .forward:
        if let responseWriter = self.responseWriter {
          responseWriter.sendMetadata(metadata, flush: false, promise: promise)
        } else if let promise = promise {
          promise.fail(GRPCStatus.processingError)
        }
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }

    /// Writes an intercepted, serialized response message to the response writer if the
    /// interceptor state machine allows it. If the writer has already been released, `promise`
    /// is failed with `GRPCStatus.processingError`.
    @inlinable
    internal func sendInterceptedResponse(
      _ bytes: ByteBuffer,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.interceptorStateMachine.interceptedResponseMessage() {
      case .forward:
        if let responseWriter = self.responseWriter {
          responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
        } else if let promise = promise {
          promise.fail(GRPCStatus.processingError)
        }
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }

    /// Writes the intercepted final status and trailers to the response writer if the
    /// interceptor state machine allows it. If the writer has already been released, `promise`
    /// is failed with `GRPCStatus.processingError`.
    @inlinable
    internal func sendInterceptedStatus(
      _ status: GRPCStatus,
      metadata: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.interceptorStateMachine.interceptedResponseStatus() {
      case .forward:
        if let responseWriter = self.responseWriter {
          responseWriter.sendEnd(status: status, trailers: metadata, promise: promise)
        } else if let promise = promise {
          promise.fail(GRPCStatus.processingError)
        }
      case .cancel:
        self.cancel(error: nil)
      case .drop:
        ()
      }
    }
  }
  // The compiler cannot verify `Sendable` for this class because it has mutable stored
  // properties. The conformance is asserted manually: all mutable state is accessed/modified
  // from an appropriate event loop.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncServerHandler: @unchecked Sendable {}
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncServerHandler: AsyncServerCallContextProvider {
    /// Hands `headers` to the handler state machine as the response headers.
    ///
    /// The update is submitted to `eventLoop` and awaited.
    @usableFromInline
    internal func setResponseHeaders(_ headers: HPACKHeaders) async throws {
      try await self.eventLoop.submit {
        self.handlerStateMachine.setResponseHeaders(headers)
      }.get()
    }

    /// Hands `trailers` to the handler state machine as the response trailers.
    ///
    /// The update is submitted to `eventLoop` and awaited.
    @usableFromInline
    internal func setResponseTrailers(_ trailers: HPACKHeaders) async throws {
      try await self.eventLoop.submit {
        self.handlerStateMachine.setResponseTrailers(trailers)
      }.get()
    }

    /// Sets whether the RPC would like its responses compressed (if possible).
    ///
    /// The update is submitted to `eventLoop` and awaited.
    @usableFromInline
    internal func setResponseCompression(_ enabled: Bool) async throws {
      try await self.eventLoop.submit {
        self.compressResponsesIfPossible = enabled
      }.get()
    }

    /// Runs `modify` on `eventLoop` with the current user info and returns its result.
    ///
    /// - Throws: Any error thrown by `modify` or by the submission to the event loop.
    @usableFromInline
    func withUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (UserInfo) throws -> Result
    ) async throws -> Result {
      return try await self.eventLoop.submit {
        try modify(self.userInfoRef.value)
      }.get()
    }

    /// Runs `modify` on `eventLoop` with mutable access to the user info and returns its result.
    ///
    /// - Throws: Any error thrown by `modify` or by the submission to the event loop.
    @usableFromInline
    func withMutableUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
    ) async throws -> Result {
      return try await self.eventLoop.submit {
        try modify(&self.userInfoRef.value)
      }.get()
    }
  }
  /// A type-erasing seam between `GRPCAsyncServerCallContext` and the generic server handler.
  ///
  /// The call context talks to the handler through this protocol so that it does not need to
  /// carry the handler's generic parameters. Each requirement updates context on the async
  /// handler by first executing onto the correct event loop.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  protocol AsyncServerCallContextProvider: Sendable {
    /// Sets the response headers for the RPC.
    func setResponseHeaders(_ headers: HPACKHeaders) async throws

    /// Sets the response trailers for the RPC.
    func setResponseTrailers(_ trailers: HPACKHeaders) async throws

    /// Sets whether responses should be compressed.
    func setResponseCompression(_ enabled: Bool) async throws

    /// Calls `modify` with the user info and returns whatever it returns.
    func withUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (UserInfo) throws -> Result
    ) async throws -> Result

    /// Calls `modify` with mutable access to the user info and returns whatever it returns.
    func withMutableUserInfo<Result: Sendable>(
      _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
    ) async throws -> Result
  }
  /// The pieces created when a user handler is invoked: the task running it, the writer it sends
  /// responses to, and the source its request stream reads from.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @usableFromInline
  internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriterDelegate> {
    /// The task the user handler runs in.
    @usableFromInline
    internal let task: Task<Void, Never>

    /// The writer backing the user handler's response stream.
    @usableFromInline
    internal let responseWriter: AsyncWriter<Delegate>

    /// The source backing the user handler's request stream.
    @usableFromInline
    internal let requestSource: PassthroughMessageSource<Request, Error>

    /// Bundles the given components together.
    @inlinable
    init(
      requestSource: PassthroughMessageSource<Request, Error>,
      responseWriter: AsyncWriter<Delegate>,
      task: Task<Void, Never>
    ) {
      self.requestSource = requestSource
      self.responseWriter = responseWriter
      self.task = task
    }

    /// Cancels the request stream, the response writer and the task, in that order.
    func cancel() {
      // User handlers are encouraged to check for cancellation but we should assume they do not.
      // Finishing the request source with a `CancellationError` stops any more requests from
      // being delivered to the request stream, and cancelling the writer ensures no more
      // responses are written. With nothing useful left to do, the user handler should not keep
      // running for long.
      requestSource.finish(throwing: CancellationError())
      responseWriter.cancelAsynchronously()
      task.cancel()
    }
  }
  745. #endif // compiler(>=5.6)