Browse Source

Add more properties to `ClientContext` and have the `ClientTransport` provide it (#2158)

The `ServerTransport` provides the `ServerContext`, as it contains
information that only the transport knows about (such as the remote
peer's address).

For consistency and to allow the `ClientContext` to also hold some
additional information (such as remote and local peer descriptions),
this PR changes the `ClientTransport` protocol so that implementations
also provide the corresponding `ClientContext`.

This PR also adds additional information to the context (which will be
used by the tracing interceptor but can be useful for users in general):
remote and local peer addresses, server hostname, and network transport.
Gus Cairo 1 year ago
parent
commit
1fb1626490

+ 1 - 0
Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift

@@ -45,6 +45,7 @@ extension VariableDescription {
   /// static let descriptor = GRPCCore.MethodDescriptor(
   ///   service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: "<literalFullyQualifiedService>"),
   ///   method: "<literalMethodName>"
+  /// )
   /// ```
   package static func methodDescriptor(
     accessModifier: AccessModifier? = nil,

+ 33 - 1
Sources/GRPCCore/Call/Client/ClientContext.swift

@@ -19,8 +19,40 @@ public struct ClientContext: Sendable {
   /// A description of the method being called.
   public var descriptor: MethodDescriptor
 
+  /// A description of the remote peer.
+  ///
+  /// The format of the description should follow the pattern "<transport>:<address>" where
+  /// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
+  /// "in-process"). This is a guideline for how descriptions should be formatted; different
+  /// implementations may not follow this format so you shouldn't make assumptions based on it.
+  ///
+  /// Some examples include:
+  /// - "ipv4:127.0.0.1:31415",
+  /// - "ipv6:[::1]:443",
+  /// - "in-process:27182".
+  public var remotePeer: String
+
+  /// A description of the local peer.
+  ///
+  /// The format of the description should follow the pattern "<transport>:<address>" where
+  /// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
+  /// "in-process"). This is a guideline for how descriptions should be formatted; different
+  /// implementations may not follow this format so you shouldn't make assumptions based on it.
+  ///
+  /// Some examples include:
+  /// - "ipv4:127.0.0.1:31415",
+  /// - "ipv6:[::1]:443",
+  /// - "in-process:27182".
+  public var localPeer: String
+
   /// Create a new client interceptor context.
-  public init(descriptor: MethodDescriptor) {
+  public init(
+    descriptor: MethodDescriptor,
+    remotePeer: String,
+    localPeer: String
+  ) {
     self.descriptor = descriptor
+    self.remotePeer = remotePeer
+    self.localPeer = localPeer
   }
 }

+ 2 - 2
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

@@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor {
       return try await self.transport.withStream(
         descriptor: method,
         options: options
-      ) { stream -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
+      ) { stream, context -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
         return await withTaskGroup(of: _HedgingAttemptTaskResult<R, Output>.self) { group in
           group.addTask {
             do {
@@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor {
 
               let response = await ClientRPCExecutor._execute(
                 in: &group,
+                context: context,
                 request: request,
-                method: method,
                 attempt: attempt,
                 serializer: self.serializer,
                 deserializer: self.deserializer,

+ 5 - 2
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

@@ -98,11 +98,14 @@ extension ClientRPCExecutor.OneShotExecutor {
   ) async -> Result<R, any Error> {
     return await withTaskGroup(of: Void.self, returning: Result<R, any Error>.self) { group in
       do {
-        return try await self.transport.withStream(descriptor: method, options: options) { stream in
+        return try await self.transport.withStream(
+          descriptor: method,
+          options: options
+        ) { stream, context in
           let response = await ClientRPCExecutor._execute(
             in: &group,
+            context: context,
             request: request,
-            method: method,
             attempt: 1,
             serializer: self.serializer,
             deserializer: self.deserializer,

+ 4 - 2
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

@@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor {
           let attemptResult = try await self.transport.withStream(
             descriptor: method,
             options: options
-          ) { stream in
+          ) { stream, context in
             group.addTask {
               var metadata = request.metadata
               // Work out the timeout from the deadline.
@@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor {
               }
 
               return await self.executeAttempt(
+                context: context,
                 stream: stream,
                 metadata: metadata,
                 retryStream: retry.stream,
@@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {
 
   @inlinable
   func executeAttempt<R: Sendable>(
+    context: ClientContext,
     stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
     metadata: Metadata,
     retryStream: BroadcastAsyncSequence<Input>,
@@ -211,8 +213,8 @@ extension ClientRPCExecutor.RetryExecutor {
 
       let response = await ClientRPCExecutor._execute(
         in: &group,
+        context: context,
         request: request,
-        method: method,
         attempt: attempt,
         serializer: self.serializer,
         deserializer: self.deserializer,

+ 3 - 3
Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

@@ -104,25 +104,25 @@ extension ClientRPCExecutor {
   ///
   /// - Parameters:
   ///   - request: The request to execute.
-  ///   - method: A description of the method to execute the request against.
+  ///   - context: The ``ClientContext`` related to this request.
   ///   - attempt: The attempt number of the request.
   ///   - serializer: A serializer to convert input messages to bytes.
   ///   - deserializer: A deserializer to convert bytes to output messages.
   ///   - interceptors: An array of interceptors which the request and response pass through. The
   ///       interceptors will be called in the order of the array.
+  ///   - stream: The stream to excecute the RPC on.
   /// - Returns: The deserialized response.
   @inlinable  // would be private
   static func _execute<Input: Sendable, Output: Sendable>(
     in group: inout TaskGroup<Void>,
+    context: ClientContext,
     request: StreamingClientRequest<Input>,
-    method: MethodDescriptor,
     attempt: Int,
     serializer: some MessageSerializer<Input>,
     deserializer: some MessageDeserializer<Output>,
     interceptors: [any ClientInterceptor],
     stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
   ) async -> StreamingClientResponse<Output> {
-    let context = ClientContext(descriptor: method)
 
     if interceptors.isEmpty {
       return await ClientStreamExecutor.execute(

+ 3 - 3
Sources/GRPCCore/Transport/ClientTransport.swift

@@ -47,7 +47,7 @@ public protocol ClientTransport: Sendable {
   /// running ``connect()``.
   func beginGracefulShutdown()
 
-  /// Opens a stream using the transport, and uses it as input into a user-provided closure.
+  /// Opens a stream using the transport, and uses it as input into a user-provided closure alongisde the given context.
   ///
   /// - Important: The opened stream is closed after the closure is finished.
   ///
@@ -59,12 +59,12 @@ public protocol ClientTransport: Sendable {
   /// - Parameters:
   ///   - descriptor: A description of the method to open a stream for.
   ///   - options: Options specific to the stream.
-  ///   - closure: A closure that takes the opened stream as parameter.
+  ///   - closure: A closure that takes the opened stream and the client context as its parameters.
   /// - Returns: Whatever value was returned from `closure`.
   func withStream<T: Sendable>(
     descriptor: MethodDescriptor,
     options: CallOptions,
-    _ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
+    _ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
   ) async throws -> T
 
   /// Returns the configuration for a given method.

+ 14 - 4
Sources/GRPCInProcessTransport/InProcessTransport+Client.swift

@@ -103,19 +103,23 @@ extension InProcessTransport {
 
     private let methodConfig: MethodConfigs
     private let state: Mutex<State>
+    private let peer: String
 
     /// Creates a new in-process client transport.
     ///
     /// - Parameters:
     ///   - server: The in-process server transport to connect to.
     ///   - serviceConfig: Service configuration.
+    ///   - peer: The system's PID for the running client and server.
     package init(
       server: InProcessTransport.Server,
-      serviceConfig: ServiceConfig = ServiceConfig()
+      serviceConfig: ServiceConfig = ServiceConfig(),
+      peer: String
     ) {
       self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
       self.methodConfig = MethodConfigs(serviceConfig: serviceConfig)
       self.state = Mutex(.unconnected(.init(serverTransport: server)))
+      self.peer = peer
     }
 
     /// Establish and maintain a connection to the remote destination.
@@ -225,12 +229,12 @@ extension InProcessTransport {
     /// - Parameters:
     ///   - descriptor: A description of the method to open a stream for.
     ///   - options: Options specific to the stream.
-    ///   - closure: A closure that takes the opened stream as parameter.
+    ///   - closure: A closure that takes the opened stream and the client context as its parameters.
     /// - Returns: Whatever value was returned from `closure`.
     public func withStream<T>(
       descriptor: MethodDescriptor,
       options: CallOptions,
-      _ closure: (RPCStream<Inbound, Outbound>) async throws -> T
+      _ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
     ) async throws -> T {
       let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self)
       let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
@@ -297,11 +301,17 @@ extension InProcessTransport {
         }
       }
 
+      let clientContext = ClientContext(
+        descriptor: descriptor,
+        remotePeer: self.peer,
+        localPeer: self.peer
+      )
+
       switch acceptStream {
       case .success(let streamID):
         let streamHandlingResult: Result<T, any Error>
         do {
-          let result = try await closure(clientStream)
+          let result = try await closure(clientStream, clientContext)
           streamHandlingResult = .success(result)
         } catch {
           streamHandlingResult = .failure(error)

+ 4 - 1
Sources/GRPCInProcessTransport/InProcessTransport+Server.swift

@@ -34,7 +34,7 @@ extension InProcessTransport {
 
     private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
     private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
-    private let peer: String
+    package let peer: String
 
     private struct State: Sendable {
       private var _nextID: UInt64
@@ -74,6 +74,9 @@ extension InProcessTransport {
     private let handles: Mutex<State>
 
     /// Creates a new instance of ``Server``.
+    ///
+    /// - Parameters:
+    ///   - peer: The system's PID for the running client and server.
     package init(peer: String) {
       (self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
       self.handles = Mutex(State())

+ 1 - 1
Sources/GRPCInProcessTransport/InProcessTransport.swift

@@ -27,6 +27,6 @@ public struct InProcessTransport: Sendable {
   public init(serviceConfig: ServiceConfig = ServiceConfig()) {
     let peer = "in-process:\(System.pid())"
     self.server = Self.Server(peer: peer)
-    self.client = Self.Client(server: self.server, serviceConfig: serviceConfig)
+    self.client = Self.Client(server: self.server, serviceConfig: serviceConfig, peer: peer)
   }
 }

+ 2 - 2
Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift

@@ -1,5 +1,5 @@
 /*
- * Copyright 2023, gRPC Authors All rights reserved.
+ * Copyright 2023-2025, gRPC Authors All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +21,6 @@ extension InProcessTransport.Server {
   func spawnClientTransport(
     throttle: RetryThrottle = RetryThrottle(maxTokens: 10, tokenRatio: 0.1)
   ) -> InProcessTransport.Client {
-    return InProcessTransport.Client(server: self)
+    return InProcessTransport.Client(server: self, peer: self.peer)
   }
 }

+ 15 - 15
Tests/GRPCCoreTests/GRPCServerTests.swift

@@ -48,7 +48,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.get,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
         await stream.outbound.finish()
@@ -75,7 +75,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.collect,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message([3]))
         try await stream.outbound.write(.message([1]))
@@ -106,7 +106,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.expand,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
         await stream.outbound.finish()
@@ -135,7 +135,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.update,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         for byte in [3, 1, 4, 1, 5] as [UInt8] {
           try await stream.outbound.write(.message([byte]))
@@ -166,7 +166,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         await stream.outbound.finish()
 
@@ -187,7 +187,7 @@ final class GRPCServerTests: XCTestCase {
             try await client.withStream(
               descriptor: BinaryEcho.Methods.get,
               options: .defaults
-            ) { stream in
+            ) { stream, _ in
               try await stream.outbound.write(.metadata([:]))
               try await stream.outbound.write(.message([i]))
               await stream.outbound.finish()
@@ -225,7 +225,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.get,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         await stream.outbound.finish()
 
@@ -250,7 +250,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         await stream.outbound.finish()
 
@@ -277,7 +277,7 @@ final class GRPCServerTests: XCTestCase {
         try await client.withStream(
           descriptor: BinaryEcho.Methods.get,
           options: .defaults
-        ) { stream in
+        ) { stream, _ in
           XCTFail("Stream shouldn't be opened")
         }
       } errorHandler: { error in
@@ -291,7 +291,7 @@ final class GRPCServerTests: XCTestCase {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.update,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         var iterator = stream.inbound.makeAsyncIterator()
         // Don't need to validate the response, just that the server is running.
@@ -364,7 +364,7 @@ final class GRPCServerTests: XCTestCase {
     try await transport.withStream(
       descriptor: BinaryEcho.Methods.get,
       options: .defaults
-    ) { stream in
+    ) { stream, _ in
       try await stream.outbound.write(.metadata([:]))
       try await stream.outbound.write(.message([0]))
       await stream.outbound.finish()
@@ -407,7 +407,7 @@ struct ServerTests {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.get,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message(Array("hello".utf8)))
         await stream.outbound.finish()
@@ -437,7 +437,7 @@ struct ServerTests {
       try await client.withStream(
         descriptor: HelloWorld.Methods.sayHello,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message(Array("Swift".utf8)))
         await stream.outbound.finish()
@@ -494,7 +494,7 @@ struct ServerTests {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.get,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message(Array("hello".utf8)))
         await stream.outbound.finish()
@@ -524,7 +524,7 @@ struct ServerTests {
       try await client.withStream(
         descriptor: BinaryEcho.Methods.collect,
         options: .defaults
-      ) { stream in
+      ) { stream, _ in
         try await stream.outbound.write(.metadata([:]))
         try await stream.outbound.write(.message(Array("hello".utf8)))
         await stream.outbound.finish()

+ 4 - 4
Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift

@@ -24,7 +24,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
     @Sendable (
       _ method: MethodDescriptor,
       _ options: CallOptions,
-      _ body: (RPCStream<Inbound, Outbound>) async throws -> (any Sendable)
+      _ body: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> (any Sendable)
     ) async throws -> Any
   private let _connect: @Sendable () async throws -> Void
   private let _close: @Sendable () -> Void
@@ -34,8 +34,8 @@ struct AnyClientTransport: ClientTransport, Sendable {
   where Transport.Inbound == Inbound, Transport.Outbound == Outbound {
     self._retryThrottle = { transport.retryThrottle }
     self._withStream = { descriptor, options, closure in
-      try await transport.withStream(descriptor: descriptor, options: options) { stream in
-        try await closure(stream) as (any Sendable)
+      try await transport.withStream(descriptor: descriptor, options: options) { stream, context in
+        try await closure(stream, context) as (any Sendable)
       }
     }
 
@@ -67,7 +67,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
   func withStream<T>(
     descriptor: MethodDescriptor,
     options: CallOptions,
-    _ closure: (RPCStream<Inbound, Outbound>) async throws -> T
+    _ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
   ) async throws -> T {
     let result = try await self._withStream(descriptor, options, closure)
     return result as! T

+ 3 - 3
Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift

@@ -54,15 +54,15 @@ struct StreamCountingClientTransport: ClientTransport, Sendable {
   func withStream<T>(
     descriptor: MethodDescriptor,
     options: CallOptions,
-    _ closure: (RPCStream<Inbound, Outbound>) async throws -> T
+    _ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
   ) async throws -> T {
     do {
       return try await self.transport.withStream(
         descriptor: descriptor,
         options: options
-      ) { stream in
+      ) { stream, context in
         self._streamsOpened.increment()
-        return try await closure(stream)
+        return try await closure(stream, context)
       }
     } catch {
       self._streamFailures.increment()

+ 1 - 1
Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift

@@ -44,7 +44,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport {
   func withStream<T>(
     descriptor: MethodDescriptor,
     options: CallOptions,
-    _ closure: (RPCStream<Inbound, Outbound>) async throws -> T
+    _ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
   ) async throws -> T {
     throw RPCError(code: self.code, message: "")
   }

+ 15 - 11
Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

@@ -1,5 +1,5 @@
 /*
- * Copyright 2023, gRPC Authors All rights reserved.
+ * Copyright 2023-2025, gRPC Authors All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -110,7 +110,7 @@ final class InProcessClientTransportTests: XCTestCase {
 
     try await withThrowingTaskGroup(of: Void.self) { group in
       group.addTask {
-        try await client.withStream(descriptor: .testTest, options: .defaults) { _ in
+        try await client.withStream(descriptor: .testTest, options: .defaults) { _, _ in
           // Once the pending stream is opened, close the client to new connections,
           // so that, once this closure is executed and this stream is closed,
           // the client will return from `connect()`.
@@ -135,7 +135,7 @@ final class InProcessClientTransportTests: XCTestCase {
     client.beginGracefulShutdown()
 
     await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
-      try await client.withStream(descriptor: .testTest, options: .defaults) { _ in }
+      try await client.withStream(descriptor: .testTest, options: .defaults) { _, _ in }
     } errorHandler: { error in
       XCTAssertEqual(error.code, .failedPrecondition)
     }
@@ -151,7 +151,7 @@ final class InProcessClientTransportTests: XCTestCase {
       }
 
       group.addTask {
-        try await client.withStream(descriptor: .testTest, options: .defaults) { stream in
+        try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
           try await stream.outbound.write(.message([1]))
           await stream.outbound.finish()
           let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }
@@ -198,9 +198,11 @@ final class InProcessClientTransportTests: XCTestCase {
       ]
     )
 
+    let peer = "in-process:1234"
     var client = InProcessTransport.Client(
-      server: InProcessTransport.Server(peer: "in-process:1234"),
-      serviceConfig: serviceConfig
+      server: InProcessTransport.Server(peer: peer),
+      serviceConfig: serviceConfig,
+      peer: peer
     )
 
     let firstDescriptor = MethodDescriptor(fullyQualifiedService: "test", method: "first")
@@ -223,8 +225,9 @@ final class InProcessClientTransportTests: XCTestCase {
     )
     serviceConfig.methodConfig.append(overrideConfiguration)
     client = InProcessTransport.Client(
-      server: InProcessTransport.Server(peer: "in-process:1234"),
-      serviceConfig: serviceConfig
+      server: InProcessTransport.Server(peer: peer),
+      serviceConfig: serviceConfig,
+      peer: peer
     )
 
     let secondDescriptor = MethodDescriptor(fullyQualifiedService: "test", method: "second")
@@ -248,13 +251,13 @@ final class InProcessClientTransportTests: XCTestCase {
       }
 
       group.addTask {
-        try await client.withStream(descriptor: .testTest, options: .defaults) { stream in
+        try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
           try await Task.sleep(for: .milliseconds(100))
         }
       }
 
       group.addTask {
-        try await client.withStream(descriptor: .testTest, options: .defaults) { stream in
+        try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
           try await Task.sleep(for: .milliseconds(100))
         }
       }
@@ -290,7 +293,8 @@ final class InProcessClientTransportTests: XCTestCase {
 
     return InProcessTransport.Client(
       server: server,
-      serviceConfig: serviceConfig
+      serviceConfig: serviceConfig,
+      peer: server.peer
     )
   }
 }

+ 1 - 1
dev/license-check.sh

@@ -88,7 +88,7 @@ check_copyright_headers() {
 
     actual_sha=$(head -n "$((drop_first + expected_lines))" "$filename" \
       | tail -n "$expected_lines" \
-      | sed -e 's/201[56789]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' \
+      | sed -e 's/20[12][0-9]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' \
       | shasum \
       | awk '{print $1}')