Browse Source

Set tolerance to zero when using `Task.sleep` (#2225)

`Task.sleep` will by default try and coalesce multiple timers into one,
mostly for client-specific reasons such as performance, power
consumption, etc.

However, this is undesirable on servers, as it can increase latency,
memory usage, and (in the case of gRPC) may result in timeouts not
firing when they should.

We can avoid this by setting the sleep `tolerance` to zero.
Gus Cairo 9 months ago
parent
commit
c295efd55e

+ 2 - 2
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

@@ -83,7 +83,7 @@ extension ClientRPCExecutor.HedgingExecutor {
       if let deadline = self.deadline {
         group.addTask {
           let result = await Result {
-            try await Task.sleep(until: deadline, clock: .continuous)
+            try await Task.sleep(until: deadline, tolerance: .zero, clock: .continuous)
           }
           return .timedOut(result)
         }
@@ -533,7 +533,7 @@ extension ClientRPCExecutor.HedgingExecutor {
       self._isPushback = pushback
       self._handle = group.addCancellableTask {
         do {
-          try await Task.sleep(for: delay, clock: .continuous)
+          try await Task.sleep(for: delay, tolerance: .zero, clock: .continuous)
           return .scheduledAttemptFired(.ran)
         } catch {
           return .scheduledAttemptFired(.cancelled)

+ 1 - 1
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

@@ -137,7 +137,7 @@ func withDeadline<Result: Sendable>(
   return await withTaskGroup(of: _DeadlineChildTaskResult<Result>.self) { group in
     group.addTask {
       do {
-        try await Task.sleep(until: deadline)
+        try await Task.sleep(until: deadline, tolerance: .zero)
         return .deadlinePassed
       } catch {
         return .timeoutCancelled

+ 7 - 2
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

@@ -92,7 +92,7 @@ extension ClientRPCExecutor.RetryExecutor {
       if let deadline = self.deadline {
         group.addTask {
           let result = await Result {
-            try await Task.sleep(until: deadline, clock: .continuous)
+            try await Task.sleep(until: deadline, tolerance: .zero, clock: .continuous)
           }
           return .timedOut(result)
         }
@@ -155,11 +155,16 @@ extension ClientRPCExecutor.RetryExecutor {
                   // If the delay is overridden with server pushback then reset the iterator for the
                   // next retry.
                   delayIterator = delaySequence.makeIterator()
-                  try? await Task.sleep(until: .now.advanced(by: delayOverride), clock: .continuous)
+                  try? await Task.sleep(
+                    until: .now.advanced(by: delayOverride),
+                    tolerance: .zero,
+                    clock: .continuous
+                  )
                 } else {
                   // The delay iterator never terminates.
                   try? await Task.sleep(
                     until: .now.advanced(by: delayIterator.next()!),
+                    tolerance: .zero,
                     clock: .continuous
                   )
                 }

+ 1 - 1
Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

@@ -123,7 +123,7 @@ struct ServerRPCExecutor {
     await withTaskGroup(of: Void.self) { group in
       group.addTask {
         do {
-          try await Task.sleep(for: timeout, clock: .continuous)
+          try await Task.sleep(for: timeout, tolerance: .zero, clock: .continuous)
           context.cancellation.cancel()
         } catch {
           ()  // Only cancel the RPC if the timeout completes.

+ 1 - 1
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift

@@ -112,7 +112,7 @@ extension ClientRPCExecutorTestHarness.ServerStreamHandler {
 
   static func sleepFor(duration: Duration, then handler: Self) -> Self {
     return Self { stream in
-      try await Task.sleep(until: .now.advanced(by: duration), clock: .continuous)
+      try await Task.sleep(until: .now.advanced(by: duration), tolerance: .zero, clock: .continuous)
       try await handler.handle(stream: stream)
     }
   }

+ 4 - 4
Tests/GRPCCoreTests/GRPCClientTests.swift

@@ -40,7 +40,7 @@ final class GRPCClientTests: XCTestCase {
         transport: inProcess.client,
         interceptorPipeline: interceptorPipeline
       ) { client in
-        try await Task.sleep(for: .milliseconds(100))
+        try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
         try await body(client, server)
       }
     }
@@ -341,7 +341,7 @@ final class GRPCClientTests: XCTestCase {
       let task = Task {
         try await client.clientStreaming(
           request: StreamingClientRequest { writer in
-            try await Task.sleep(for: .seconds(5))
+            try await Task.sleep(for: .seconds(5), tolerance: .zero)
           },
           descriptor: BinaryEcho.Methods.collect,
           serializer: IdentitySerializer(),
@@ -382,7 +382,7 @@ final class GRPCClientTests: XCTestCase {
     // Run the client.
     let task = Task { try await client.runConnections() }
     // Make sure the client is run for the first time here.
-    try await Task.sleep(for: .milliseconds(10))
+    try await Task.sleep(for: .milliseconds(10), tolerance: .zero)
 
     // Client is already running, should throw an error.
     await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
@@ -545,7 +545,7 @@ struct ClientTests {
       }
 
       // Make sure both server and client are running
-      try await Task.sleep(for: .milliseconds(100))
+      try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
       try await body(client, server)
       client.beginGracefulShutdown()
       server.beginGracefulShutdown()

+ 1 - 1
Tests/GRPCCoreTests/Internal/Result+CatchingTests.swift

@@ -21,7 +21,7 @@ import XCTest
 final class ResultCatchingTests: XCTestCase {
   func testResultCatching() async {
     let result = await Result {
-      try? await Task.sleep(nanoseconds: 1)
+      try? await Task.sleep(for: .nanoseconds(1), tolerance: .zero)
       throw RPCError(code: .unknown, message: "foo")
     }
 

+ 7 - 7
Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

@@ -62,7 +62,7 @@ final class InProcessClientTransportTests: XCTestCase {
         try await client.connect()
       }
       group.addTask {
-        try await Task.sleep(for: .milliseconds(100))
+        try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
       }
 
       try await group.next()
@@ -97,7 +97,7 @@ final class InProcessClientTransportTests: XCTestCase {
         try await client.connect()
       }
       group.addTask {
-        try await Task.sleep(for: .milliseconds(100))
+        try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
       }
 
       try await group.next()
@@ -121,7 +121,7 @@ final class InProcessClientTransportTests: XCTestCase {
       group.addTask {
         // Add a sleep to make sure connection happens after `withStream` has been called,
         // to test pending streams are handled correctly.
-        try await Task.sleep(for: .milliseconds(100))
+        try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
         try await client.connect()
       }
 
@@ -171,7 +171,7 @@ final class InProcessClientTransportTests: XCTestCase {
       }
 
       group.addTask {
-        try await Task.sleep(for: .milliseconds(100))
+        try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
         client.beginGracefulShutdown()
       }
 
@@ -252,18 +252,18 @@ final class InProcessClientTransportTests: XCTestCase {
 
       group.addTask {
         try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
-          try await Task.sleep(for: .milliseconds(100))
+          try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
         }
       }
 
       group.addTask {
         try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
-          try await Task.sleep(for: .milliseconds(100))
+          try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
         }
       }
 
       group.addTask {
-        try await Task.sleep(for: .milliseconds(50))
+        try await Task.sleep(for: .milliseconds(50), tolerance: .zero)
         client.beginGracefulShutdown()
       }