Browse Source

Avoid task group overheaed if there is no timeout (#1968)

Motivation:

Task groups and tasks come with associated costs. If we can avoid using
them we should. The server knows when receiving a request whether it
needs to enforce a deadline on that RPC, if it doesn't then it can avoid
using a task group and save some allocations.

Modifications:

- Avoid creating a task group if there's no timeout

Result:

Fewer allocations
George Barnett 1 year ago
parent
commit
2ad120702e
1 changed files with 44 additions and 6 deletions
  1. 44 6
      Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

+ 44 - 6
Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

@@ -71,15 +71,53 @@ struct ServerRPCExecutor {
     handler: @escaping @Sendable (
     handler: @escaping @Sendable (
       _ request: ServerRequest.Stream<Input>
       _ request: ServerRequest.Stream<Input>
     ) async throws -> ServerResponse.Stream<Output>
     ) async throws -> ServerResponse.Stream<Output>
+  ) async {
+    if let timeout = metadata.timeout {
+      await Self._processRPCWithTimeout(
+        timeout: timeout,
+        method: method,
+        metadata: metadata,
+        inbound: inbound,
+        outbound: outbound,
+        deserializer: deserializer,
+        serializer: serializer,
+        interceptors: interceptors,
+        handler: handler
+      )
+    } else {
+      await Self._processRPC(
+        method: method,
+        metadata: metadata,
+        inbound: inbound,
+        outbound: outbound,
+        deserializer: deserializer,
+        serializer: serializer,
+        interceptors: interceptors,
+        handler: handler
+      )
+    }
+  }
+
+  @inlinable
+  static func _processRPCWithTimeout<Input, Output>(
+    timeout: Duration,
+    method: MethodDescriptor,
+    metadata: Metadata,
+    inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart>.AsyncIterator>,
+    outbound: RPCWriter<RPCResponsePart>.Closable,
+    deserializer: some MessageDeserializer<Input>,
+    serializer: some MessageSerializer<Output>,
+    interceptors: [any ServerInterceptor],
+    handler: @escaping @Sendable (
+      ServerRequest.Stream<Input>
+    ) async throws -> ServerResponse.Stream<Output>
   ) async {
   ) async {
     await withTaskGroup(of: ServerExecutorTask.self) { group in
     await withTaskGroup(of: ServerExecutorTask.self) { group in
-      if let timeout = metadata.timeout {
-        group.addTask {
-          let result = await Result {
-            try await Task.sleep(for: timeout, clock: .continuous)
-          }
-          return .timedOut(result)
+      group.addTask {
+        let result = await Result {
+          try await Task.sleep(for: timeout, clock: .continuous)
         }
         }
+        return .timedOut(result)
       }
       }
 
 
       group.addTask {
       group.addTask {