2
0

GRPCAsyncRequestStreamWriter.swift 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  /// A client-side handle for sending request messages on an RPC.
  ///
  /// Write each request with ``send(_:compression:)``. Once every request has been sent the
  /// holder is responsible for closing the request stream by calling ``finish()``.
  ///
  /// ```
  /// // Send a request using the compression setting configured for the RPC.
  /// try await stream.send(request)
  ///
  /// // Send a request with compression explicitly disabled.
  /// try await stream.send(request, compression: .disabled)
  ///
  /// // Signal that no further messages will be sent.
  /// try await stream.finish()
  /// ```
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct GRPCAsyncRequestStreamWriter<Request: Sendable>: Sendable {
    /// The writer that every request, and the end-of-stream signal, is forwarded to.
    @usableFromInline
    internal let asyncWriter: AsyncWriter<Delegate<Request>>

    /// Wraps an existing writer.
    ///
    /// - Parameter asyncWriter: The writer which requests are forwarded to.
    @inlinable
    internal init(asyncWriter: AsyncWriter<Delegate<Request>>) {
      self.asyncWriter = asyncWriter
    }

    /// Send a single request.
    ///
    /// Callers who need requests delivered in order should `await` this call before sending the
    /// next request. Callers who do not need that guarantee may send messages concurrently from
    /// multiple ``Task``s without awaiting each call. Note, however, that at most 16 writes may
    /// be pending at any one time; exceeding that limit results in a
    /// ``GRPCAsyncWriterError.tooManyPendingWrites`` error being thrown.
    ///
    /// Callers must call ``finish()`` when they have no more requests left to send.
    ///
    /// - Parameters:
    ///   - request: The request to send.
    ///   - compression: Whether the request should be compressed or not. Ignored if compression
    ///     was not enabled for the RPC.
    /// - Throws: ``GRPCAsyncWriterError`` if there are too many pending writes or the request
    ///   stream has already been finished.
    @inlinable
    public func send(
      _ request: Request,
      compression: Compression = .deferToCallDefault
    ) async throws {
      // The underlying writer takes the request and its compression setting as a single element.
      let element = (request, compression)
      try await self.asyncWriter.write(element)
    }

    /// Finish the request stream for the RPC.
    ///
    /// This must be called when there are no more requests to be sent.
    ///
    /// - Throws: ``GRPCAsyncWriterError`` if the request stream has already been finished.
    public func finish() async throws {
      try await self.asyncWriter.finish()
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension GRPCAsyncRequestStreamWriter {
    /// A writer delegate which hands each message, and the end of the stream, to the closures it
    /// was created with.
    @usableFromInline
    internal final class Delegate<Request: Sendable>: AsyncWriterDelegate, Sendable {
      /// A request paired with the compression setting requested for it.
      @usableFromInline
      internal typealias Element = (Request, Compression)

      /// The end of the stream carries no value.
      @usableFromInline
      internal typealias End = Void

      /// The call-level default passed to `Compression.isEnabled(callDefault:)` for each write.
      @usableFromInline
      internal let _compressionEnabled: Bool

      /// Invoked once per written request with the metadata built for that message.
      @usableFromInline
      internal let _send: @Sendable (Request, MessageMetadata) -> Void

      /// Invoked when the end of the stream is written.
      @usableFromInline
      internal let _finish: @Sendable () -> Void

      /// Creates a delegate backed by the given closures.
      ///
      /// - Parameters:
      ///   - compressionEnabled: The call default used when resolving per-message compression.
      ///   - send: Closure called with each request and its message metadata.
      ///   - finish: Closure called when the end of the stream is written.
      @inlinable
      internal init(
        compressionEnabled: Bool,
        send: @Sendable @escaping (Request, MessageMetadata) -> Void,
        finish: @Sendable @escaping () -> Void
      ) {
        self._send = send
        self._finish = finish
        self._compressionEnabled = compressionEnabled
      }

      /// Resolves the compression setting for the element and forwards the request to `_send`.
      ///
      /// - Parameter element: The request and the compression setting requested for it.
      @inlinable
      internal func write(_ element: (Request, Compression)) {
        // TODO: be smarter about inserting flushes.
        //
        // Every write is currently flushed, which may trigger more syscalls than necessary.
        let metadata = MessageMetadata(
          compress: element.1.isEnabled(callDefault: self._compressionEnabled),
          flush: true
        )
        self._send(element.0, metadata)
      }

      /// Forwards the end of the stream to `_finish`.
      ///
      /// - Parameter end: Unused; the end of the stream carries no value.
      @inlinable
      internal func writeEnd(_ end: Void) {
        self._finish()
      }
    }
  }
  113. #endif // compiler(>=5.6)