Browse Source

Propagate timeout to server (#1947)

Motivation:

gRPC timeouts can be propagated to the server via the 'grpc-timeout'
metadata field. The server can then also choose to enforce the timeout.
At the moment this value isn't propagated, but it should be.

Modifications:

- Convert the timeout to a deadline when it's passed to the rpc executor
  and use this internally. Compute a timeout from the deadline and
  current time when executing an RPC.

Result:

Timeout is propagated to server
George Barnett 1 year ago
parent
commit
9d98758d90

+ 11 - 6
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

@@ -32,7 +32,7 @@ extension ClientRPCExecutor {
     @usableFromInline
     let policy: HedgingPolicy
     @usableFromInline
-    let timeout: Duration?
+    let deadline: ContinuousClock.Instant?
     @usableFromInline
     let interceptors: [any ClientInterceptor]
     @usableFromInline
@@ -46,7 +46,7 @@ extension ClientRPCExecutor {
     init(
       transport: Transport,
       policy: HedgingPolicy,
-      timeout: Duration?,
+      deadline: ContinuousClock.Instant?,
       interceptors: [any ClientInterceptor],
       serializer: Serializer,
       deserializer: Deserializer,
@@ -54,7 +54,7 @@ extension ClientRPCExecutor {
     ) {
       self.transport = transport
       self.policy = policy
-      self.timeout = timeout
+      self.deadline = deadline
       self.interceptors = interceptors
       self.serializer = serializer
       self.deserializer = deserializer
@@ -83,10 +83,10 @@ extension ClientRPCExecutor.HedgingExecutor {
     // all other in flight attempts. Each attempt is started at a fixed interval unless the server
     // explicitly overrides the period using "pushback".
     let result = await withTaskGroup(of: _HedgingTaskResult<R>.self) { group in
-      if let timeout = self.timeout {
+      if let deadline = self.deadline {
         group.addTask {
           let result = await Result {
-            try await Task.sleep(for: timeout, clock: .continuous)
+            try await Task.sleep(until: deadline, clock: .continuous)
           }
           return .timedOut(result)
         }
@@ -103,7 +103,12 @@ extension ClientRPCExecutor.HedgingExecutor {
       }
 
       group.addTask {
-        let replayableRequest = ClientRequest.Stream(metadata: request.metadata) { writer in
+        var metadata = request.metadata
+        if let deadline = self.deadline {
+          metadata.timeout = ContinuousClock.now.duration(to: deadline)
+        }
+
+        let replayableRequest = ClientRequest.Stream(metadata: metadata) { writer in
           try await writer.write(contentsOf: broadcast.stream)
         }
 

+ 9 - 6
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

@@ -32,7 +32,7 @@ extension ClientRPCExecutor {
     @usableFromInline
     let transport: Transport
     @usableFromInline
-    let timeout: Duration?
+    let deadline: ContinuousClock.Instant?
     @usableFromInline
     let interceptors: [any ClientInterceptor]
     @usableFromInline
@@ -43,13 +43,13 @@ extension ClientRPCExecutor {
     @inlinable
     init(
       transport: Transport,
-      timeout: Duration?,
+      deadline: ContinuousClock.Instant?,
       interceptors: [any ClientInterceptor],
       serializer: Serializer,
       deserializer: Deserializer
     ) {
       self.transport = transport
-      self.timeout = timeout
+      self.deadline = deadline
       self.interceptors = interceptors
       self.serializer = serializer
       self.deserializer = deserializer
@@ -72,10 +72,13 @@ extension ClientRPCExecutor.OneShotExecutor {
     ) { group in
       do {
         return try await self.transport.withStream(descriptor: method, options: options) { stream in
-          if let timeout = self.timeout {
+          var request = request
+
+          if let deadline = self.deadline {
+            request.metadata.timeout = ContinuousClock.now.duration(to: deadline)
             group.addTask {
               let result = await Result {
-                try await Task.sleep(until: .now.advanced(by: timeout), clock: .continuous)
+                try await Task.sleep(until: deadline, clock: .continuous)
               }
               return .timedOut(result)
             }
@@ -87,7 +90,7 @@ extension ClientRPCExecutor.OneShotExecutor {
             return .streamExecutorCompleted
           }
 
-          group.addTask {
+          group.addTask { [request] in
             let response = await ClientRPCExecutor.unsafeExecute(
               request: request,
               method: method,

+ 12 - 6
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

@@ -32,7 +32,7 @@ extension ClientRPCExecutor {
     @usableFromInline
     let policy: RetryPolicy
     @usableFromInline
-    let timeout: Duration?
+    let deadline: ContinuousClock.Instant?
     @usableFromInline
     let interceptors: [any ClientInterceptor]
     @usableFromInline
@@ -46,7 +46,7 @@ extension ClientRPCExecutor {
     init(
       transport: Transport,
       policy: RetryPolicy,
-      timeout: Duration?,
+      deadline: ContinuousClock.Instant?,
       interceptors: [any ClientInterceptor],
       serializer: Serializer,
       deserializer: Deserializer,
@@ -54,7 +54,7 @@ extension ClientRPCExecutor {
     ) {
       self.transport = transport
       self.policy = policy
-      self.timeout = timeout
+      self.deadline = deadline
       self.interceptors = interceptors
       self.serializer = serializer
       self.deserializer = deserializer
@@ -94,10 +94,10 @@ extension ClientRPCExecutor.RetryExecutor {
       returning: Result<R, Error>.self
     ) { group in
       // Add a task to limit the overall execution time of the RPC.
-      if let timeout = self.timeout {
+      if let deadline = self.deadline {
         group.addTask {
           let result = await Result {
-            try await Task.sleep(until: .now.advanced(by: timeout), clock: .continuous)
+            try await Task.sleep(until: deadline, clock: .continuous)
           }
           return .timedOut(result)
         }
@@ -136,8 +136,14 @@ extension ClientRPCExecutor.RetryExecutor {
                 }
 
                 thisAttemptGroup.addTask {
+                  var metadata = request.metadata
+                  // Work out the timeout from the deadline.
+                  if let deadline = self.deadline {
+                    metadata.timeout = ContinuousClock.now.duration(to: deadline)
+                  }
+
                   let response = await ClientRPCExecutor.unsafeExecute(
-                    request: ClientRequest.Stream(metadata: request.metadata) {
+                    request: ClientRequest.Stream(metadata: metadata) {
                       try await $0.write(contentsOf: retry.stream)
                     },
                     method: method,

+ 5 - 3
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

@@ -42,11 +42,13 @@ enum ClientRPCExecutor {
     interceptors: [any ClientInterceptor],
     handler: @Sendable @escaping (ClientResponse.Stream<Output>) async throws -> Result
   ) async throws -> Result {
+    let deadline = options.timeout.map { ContinuousClock.now + $0 }
+
     switch options.executionPolicy?.wrapped {
     case .none:
       let oneShotExecutor = OneShotExecutor(
         transport: transport,
-        timeout: options.timeout,
+        deadline: deadline,
         interceptors: interceptors,
         serializer: serializer,
         deserializer: deserializer
@@ -63,7 +65,7 @@ enum ClientRPCExecutor {
       let retryExecutor = RetryExecutor(
         transport: transport,
         policy: policy,
-        timeout: options.timeout,
+        deadline: deadline,
         interceptors: interceptors,
         serializer: serializer,
         deserializer: deserializer,
@@ -81,7 +83,7 @@ enum ClientRPCExecutor {
       let hedging = HedgingExecutor(
         transport: transport,
         policy: policy,
-        timeout: options.timeout,
+        deadline: deadline,
         interceptors: interceptors,
         serializer: serializer,
         deserializer: deserializer,

+ 1 - 2
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift

@@ -51,8 +51,7 @@ extension ClientRPCExecutorTestHarness {
 @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
 extension ClientRPCExecutorTestHarness.ServerStreamHandler {
   static var echo: Self {
-    return Self {
-      stream in
+    return Self { stream in
       let response = stream.inbound.map { part -> RPCResponsePart in
         switch part {
         case .metadata(let metadata):

+ 47 - 1
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests.swift

@@ -13,9 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import GRPCCore
+
 import XCTest
 
+@testable import GRPCCore
+
 @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
 final class ClientRPCExecutorTests: XCTestCase {
   func testUnaryEcho() async throws {
@@ -223,4 +225,48 @@ final class ClientRPCExecutorTests: XCTestCase {
     XCTAssertEqual(tester.clientStreamOpenFailures, 1)
     XCTAssertEqual(tester.serverStreamsAccepted, 0)
   }
+
+  func testTimeoutIsPropagated() async throws {
+    // 'nil' means no retires or hedging, just try to execute the RPC once.
+    var policies: [RPCExecutionPolicy?] = [nil]
+
+    let retryPolicy = RetryPolicy(
+      maximumAttempts: 5,
+      initialBackoff: .seconds(1),
+      maximumBackoff: .seconds(1),
+      backoffMultiplier: 1.6,
+      retryableStatusCodes: [.unavailable]
+    )
+    policies.append(.retry(retryPolicy))
+
+    let hedgingPolicy = HedgingPolicy(
+      maximumAttempts: 5,
+      hedgingDelay: .seconds(1),
+      nonFatalStatusCodes: [.unavailable]
+    )
+    policies.append(.hedge(hedgingPolicy))
+
+    for policy in policies {
+      let timeout = Duration.seconds(120)
+      var options = CallOptions.defaults
+      options.timeout = timeout
+      options.executionPolicy = policy
+
+      let tester = ClientRPCExecutorTestHarness(transport: .inProcess, server: .echo)
+      try await tester.unary(
+        request: ClientRequest.Single(message: []),
+        options: options
+      ) { response in
+        let timeoutMetadata = Array(response.metadata[stringValues: "grpc-timeout"])
+        let parsed = try XCTUnwrap(timeoutMetadata.first.flatMap { Timeout(decoding: $0) })
+
+        // The timeout is handled as a deadline internally and gets converted back to a timeout
+        // when transmitted as metadata, so allow some leeway when checking the value.
+        let leeway = Duration.seconds(1)
+        let acceptable: ClosedRange<Duration> = timeout - leeway ... timeout + leeway
+
+        XCTAssert(acceptable.contains(parsed.duration))
+      }
+    }
+  }
 }