Browse Source

Move in-process transport to own module (#1730)

Motivation:

The in-process transport is mostly helpful for testing, it should
therefore not be built unnecessarily and should live in its own module.

Modifications:

- Move the in-process transport and its tests to their own modules

Results:

The in-process transport lives in its own module.
George Barnett 2 years ago
parent
commit
1c33b78518

+ 23 - 4
Package.swift

@@ -133,6 +133,7 @@ extension Target.Dependency {
   static let atomics: Self = .product(name: "Atomics", package: "swift-atomics")
 
   static let grpcCore: Self = .target(name: "GRPCCore")
+  static let grpcInProcessTransport: Self = .target(name: "GRPCInProcessTransport")
 }
 
 // MARK: - Targets
@@ -170,6 +171,13 @@ extension Target {
     path: "Sources/GRPCCore"
   )
 
+  static let grpcInProcessTransport: Target = .target(
+    name: "GRPCInProcessTransport",
+    dependencies: [
+      .grpcCore
+    ]
+  )
+
   static let cgrpcZlib: Target = .target(
     name: cgrpcZlibTargetName,
     path: "Sources/CGRPCZlib",
@@ -230,18 +238,27 @@ extension Target {
     name: "GRPCCoreTests",
     dependencies: [
       .grpcCore,
+      .grpcInProcessTransport,
       .dequeModule,
       .atomics
     ]
   )
 
+  static let grpcInProcessTransportTests: Target = .testTarget(
+    name: "GRPCInProcessTransportTests",
+    dependencies: [
+      .grpcCore,
+      .grpcInProcessTransport,
+    ]
+  )
+
   static let grpcCodeGenTests: Target = .testTarget(
     name: "GRPCCodeGenTests",
     dependencies: [
       .grpcCodeGen
     ]
   )
-  
+
   static let interopTestModels: Target = .target(
     name: "GRPCInteroperabilityTestModels",
     dependencies: [
@@ -468,7 +485,7 @@ extension Target {
       "v1Alpha/reflection-v1alpha.proto"
     ]
   )
-  
+
   static let reflectionServer: Target = .executableTarget(
     name: "ReflectionServer",
     dependencies: [
@@ -486,7 +503,7 @@ extension Target {
       .copy("Generated")
     ]
   )
-  
+
   static let grpcCodeGen: Target = .target(
     name: "GRPCCodeGen",
     path: "Sources/GRPCCodeGen"
@@ -500,7 +517,7 @@ extension Product {
     name: grpcProductName,
     targets: [grpcTargetName]
   )
-    
+
   static let grpcCore: Product = .library(
     name: "_GRPCCore",
     targets: ["GRPCCore"]
@@ -566,10 +583,12 @@ let package = Package(
 
     // v2
     .grpcCore,
+    .grpcInProcessTransport,
     .grpcCodeGen,
 
     // v2 tests
     .grpcCoreTests,
+    .grpcInProcessTransportTests,
     .grpcCodeGenTests
   ]
 )

+ 7 - 4
Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift

@@ -237,19 +237,22 @@ extension UnsafeMutablePointer {
 }
 
 @usableFromInline
-struct LockedValueBox<Value> {
+internal typealias LockedValueBox<Value> = _LockedValueBox<Value>
+
+// TODO: Use 'package' ACL when 5.9 is the minimum Swift version.
+public struct _LockedValueBox<Value> {
   @usableFromInline
   let storage: LockStorage<Value>
 
   @inlinable
-  init(_ value: Value) {
+  public init(_ value: Value) {
     self.storage = .create(value: value)
   }
 
   @inlinable
-  func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
+  public func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
     return try self.storage.withLockedValue(mutate)
   }
 }
 
-extension LockedValueBox: Sendable where Value: Sendable {}
+extension _LockedValueBox: Sendable where Value: Sendable {}

+ 8 - 0
Sources/GRPCCore/Streaming/Internal/RPCAsyncSequence+Buffered.swift

@@ -28,4 +28,12 @@ extension RPCAsyncSequence {
 
     return (RPCAsyncSequence(wrapping: stream), RPCWriter.Closable(wrapping: continuation))
   }
+
+  @inlinable
+  public static func _makeBackpressuredStream(
+    of elementType: Element.Type = Element.self,
+    watermarks: (low: Int, high: Int)
+  ) -> (stream: Self, writer: RPCWriter<Element>.Closable) {
+    return Self.makeBackpressuredStream(of: elementType, watermarks: watermarks)
+  }
 }

+ 11 - 9
Sources/GRPCCore/Transport/InProcessClientTransport.swift → Sources/GRPCInProcessTransport/InProcessClientTransport.swift

@@ -14,7 +14,8 @@
  * limitations under the License.
  */
 
-@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+import GRPCCore
+
 /// An in-process implementation of a ``ClientTransport``.
 ///
 /// This is useful when you're interested in testing your application without any actual networking layers
@@ -34,6 +35,7 @@
 /// block until ``connect(lazily:)`` is called or the task is cancelled.
 ///
 /// - SeeAlso: ``ClientTransport``
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 public struct InProcessClientTransport: ClientTransport {
   private enum State: Sendable {
     struct UnconnectedState {
@@ -96,16 +98,16 @@ public struct InProcessClientTransport: ClientTransport {
 
   public let retryThrottle: RetryThrottle
 
-  private let executionConfigurations: MethodConfigurations
-  private let state: LockedValueBox<State>
+  private let methodConfiguration: MethodConfigurations
+  private let state: _LockedValueBox<State>
 
   public init(
     server: InProcessServerTransport,
-    executionConfigurations: MethodConfigurations
+    methodConfiguration: MethodConfigurations = MethodConfigurations()
   ) {
     self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
-    self.executionConfigurations = executionConfigurations
-    self.state = LockedValueBox(.unconnected(.init(serverTransport: server)))
+    self.methodConfiguration = methodConfiguration
+    self.state = _LockedValueBox(.unconnected(.init(serverTransport: server)))
   }
 
   /// Establish and maintain a connection to the remote destination.
@@ -222,8 +224,8 @@ public struct InProcessClientTransport: ClientTransport {
     descriptor: MethodDescriptor,
     _ closure: (RPCStream<Inbound, Outbound>) async throws -> T
   ) async throws -> T {
-    let request = RPCAsyncSequence<RPCRequestPart>.makeBackpressuredStream(watermarks: (16, 32))
-    let response = RPCAsyncSequence<RPCResponsePart>.makeBackpressuredStream(watermarks: (16, 32))
+    let request = RPCAsyncSequence<RPCRequestPart>._makeBackpressuredStream(watermarks: (16, 32))
+    let response = RPCAsyncSequence<RPCResponsePart>._makeBackpressuredStream(watermarks: (16, 32))
 
     let clientStream = RPCStream(
       descriptor: descriptor,
@@ -330,6 +332,6 @@ public struct InProcessClientTransport: ClientTransport {
   public func executionConfiguration(
     forMethod descriptor: MethodDescriptor
   ) -> MethodConfiguration? {
-    self.executionConfigurations[descriptor]
+    self.methodConfiguration[descriptor]
   }
 }

+ 3 - 1
Sources/GRPCCore/Transport/InProcessServerTransport.swift → Sources/GRPCInProcessTransport/InProcessServerTransport.swift

@@ -14,7 +14,8 @@
  * limitations under the License.
  */
 
-@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+import GRPCCore
+
 /// An in-process implementation of a ``ServerTransport``.
 ///
 /// This is useful when you're interested in testing your application without any actual networking layers
@@ -25,6 +26,7 @@
 /// To stop listening to new requests, call ``stopListening()``.
 ///
 /// - SeeAlso: ``ClientTransport``
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 public struct InProcessServerTransport: ServerTransport, Sendable {
   public typealias Inbound = RPCAsyncSequence<RPCRequestPart>
   public typealias Outbound = RPCWriter<RPCResponsePart>.Closable

+ 32 - 0
Sources/GRPCInProcessTransport/Internal/AsyncStream+MakeStream.swift

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if swift(<5.9)
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension AsyncStream {
+  @inlinable
+  static func makeStream(
+    of elementType: Element.Type = Element.self,
+    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
+  ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
+    var continuation: AsyncStream<Element>.Continuation!
+    let stream = AsyncStream(Element.self, bufferingPolicy: limit) {
+      continuation = $0
+    }
+    return (stream, continuation)
+  }
+}
+#endif

+ 3 - 6
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift

@@ -14,17 +14,14 @@
  * limitations under the License.
  */
 import Atomics
-
-@testable import GRPCCore
+import GRPCCore
+import GRPCInProcessTransport
 
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 extension InProcessServerTransport {
   func spawnClientTransport(
     throttle: RetryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
   ) -> InProcessClientTransport {
-    return InProcessClientTransport(
-      server: self,
-      executionConfigurations: .init()
-    )
+    return InProcessClientTransport(server: self)
   }
 }

+ 1 - 0
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift

@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 import Atomics
+import GRPCInProcessTransport
 import XCTest
 
 @testable import GRPCCore

+ 2 - 1
Tests/GRPCCoreTests/GRPCClientTests.swift

@@ -15,6 +15,7 @@
  */
 import Atomics
 import GRPCCore
+import GRPCInProcessTransport
 import XCTest
 
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
@@ -23,7 +24,7 @@ final class GRPCClientTests: XCTestCase {
     let server = InProcessServerTransport()
     let client = InProcessClientTransport(
       server: server,
-      executionConfigurations: MethodConfigurations()
+      methodConfiguration: MethodConfigurations()
     )
 
     return (client, server)

+ 2 - 4
Tests/GRPCCoreTests/GRPCServerTests.swift

@@ -15,16 +15,14 @@
  */
 import Atomics
 import GRPCCore
+import GRPCInProcessTransport
 import XCTest
 
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 final class GRPCServerTests: XCTestCase {
   func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) {
     let server = InProcessServerTransport()
-    let client = InProcessClientTransport(
-      server: server,
-      executionConfigurations: MethodConfigurations()
-    )
+    let client = InProcessClientTransport(server: server)
 
     return (client, server)
   }

+ 11 - 11
Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift → Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

@@ -14,10 +14,10 @@
  * limitations under the License.
  */
 
+import GRPCCore
+import GRPCInProcessTransport
 import XCTest
 
-@testable import GRPCCore
-
 @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
 final class InProcessClientTransportTests: XCTestCase {
   struct FailTest: Error {}
@@ -34,7 +34,7 @@ final class InProcessClientTransportTests: XCTestCase {
         try await client.connect(lazily: false)
       }
 
-      await XCTAssertThrowsRPCErrorAsync {
+      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
         try await group.next()
       } errorHandler: { error in
         XCTAssertEqual(error.code, .failedPrecondition)
@@ -48,7 +48,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     client.close()
 
-    await XCTAssertThrowsRPCErrorAsync {
+    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
       try await client.connect(lazily: false)
     } errorHandler: { error in
       XCTAssertEqual(error.code, .failedPrecondition)
@@ -69,7 +69,7 @@ final class InProcessClientTransportTests: XCTestCase {
       try await group.next()
       group.cancelAll()
 
-      await XCTAssertThrowsRPCErrorAsync {
+      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
         try await client.connect(lazily: false)
       } errorHandler: { error in
         XCTAssertEqual(error.code, .failedPrecondition)
@@ -135,7 +135,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     client.close()
 
-    await XCTAssertThrowsRPCErrorAsync {
+    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
       try await client.withStream(descriptor: .init(service: "test", method: "test")) { _ in }
     } errorHandler: { error in
       XCTAssertEqual(error.code, .failedPrecondition)
@@ -155,7 +155,7 @@ final class InProcessClientTransportTests: XCTestCase {
         try await client.withStream(descriptor: .init(service: "test", method: "test")) { stream in
           try await stream.outbound.write(.message([1]))
           stream.outbound.finish()
-          let receivedMessages = try await stream.inbound.collect()
+          let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }
 
           XCTAssertEqual(receivedMessages, [.message([42])])
         }
@@ -163,7 +163,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
       group.addTask {
         for try await stream in server.listen() {
-          let receivedMessages = try await stream.inbound.collect()
+          let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }
           try await stream.outbound.write(RPCResponsePart.message([42]))
           stream.outbound.finish()
 
@@ -191,7 +191,7 @@ final class InProcessClientTransportTests: XCTestCase {
     var configurations = MethodConfigurations()
     configurations.setDefaultConfiguration(defaultConfiguration)
 
-    var client = InProcessClientTransport(server: .init(), executionConfigurations: configurations)
+    var client = InProcessClientTransport(server: .init(), methodConfiguration: configurations)
 
     let firstDescriptor = MethodDescriptor(service: "test", method: "first")
     XCTAssertEqual(client.executionConfiguration(forMethod: firstDescriptor), defaultConfiguration)
@@ -205,7 +205,7 @@ final class InProcessClientTransportTests: XCTestCase {
     )
     let overrideConfiguration = MethodConfiguration(retryPolicy: retryPolicy)
     configurations[firstDescriptor] = overrideConfiguration
-    client = InProcessClientTransport(server: .init(), executionConfigurations: configurations)
+    client = InProcessClientTransport(server: .init(), methodConfiguration: configurations)
     let secondDescriptor = MethodDescriptor(service: "test", method: "second")
     XCTAssertEqual(client.executionConfiguration(forMethod: firstDescriptor), overrideConfiguration)
     XCTAssertEqual(client.executionConfiguration(forMethod: secondDescriptor), defaultConfiguration)
@@ -259,7 +259,7 @@ final class InProcessClientTransportTests: XCTestCase {
     )
     return InProcessClientTransport(
       server: server,
-      executionConfigurations: methodConfiguration
+      methodConfiguration: methodConfiguration
     )
   }
 }

+ 24 - 6
Tests/GRPCCoreTests/Transport/InProcessServerTransportTests.swift → Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift

@@ -17,13 +17,19 @@
 import XCTest
 
 @testable import GRPCCore
+@testable import GRPCInProcessTransport
 
 final class InProcessServerTransportTests: XCTestCase {
   func testStartListening() async throws {
     let transport = InProcessServerTransport()
     let stream = RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>(
       descriptor: .init(service: "testService", method: "testMethod"),
-      inbound: .elements([.message([42])]),
+      inbound: RPCAsyncSequence(
+        wrapping: AsyncStream {
+          $0.yield(.message([42]))
+          $0.finish()
+        }
+      ),
       outbound: .init(
         wrapping: BufferedStream.Source(
           storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
@@ -37,7 +43,7 @@ final class InProcessServerTransportTests: XCTestCase {
     try transport.acceptStream(stream)
 
     let testStream = try await streamSequenceInterator.next()
-    let messages = try await testStream?.inbound.collect()
+    let messages = try await testStream?.inbound.reduce(into: []) { $0.append($1) }
     XCTAssertEqual(messages, [.message([42])])
   }
 
@@ -47,7 +53,12 @@ final class InProcessServerTransportTests: XCTestCase {
       RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
     >(
       descriptor: .init(service: "testService1", method: "testMethod1"),
-      inbound: .elements([.message([42])]),
+      inbound: RPCAsyncSequence(
+        wrapping: AsyncStream {
+          $0.yield(.message([42]))
+          $0.finish()
+        }
+      ),
       outbound: .init(
         wrapping: BufferedStream.Source(
           storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
@@ -61,7 +72,7 @@ final class InProcessServerTransportTests: XCTestCase {
     try transport.acceptStream(firstStream)
 
     let firstTestStream = try await streamSequenceInterator.next()
-    let firstStreamMessages = try await firstTestStream?.inbound.collect()
+    let firstStreamMessages = try await firstTestStream?.inbound.reduce(into: []) { $0.append($1) }
     XCTAssertEqual(firstStreamMessages, [.message([42])])
 
     transport.stopListening()
@@ -70,7 +81,12 @@ final class InProcessServerTransportTests: XCTestCase {
       RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
     >(
       descriptor: .init(service: "testService1", method: "testMethod1"),
-      inbound: .elements([.message([42])]),
+      inbound: RPCAsyncSequence(
+        wrapping: AsyncStream {
+          $0.yield(.message([42]))
+          $0.finish()
+        }
+      ),
       outbound: .init(
         wrapping: BufferedStream.Source(
           storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
@@ -78,7 +94,9 @@ final class InProcessServerTransportTests: XCTestCase {
       )
     )
 
-    XCTAssertThrowsRPCError(try transport.acceptStream(secondStream)) { error in
+    XCTAssertThrowsError(ofType: RPCError.self) {
+      try transport.acceptStream(secondStream)
+    } errorHandler: { error in
       XCTAssertEqual(error.code, .failedPrecondition)
     }
 

+ 44 - 0
Tests/GRPCInProcessTransportTests/Test Utilities/XCTest+Utilities.swift

@@ -0,0 +1,44 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import XCTest
+
+func XCTAssertThrowsError<T, E: Error>(
+  ofType: E.Type,
+  _ expression: () throws -> T,
+  errorHandler: (E) -> Void
+) {
+  XCTAssertThrowsError(try expression()) { error in
+    guard let error = error as? E else {
+      return XCTFail("Error had unexpected type '\(type(of: error))'")
+    }
+    errorHandler(error)
+  }
+}
+
+func XCTAssertThrowsErrorAsync<T, E: Error>(
+  ofType: E.Type = E.self,
+  _ expression: () async throws -> T,
+  errorHandler: (E) -> Void
+) async {
+  do {
+    _ = try await expression()
+    XCTFail("Expression didn't throw")
+  } catch let error as E {
+    errorHandler(error)
+  } catch {
+    XCTFail("Error had unexpected type '\(type(of: error))'")
+  }
+}