Browse Source

Remove lazy connect parameter from client transport (#1870)

Motivation:

The client transport has a parameter to enabled lazy connectivity. If
set the transport shouldn't create a connection until a stream is
created. However, this isn't applicable for all transports, instead it
should be configured on concrete transport types.

Modifications:

- Remove 'lazily' parameter

Result:

Better API
George Barnett 1 year ago
parent
commit
5dc50ed703

+ 1 - 1
Sources/GRPCCore/GRPCClient.swift

@@ -195,7 +195,7 @@ public struct GRPCClient: Sendable {
     }
 
     do {
-      try await self.transport.connect(lazily: false)
+      try await self.transport.connect()
     } catch {
       throw RuntimeError(
         code: .transportError,

+ 2 - 8
Sources/GRPCCore/Transport/ClientTransport.swift

@@ -36,13 +36,7 @@ public protocol ClientTransport: Sendable {
   /// maintains connections. The function exits when all open streams have been closed and new connections
   /// are no longer required by the caller who signals this by calling ``close()``, or by cancelling the
   /// task this function runs in.
-  ///
-  /// - Parameter lazily: Whether the transport should establish connections lazily, that is,
-  ///     when the first stream is opened or eagerly, when this function is called. If `false`
-  ///     then the transport should attempt to establish a connection immediately. Note that
-  ///     this is a _hint_: transports aren't required to respect this value and you should
-  ///     refer to the documentation of the transport you're using to check whether it's supported.
-  func connect(lazily: Bool) async throws
+  func connect() async throws
 
   /// Signal to the transport that no new streams may be created.
   ///
@@ -50,7 +44,7 @@ public protocol ClientTransport: Sendable {
   /// should result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown.
   ///
   /// If you want to forcefully cancel all active streams then cancel the task
-  /// running ``connect(lazily:)``.
+  /// running ``connect()``.
   func close()
 
   /// Opens a stream using the transport, and uses it as input into a user-provided closure.

+ 7 - 9
Sources/GRPCInProcessTransport/InProcessClientTransport.swift

@@ -26,13 +26,13 @@ import GRPCCore
 /// ``ClientRPCExecutionConfiguration``s which are specific, per-method configurations for your
 /// transport.
 ///
-/// Once you have a client, you must keep a long-running task executing ``connect(lazily:)``, which
+/// Once you have a client, you must keep a long-running task executing ``connect()``, which
 /// will return only once all streams have been finished and ``close()`` has been called on this client; or
 /// when the containing task is cancelled.
 ///
 /// To execute requests using this client, use ``withStream(descriptor:_:)``. If this function is
-/// called before ``connect(lazily:)`` is called, then any streams will remain pending and the call will
-/// block until ``connect(lazily:)`` is called or the task is cancelled.
+/// called before ``connect()`` is called, then any streams will remain pending and the call will
+/// block until ``connect()`` is called or the task is cancelled.
 ///
 /// - SeeAlso: ``ClientTransport``
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
@@ -127,9 +127,7 @@ public struct InProcessClientTransport: ClientTransport {
   /// maintains connections. The function exits when all open streams have been closed and new connections
   /// are no longer required by the caller who signals this by calling ``close()``, or by cancelling the
   /// task this function runs in.
-  ///
-  /// - Parameter lazily: This parameter is ignored in this implementation.
-  public func connect(lazily: Bool) async throws {
+  public func connect() async throws {
     let (stream, continuation) = AsyncStream<Void>.makeStream()
     try self.state.withLockedValue { state in
       switch state {
@@ -157,7 +155,7 @@ public struct InProcessClientTransport: ClientTransport {
     }
 
     for await _ in stream {
-      // This for-await loop will exit (and thus `connect(lazily:)` will return)
+      // This for-await loop will exit (and thus `connect()` will return)
       // only when the task is cancelled, or when the stream's continuation is
       // finished - whichever happens first.
       // The continuation will be finished when `close()` is called and there
@@ -190,7 +188,7 @@ public struct InProcessClientTransport: ClientTransport {
   /// Existing streams may run to completion naturally but calling ``withStream(descriptor:_:)``
   /// will result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown.
   ///
-  /// If you want to forcefully cancel all active streams then cancel the task running ``connect(lazily:)``.
+  /// If you want to forcefully cancel all active streams then cancel the task running ``connect()``.
   public func close() {
     let maybeContinuation: AsyncStream<Void>.Continuation? = self.state.withLockedValue { state in
       switch state {
@@ -220,7 +218,7 @@ public struct InProcessClientTransport: ClientTransport {
   /// is closing or has been closed.
   ///
   ///   This implementation will queue any streams (and thus block this call) if this function is called before
-  ///   ``connect(lazily:)``, until a connection is established - at which point all streams will be
+  ///   ``connect()``, until a connection is established - at which point all streams will be
   ///   created.
   ///
   /// - Parameters:

+ 1 - 1
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift

@@ -132,7 +132,7 @@ struct ClientRPCExecutorTestHarness {
       }
 
       group.addTask {
-        try await self.clientTransport.connect(lazily: false)
+        try await self.clientTransport.connect()
       }
 
       // Execute the request.

+ 3 - 3
Tests/GRPCCoreTests/GRPCServerTests.swift

@@ -38,7 +38,7 @@ final class GRPCServerTests: XCTestCase {
       }
 
       group.addTask {
-        try await inProcess.client.connect(lazily: true)
+        try await inProcess.client.connect()
       }
 
       try await body(inProcess.client, server)
@@ -325,7 +325,7 @@ final class GRPCServerTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try? await inProcess.client.connect(lazily: true)
+        try? await inProcess.client.connect()
       }
 
       try await self.doEchoGet(using: inProcess.client)
@@ -388,7 +388,7 @@ final class GRPCServerTests: XCTestCase {
     // other transport to throw. This stream should be failed by the server.
     await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await inProcess.client.connect(lazily: true)
+        try await inProcess.client.connect()
       }
 
       group.addTask {

+ 5 - 5
Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift

@@ -27,7 +27,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
       _ options: CallOptions,
       _ body: (RPCStream<Inbound, Outbound>) async throws -> Any
     ) async throws -> Any
-  private let _connect: @Sendable (Bool) async throws -> Void
+  private let _connect: @Sendable () async throws -> Void
   private let _close: @Sendable () -> Void
   private let _configuration: @Sendable (MethodDescriptor) -> MethodConfig?
 
@@ -40,8 +40,8 @@ struct AnyClientTransport: ClientTransport, Sendable {
       }
     }
 
-    self._connect = { lazily in
-      try await transport.connect(lazily: lazily)
+    self._connect = {
+      try await transport.connect()
     }
 
     self._close = {
@@ -57,8 +57,8 @@ struct AnyClientTransport: ClientTransport, Sendable {
     self._retryThrottle()
   }
 
-  func connect(lazily: Bool) async throws {
-    try await self._connect(lazily)
+  func connect() async throws {
+    try await self._connect()
   }
 
   func close() {

+ 2 - 2
Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift

@@ -43,8 +43,8 @@ struct StreamCountingClientTransport: ClientTransport, Sendable {
     self.transport.retryThrottle
   }
 
-  func connect(lazily: Bool) async throws {
-    try await self.transport.connect(lazily: lazily)
+  func connect() async throws {
+    try await self.transport.connect()
   }
 
   func close() {

+ 1 - 1
Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift

@@ -28,7 +28,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport {
 
   let retryThrottle: RetryThrottle? = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
 
-  func connect(lazily: Bool) async throws {
+  func connect() async throws {
     // no-op
   }
 

+ 10 - 10
Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

@@ -27,11 +27,11 @@ final class InProcessClientTransportTests: XCTestCase {
 
     await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
 
       group.addTask {
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
 
       await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
@@ -49,7 +49,7 @@ final class InProcessClientTransportTests: XCTestCase {
     client.close()
 
     await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
-      try await client.connect(lazily: false)
+      try await client.connect()
     } errorHandler: { error in
       XCTAssertEqual(error.code, .failedPrecondition)
     }
@@ -60,7 +60,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
       group.addTask {
         try await Task.sleep(for: .milliseconds(100))
@@ -70,7 +70,7 @@ final class InProcessClientTransportTests: XCTestCase {
       group.cancelAll()
 
       await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
-        try await client.connect(lazily: false)
+        try await client.connect()
       } errorHandler: { error in
         XCTAssertEqual(error.code, .failedPrecondition)
       }
@@ -95,7 +95,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
       group.addTask {
         try await Task.sleep(for: .milliseconds(100))
@@ -117,7 +117,7 @@ final class InProcessClientTransportTests: XCTestCase {
         ) { _ in
           // Once the pending stream is opened, close the client to new connections,
           // so that, once this closure is executed and this stream is closed,
-          // the client will return from `connect(lazily:)`.
+          // the client will return from `connect()`.
           client.close()
         }
       }
@@ -126,7 +126,7 @@ final class InProcessClientTransportTests: XCTestCase {
         // Add a sleep to make sure connection happens after `withStream` has been called,
         // to test pending streams are handled correctly.
         try await Task.sleep(for: .milliseconds(100))
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
 
       try await group.waitForAll()
@@ -154,7 +154,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
 
       group.addTask {
@@ -254,7 +254,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await client.connect(lazily: false)
+        try await client.connect()
       }
 
       group.addTask {