GRPCAsyncResponseStreamWriter.swift 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. /// Writer for server-streaming RPC handlers to provide responses.
  18. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  19. public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
  20. @usableFromInline
  21. internal typealias Element = (Response, Compression)
  22. @usableFromInline
  23. internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>
  24. @usableFromInline
  25. internal let asyncWriter: AsyncWriter<Delegate>
  26. @inlinable
  27. internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
  28. self.asyncWriter = asyncWriter
  29. }
  30. @inlinable
  31. public func send(
  32. _ response: Response,
  33. compression: Compression = .deferToCallDefault
  34. ) async throws {
  35. try await self.asyncWriter.write((response, compression))
  36. }
  37. }
  38. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  39. @usableFromInline
  40. internal final class AsyncResponseStreamWriterDelegate<Response: Sendable>: AsyncWriterDelegate {
  41. @usableFromInline
  42. internal typealias Element = (Response, Compression)
  43. @usableFromInline
  44. internal typealias End = GRPCStatus
  45. @usableFromInline
  46. internal let _context: GRPCAsyncServerCallContext
  47. @usableFromInline
  48. internal let _send: @Sendable (Response, MessageMetadata) -> Void
  49. @usableFromInline
  50. internal let _finish: @Sendable (GRPCStatus) -> Void
  51. @usableFromInline
  52. internal let _compressionEnabledOnServer: Bool
  53. // Create a new AsyncResponseStreamWriterDelegate.
  54. //
  55. // - Important: the `send` and `finish` closures must be thread-safe.
  56. @inlinable
  57. internal init(
  58. context: GRPCAsyncServerCallContext,
  59. compressionIsEnabled: Bool,
  60. send: @escaping @Sendable (Response, MessageMetadata) -> Void,
  61. finish: @escaping @Sendable (GRPCStatus) -> Void
  62. ) {
  63. self._context = context
  64. self._compressionEnabledOnServer = compressionIsEnabled
  65. self._send = send
  66. self._finish = finish
  67. }
  68. @inlinable
  69. internal func _shouldCompress(_ compression: Compression) -> Bool {
  70. guard self._compressionEnabledOnServer else {
  71. return false
  72. }
  73. return compression.isEnabled(callDefault: self._context.compressionEnabled)
  74. }
  75. @inlinable
  76. internal func _send(
  77. _ response: Response,
  78. compression: Compression = .deferToCallDefault
  79. ) {
  80. let compress = self._shouldCompress(compression)
  81. self._send(response, .init(compress: compress, flush: true))
  82. }
  83. // MARK: - AsyncWriterDelegate conformance.
  84. @inlinable
  85. internal func write(_ element: (Response, Compression)) {
  86. self._send(element.0, compression: element.1)
  87. }
  88. @inlinable
  89. internal func writeEnd(_ end: GRPCStatus) {
  90. self._finish(end)
  91. }
  92. }
  93. #endif