Explorar el Código

Make the retry throttle optional (#1734)

Motivation:

The retry throttle should be optional, it's currently required by the
client transport API.

The in-process transport also defaults the throttle without allowing
callers to configure it.

Modifications:

- Make the retry throttle an optional requirement on client transport
- Allow callers to configure the retry throttle for the in-process
  client transport
- Add `async throws` to one of the in-process server transport methods,
  while not necessary, the protocol allows it and we may want to change
  the implementation in the future.

Result:

Retry throttle is optional and configurable for the in-process transport
George Barnett hace 2 años
padre
commit
ab82da88e9

+ 3 - 3
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

@@ -359,7 +359,7 @@ extension ClientRPCExecutor.HedgingExecutor {
             case .response(let response):
               switch response.accepted {
               case .success:
-                self.transport.retryThrottle.recordSuccess()
+                self.transport.retryThrottle?.recordSuccess()
 
                 if state.withLockedValue({ $0.receivedUsableResponse() }) {
                   try? await picker.continuation.write(attempt)
@@ -376,11 +376,11 @@ extension ClientRPCExecutor.HedgingExecutor {
 
                 if self.policy.nonFatalStatusCodes.contains(Status.Code(error.code)) {
                   // The response failed and the status code is non-fatal, we can make another attempt.
-                  self.transport.retryThrottle.recordFailure()
+                  self.transport.retryThrottle?.recordFailure()
                   return .unusableResponse(response, error.metadata.retryPushback)
                 } else {
                   // A fatal error code counts as a success to the throttle.
-                  self.transport.retryThrottle.recordSuccess()
+                  self.transport.retryThrottle?.recordSuccess()
 
                   if state.withLockedValue({ $0.receivedUsableResponse() }) {
                     try! await picker.continuation.write(attempt)

+ 3 - 3
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

@@ -152,7 +152,7 @@ extension ClientRPCExecutor.RetryExecutor {
                   case .success:
                     // Request was accepted. This counts as success to the throttle and there's no need
                     // to retry.
-                    self.transport.retryThrottle.recordSuccess()
+                    self.transport.retryThrottle?.recordSuccess()
                     retryDelayOverride = nil
                     shouldRetry = false
 
@@ -170,7 +170,7 @@ extension ClientRPCExecutor.RetryExecutor {
 
                     if isRetryableStatusCode {
                       // Counted as failure for throttling.
-                      let throttled = self.transport.retryThrottle.recordFailure()
+                      let throttled = self.transport.retryThrottle?.recordFailure() ?? false
 
                       // Status code can be retried, Did the server send pushback?
                       switch error.metadata.retryPushback {
@@ -190,7 +190,7 @@ extension ClientRPCExecutor.RetryExecutor {
                       }
                     } else {
                       // Not-retryable; this is considered a success.
-                      self.transport.retryThrottle.recordSuccess()
+                      self.transport.retryThrottle?.recordSuccess()
                       shouldRetry = false
                       retryDelayOverride = nil
                     }

+ 1 - 1
Sources/GRPCCore/Transport/ClientTransport.swift

@@ -24,7 +24,7 @@ public protocol ClientTransport: Sendable {
   /// Client transports don't need to implement the throttle or interact with it beyond its
   /// creation. gRPC will record the results of requests to determine whether retries can be
   /// performed.
-  var retryThrottle: RetryThrottle { get }
+  var retryThrottle: RetryThrottle? { get }
 
   /// Establish and maintain a connection to the remote destination.
   ///

+ 10 - 3
Sources/GRPCInProcessTransport/InProcessClientTransport.swift

@@ -96,16 +96,23 @@ public struct InProcessClientTransport: ClientTransport {
   public typealias Inbound = RPCAsyncSequence<RPCResponsePart>
   public typealias Outbound = RPCWriter<RPCRequestPart>.Closable
 
-  public let retryThrottle: RetryThrottle
+  public let retryThrottle: RetryThrottle?
 
   private let methodConfiguration: MethodConfigurations
   private let state: _LockedValueBox<State>
 
+  /// Creates a new in-process client transport.
+  ///
+  /// - Parameters:
+  ///   - server: The in-process server transport to connect to.
+  ///   - methodConfiguration: Method specific configuration.
+  ///   - retryThrottle: A throttle to apply to RPCs which are hedged or retried.
   public init(
     server: InProcessServerTransport,
-    methodConfiguration: MethodConfigurations = MethodConfigurations()
+    methodConfiguration: MethodConfigurations = MethodConfigurations(),
+    retryThrottle: RetryThrottle? = nil
   ) {
-    self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+    self.retryThrottle = retryThrottle
     self.methodConfiguration = methodConfiguration
     self.state = _LockedValueBox(.unconnected(.init(serverTransport: server)))
   }

+ 1 - 1
Sources/GRPCInProcessTransport/InProcessServerTransport.swift

@@ -59,7 +59,7 @@ public struct InProcessServerTransport: ServerTransport, Sendable {
   /// to this transport using the ``acceptStream(_:)`` method.
   ///
   /// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
-  public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
+  public func listen() async throws -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
     RPCAsyncSequence(wrapping: self.newStreams)
   }
 

+ 2 - 2
Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift

@@ -20,7 +20,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
   typealias Inbound = RPCAsyncSequence<RPCResponsePart>
   typealias Outbound = RPCWriter<RPCRequestPart>.Closable
 
-  private let _retryThrottle: @Sendable () -> RetryThrottle
+  private let _retryThrottle: @Sendable () -> RetryThrottle?
   private let _withStream:
     @Sendable (
       _ method: MethodDescriptor,
@@ -52,7 +52,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
     }
   }
 
-  var retryThrottle: RetryThrottle {
+  var retryThrottle: RetryThrottle? {
     self._retryThrottle()
   }
 

+ 1 - 1
Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift

@@ -39,7 +39,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable {
     self.transport = AnyClientTransport(wrapping: transport)
   }
 
-  var retryThrottle: RetryThrottle {
+  var retryThrottle: RetryThrottle? {
     self.transport.retryThrottle
   }
 

+ 1 - 1
Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift

@@ -26,7 +26,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport {
     self.code = code
   }
 
-  let retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
+  let retryThrottle: RetryThrottle? = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
 
   func connect(lazily: Bool) async throws {
     // no-op

+ 1 - 1
Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

@@ -162,7 +162,7 @@ final class InProcessClientTransportTests: XCTestCase {
       }
 
       group.addTask {
-        for try await stream in server.listen() {
+        for try await stream in try await server.listen() {
           let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }
           try await stream.outbound.write(RPCResponsePart.message([42]))
           stream.outbound.finish()

+ 2 - 2
Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift

@@ -37,7 +37,7 @@ final class InProcessServerTransportTests: XCTestCase {
       )
     )
 
-    let streamSequence = transport.listen()
+    let streamSequence = try await transport.listen()
     var streamSequenceInterator = streamSequence.makeAsyncIterator()
 
     try transport.acceptStream(stream)
@@ -66,7 +66,7 @@ final class InProcessServerTransportTests: XCTestCase {
       )
     )
 
-    let streamSequence = transport.listen()
+    let streamSequence = try await transport.listen()
     var streamSequenceInterator = streamSequence.makeAsyncIterator()
 
     try transport.acceptStream(firstStream)