Просмотр исходного кода

Use {request,response}{s,stream} consistently (#1320)

Motivation:

Some of the new async-await APIs are inconsistent in their use of
'requests' vs 'requestStream' and 'responses' vs 'responseStream'. We
should name things consistently.

Modifications:

- Rename 'requests' to 'requestStream' in generated server code
- Rename 'responses' to 'responseStream' in client calls

Result:

More consistent naming.
George Barnett 4 лет назад
Родитель
Сommit
b0ee72670e

+ 2 - 10
.github/workflows/ci.yaml

@@ -3,7 +3,7 @@ on:
   push:
     branches: [main]
   pull_request:
-    branches: [main, async-await, 1.4.1-async-await]
+    branches: [main, 1.6.0-async-await]
 jobs:
   preflight:
     name: License Header and Formatting Checks
@@ -25,7 +25,7 @@ jobs:
       matrix:
         include:
           - image: swift:5.5-focal
-            swift-test-flags: "--enable-test-discovery --sanitize=thread"
+            swift-test-flags: "--enable-test-discovery"
           - image: swift:5.4-focal
             swift-test-flags: "--enable-test-discovery --sanitize=thread"
           - image: swift:5.3-focal
@@ -52,14 +52,6 @@ jobs:
       matrix:
         include:
           - image: swift:5.5-focal
-            env:
-              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 503000
-              MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 215000
-              MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
-              MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
-              MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
-              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 204000
-          - image: swift:5.4-focal
             env:
               MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 504000
               MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 216000

+ 4 - 4
Sources/Examples/Echo/Implementation/EchoAsyncProvider.swift

@@ -45,10 +45,10 @@ public final class EchoAsyncProvider: Echo_EchoAsyncProvider {
   }
 
   public func collect(
-    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    requestStream: GRPCAsyncRequestStream<Echo_EchoRequest>,
     context: GRPCAsyncServerCallContext
   ) async throws -> Echo_EchoResponse {
-    let text = try await requests.reduce(into: "Swift echo collect:") { result, request in
+    let text = try await requestStream.reduce(into: "Swift echo collect:") { result, request in
       result += " \(request.text)"
     }
 
@@ -56,12 +56,12 @@ public final class EchoAsyncProvider: Echo_EchoAsyncProvider {
   }
 
   public func update(
-    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    requestStream: GRPCAsyncRequestStream<Echo_EchoRequest>,
     responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
     context: GRPCAsyncServerCallContext
   ) async throws {
     var counter = 0
-    for try await request in requests {
+    for try await request in requestStream {
       let text = "Swift echo update (\(counter)): \(request.text)"
       try await responseStream.send(.with { $0.text = text })
       counter += 1

+ 4 - 4
Sources/Examples/Echo/Model/echo.grpc.swift

@@ -574,13 +574,13 @@ public protocol Echo_EchoAsyncProvider: CallHandlerProvider {
 
   /// Collects a stream of messages and returns them concatenated when the caller closes.
   @Sendable func collect(
-    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    requestStream: GRPCAsyncRequestStream<Echo_EchoRequest>,
     context: GRPCAsyncServerCallContext
   ) async throws -> Echo_EchoResponse
 
   /// Streams back messages as they are received in an input stream.
   @Sendable func update(
-    requests: GRPCAsyncRequestStream<Echo_EchoRequest>,
+    requestStream: GRPCAsyncRequestStream<Echo_EchoRequest>,
     responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
     context: GRPCAsyncServerCallContext
   ) async throws
@@ -625,7 +625,7 @@ extension Echo_EchoAsyncProvider {
         requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
         responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
         interceptors: self.interceptors?.makeCollectInterceptors() ?? [],
-        wrapping: self.collect(requests:context:)
+        wrapping: self.collect(requestStream:context:)
       )
 
     case "Update":
@@ -634,7 +634,7 @@ extension Echo_EchoAsyncProvider {
         requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
         responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
         interceptors: self.interceptors?.makeUpdateInterceptors() ?? [],
-        wrapping: self.update(requests:responseStream:context:)
+        wrapping: self.update(requestStream:responseStream:context:)
       )
 
     default:

+ 2 - 2
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

@@ -28,7 +28,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
   public let requestStream: GRPCAsyncRequestStreamWriter<Request>
 
   /// The stream of responses from the server.
-  public let responses: GRPCAsyncResponseStream<Response>
+  public let responseStream: GRPCAsyncResponseStream<Response>
 
   /// The options used to make the RPC.
   public var options: CallOptions {
@@ -79,7 +79,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
     self.call = call
     self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
     self.responseSource = PassthroughMessageSource<Response, Error>()
-    self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
+    self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
     self.requestStream = call.makeRequestStreamWriter()
   }
 

+ 2 - 2
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift

@@ -25,7 +25,7 @@ public struct GRPCAsyncServerStreamingCall<Request, Response> {
   private let responseSource: PassthroughMessageSource<Response, Error>
 
   /// The stream of responses from the server.
-  public let responses: GRPCAsyncResponseStream<Response>
+  public let responseStream: GRPCAsyncResponseStream<Response>
 
   /// The options used to make the RPC.
   public var options: CallOptions {
@@ -78,7 +78,7 @@ public struct GRPCAsyncServerStreamingCall<Request, Response> {
     // invoke the `call`.
     self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
     self.responseSource = PassthroughMessageSource<Response, Error>()
-    self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
+    self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
   }
 
   /// We expose this as the only non-private initializer so that the caller

+ 3 - 3
Sources/GRPC/AsyncAwaitSupport/GRPCClient+AsyncAwaitSupport.swift

@@ -197,7 +197,7 @@ extension GRPCClient {
       request: request,
       callOptions: callOptions ?? self.defaultCallOptions,
       interceptors: interceptors
-    ).responses
+    ).responseStream
   }
 
   public func performAsyncServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
@@ -212,7 +212,7 @@ extension GRPCClient {
       request: request,
       callOptions: callOptions ?? self.defaultCallOptions,
       interceptors: interceptors
-    ).responses
+    ).responseStream
   }
 
   public func performAsyncClientStreamingCall<
@@ -441,7 +441,7 @@ extension GRPCClient {
         }
       }
     }
-    return call.responses
+    return call.responseStream
   }
 }
 

+ 68 - 43
Sources/GRPCInteroperabilityTestModels/Generated/test.grpc.swift

@@ -292,7 +292,7 @@ public final class Grpc_Testing_TestServiceClient: Grpc_Testing_TestServiceClien
   }
 }
 
-#if compiler(>=5.5)
+#if compiler(>=5.5) && canImport(_Concurrency)
 /// A simple service to test the various types of RPCs and experiment with
 /// performance with various types of payload.
 @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@@ -355,7 +355,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.TestService/EmptyCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeEmptyCallInterceptors() ?? []
     )
   }
 
@@ -366,7 +367,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.TestService/UnaryCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeUnaryCallInterceptors() ?? []
     )
   }
 
@@ -377,7 +379,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.TestService/CacheableUnaryCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeCacheableUnaryCallInterceptors() ?? []
     )
   }
 
@@ -388,7 +391,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.makeAsyncServerStreamingCall(
       path: "/grpc.testing.TestService/StreamingOutputCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStreamingOutputCallInterceptors() ?? []
     )
   }
 
@@ -397,7 +401,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
   ) -> GRPCAsyncClientStreamingCall<Grpc_Testing_StreamingInputCallRequest, Grpc_Testing_StreamingInputCallResponse> {
     return self.makeAsyncClientStreamingCall(
       path: "/grpc.testing.TestService/StreamingInputCall",
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStreamingInputCallInterceptors() ?? []
     )
   }
 
@@ -406,7 +411,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
   ) -> GRPCAsyncBidirectionalStreamingCall<Grpc_Testing_StreamingOutputCallRequest, Grpc_Testing_StreamingOutputCallResponse> {
     return self.makeAsyncBidirectionalStreamingCall(
       path: "/grpc.testing.TestService/FullDuplexCall",
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeFullDuplexCallInterceptors() ?? []
     )
   }
 
@@ -415,7 +421,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
   ) -> GRPCAsyncBidirectionalStreamingCall<Grpc_Testing_StreamingOutputCallRequest, Grpc_Testing_StreamingOutputCallResponse> {
     return self.makeAsyncBidirectionalStreamingCall(
       path: "/grpc.testing.TestService/HalfDuplexCall",
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeHalfDuplexCallInterceptors() ?? []
     )
   }
 
@@ -426,7 +433,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.TestService/UnimplementedCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeUnimplementedCallInterceptors() ?? []
     )
   }
 }
@@ -440,7 +448,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.TestService/EmptyCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeEmptyCallInterceptors() ?? []
     )
   }
 
@@ -451,7 +460,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.TestService/UnaryCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeUnaryCallInterceptors() ?? []
     )
   }
 
@@ -462,7 +472,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.TestService/CacheableUnaryCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeCacheableUnaryCallInterceptors() ?? []
     )
   }
 
@@ -473,7 +484,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.performAsyncServerStreamingCall(
       path: "/grpc.testing.TestService/StreamingOutputCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStreamingOutputCallInterceptors() ?? []
     )
   }
 
@@ -484,7 +496,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return try await self.performAsyncClientStreamingCall(
       path: "/grpc.testing.TestService/StreamingInputCall",
       requests: requests,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStreamingInputCallInterceptors() ?? []
     )
   }
 
@@ -495,7 +508,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return try await self.performAsyncClientStreamingCall(
       path: "/grpc.testing.TestService/StreamingInputCall",
       requests: requests,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStreamingInputCallInterceptors() ?? []
     )
   }
 
@@ -506,7 +520,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.performAsyncBidirectionalStreamingCall(
       path: "/grpc.testing.TestService/FullDuplexCall",
       requests: requests,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeFullDuplexCallInterceptors() ?? []
     )
   }
 
@@ -517,7 +532,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.performAsyncBidirectionalStreamingCall(
       path: "/grpc.testing.TestService/FullDuplexCall",
       requests: requests,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeFullDuplexCallInterceptors() ?? []
     )
   }
 
@@ -528,7 +544,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.performAsyncBidirectionalStreamingCall(
       path: "/grpc.testing.TestService/HalfDuplexCall",
       requests: requests,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeHalfDuplexCallInterceptors() ?? []
     )
   }
 
@@ -539,7 +556,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return self.performAsyncBidirectionalStreamingCall(
       path: "/grpc.testing.TestService/HalfDuplexCall",
       requests: requests,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeHalfDuplexCallInterceptors() ?? []
     )
   }
 
@@ -550,7 +568,8 @@ extension Grpc_Testing_TestServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.TestService/UnimplementedCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeUnimplementedCallInterceptors() ?? []
     )
   }
 }
@@ -572,7 +591,7 @@ public struct Grpc_Testing_TestServiceAsyncClient: Grpc_Testing_TestServiceAsync
   }
 }
 
-#endif // compiler(>=5.5)
+#endif // compiler(>=5.5) && canImport(_Concurrency)
 
 /// A simple service NOT implemented at servers so clients can test for
 /// that case.
@@ -640,7 +659,7 @@ public final class Grpc_Testing_UnimplementedServiceClient: Grpc_Testing_Unimple
   }
 }
 
-#if compiler(>=5.5)
+#if compiler(>=5.5) && canImport(_Concurrency)
 /// A simple service NOT implemented at servers so clients can test for
 /// that case.
 @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@@ -671,7 +690,8 @@ extension Grpc_Testing_UnimplementedServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.UnimplementedService/UnimplementedCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeUnimplementedCallInterceptors() ?? []
     )
   }
 }
@@ -685,7 +705,8 @@ extension Grpc_Testing_UnimplementedServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.UnimplementedService/UnimplementedCall",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeUnimplementedCallInterceptors() ?? []
     )
   }
 }
@@ -707,7 +728,7 @@ public struct Grpc_Testing_UnimplementedServiceAsyncClient: Grpc_Testing_Unimple
   }
 }
 
-#endif // compiler(>=5.5)
+#endif // compiler(>=5.5) && canImport(_Concurrency)
 
 /// A service used to control reconnect server.
 ///
@@ -800,7 +821,7 @@ public final class Grpc_Testing_ReconnectServiceClient: Grpc_Testing_ReconnectSe
   }
 }
 
-#if compiler(>=5.5)
+#if compiler(>=5.5) && canImport(_Concurrency)
 /// A service used to control reconnect server.
 @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
 public protocol Grpc_Testing_ReconnectServiceAsyncClientProtocol: GRPCClient {
@@ -835,7 +856,8 @@ extension Grpc_Testing_ReconnectServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.ReconnectService/Start",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStartInterceptors() ?? []
     )
   }
 
@@ -846,7 +868,8 @@ extension Grpc_Testing_ReconnectServiceAsyncClientProtocol {
     return self.makeAsyncUnaryCall(
       path: "/grpc.testing.ReconnectService/Stop",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStopInterceptors() ?? []
     )
   }
 }
@@ -860,7 +883,8 @@ extension Grpc_Testing_ReconnectServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.ReconnectService/Start",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStartInterceptors() ?? []
     )
   }
 
@@ -871,7 +895,8 @@ extension Grpc_Testing_ReconnectServiceAsyncClientProtocol {
     return try await self.performAsyncUnaryCall(
       path: "/grpc.testing.ReconnectService/Stop",
       request: request,
-      callOptions: callOptions ?? self.defaultCallOptions
+      callOptions: callOptions ?? self.defaultCallOptions,
+      interceptors: self.interceptors?.makeStopInterceptors() ?? []
     )
   }
 }
@@ -893,7 +918,7 @@ public struct Grpc_Testing_ReconnectServiceAsyncClient: Grpc_Testing_ReconnectSe
   }
 }
 
-#endif // compiler(>=5.5)
+#endif // compiler(>=5.5) && canImport(_Concurrency)
 
 /// A simple service to test the various types of RPCs and experiment with
 /// performance with various types of payload.
@@ -1047,7 +1072,7 @@ public protocol Grpc_Testing_TestServiceServerInterceptorFactoryProtocol {
   func makeUnimplementedCallInterceptors() -> [ServerInterceptor<Grpc_Testing_Empty, Grpc_Testing_Empty>]
 }
 
-#if compiler(>=5.5)
+#if compiler(>=5.5) && canImport(_Concurrency)
 
 /// A simple service to test the various types of RPCs and experiment with
 /// performance with various types of payload.
@@ -1088,7 +1113,7 @@ public protocol Grpc_Testing_TestServiceAsyncProvider: CallHandlerProvider {
   /// A sequence of requests followed by one response (streamed upload).
   /// The server returns the aggregated size of client payload as the result.
   @Sendable func streamingInputCall(
-    requests: GRPCAsyncRequestStream<Grpc_Testing_StreamingInputCallRequest>,
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_StreamingInputCallRequest>,
     context: GRPCAsyncServerCallContext
   ) async throws -> Grpc_Testing_StreamingInputCallResponse
 
@@ -1096,7 +1121,7 @@ public protocol Grpc_Testing_TestServiceAsyncProvider: CallHandlerProvider {
   /// As one request could lead to multiple responses, this interface
   /// demonstrates the idea of full duplexing.
   @Sendable func fullDuplexCall(
-    requests: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
     responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_StreamingOutputCallResponse>,
     context: GRPCAsyncServerCallContext
   ) async throws
@@ -1106,7 +1131,7 @@ public protocol Grpc_Testing_TestServiceAsyncProvider: CallHandlerProvider {
   /// stream of responses are returned to the client when the server starts with
   /// first request.
   @Sendable func halfDuplexCall(
-    requests: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
     responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_StreamingOutputCallResponse>,
     context: GRPCAsyncServerCallContext
   ) async throws
@@ -1169,7 +1194,7 @@ extension Grpc_Testing_TestServiceAsyncProvider {
         requestDeserializer: ProtobufDeserializer<Grpc_Testing_StreamingInputCallRequest>(),
         responseSerializer: ProtobufSerializer<Grpc_Testing_StreamingInputCallResponse>(),
         interceptors: self.interceptors?.makeStreamingInputCallInterceptors() ?? [],
-        wrapping: self.streamingInputCall(requests:context:)
+        wrapping: self.streamingInputCall(requestStream:context:)
       )
 
     case "FullDuplexCall":
@@ -1178,7 +1203,7 @@ extension Grpc_Testing_TestServiceAsyncProvider {
         requestDeserializer: ProtobufDeserializer<Grpc_Testing_StreamingOutputCallRequest>(),
         responseSerializer: ProtobufSerializer<Grpc_Testing_StreamingOutputCallResponse>(),
         interceptors: self.interceptors?.makeFullDuplexCallInterceptors() ?? [],
-        wrapping: self.fullDuplexCall(requests:responseStream:context:)
+        wrapping: self.fullDuplexCall(requestStream:responseStream:context:)
       )
 
     case "HalfDuplexCall":
@@ -1187,7 +1212,7 @@ extension Grpc_Testing_TestServiceAsyncProvider {
         requestDeserializer: ProtobufDeserializer<Grpc_Testing_StreamingOutputCallRequest>(),
         responseSerializer: ProtobufSerializer<Grpc_Testing_StreamingOutputCallResponse>(),
         interceptors: self.interceptors?.makeHalfDuplexCallInterceptors() ?? [],
-        wrapping: self.halfDuplexCall(requests:responseStream:context:)
+        wrapping: self.halfDuplexCall(requestStream:responseStream:context:)
       )
 
     default:
@@ -1196,7 +1221,7 @@ extension Grpc_Testing_TestServiceAsyncProvider {
   }
 }
 
-#endif // compiler(>=5.5)
+#endif // compiler(>=5.5) && canImport(_Concurrency)
 
 /// A simple service NOT implemented at servers so clients can test for
 /// that case.
@@ -1241,7 +1266,7 @@ public protocol Grpc_Testing_UnimplementedServiceServerInterceptorFactoryProtoco
   func makeUnimplementedCallInterceptors() -> [ServerInterceptor<Grpc_Testing_Empty, Grpc_Testing_Empty>]
 }
 
-#if compiler(>=5.5)
+#if compiler(>=5.5) && canImport(_Concurrency)
 
 /// A simple service NOT implemented at servers so clients can test for
 /// that case.
@@ -1288,7 +1313,7 @@ extension Grpc_Testing_UnimplementedServiceAsyncProvider {
   }
 }
 
-#endif // compiler(>=5.5)
+#endif // compiler(>=5.5) && canImport(_Concurrency)
 
 /// A service used to control reconnect server.
 ///
@@ -1346,7 +1371,7 @@ public protocol Grpc_Testing_ReconnectServiceServerInterceptorFactoryProtocol {
   func makeStopInterceptors() -> [ServerInterceptor<Grpc_Testing_Empty, Grpc_Testing_ReconnectInfo>]
 }
 
-#if compiler(>=5.5)
+#if compiler(>=5.5) && canImport(_Concurrency)
 
 /// A service used to control reconnect server.
 ///
@@ -1405,5 +1430,5 @@ extension Grpc_Testing_ReconnectServiceAsyncProvider {
   }
 }
 
-#endif // compiler(>=5.5)
+#endif // compiler(>=5.5) && canImport(_Concurrency)
 

+ 5 - 5
Sources/GRPCInteroperabilityTestsImplementation/TestServiceAsyncProvider.swift

@@ -151,12 +151,12 @@ public class TestServiceAsyncProvider: Grpc_Testing_TestServiceAsyncProvider {
   /// `StreamingInputCallResponse` where `aggregatedPayloadSize` is the sum of all request payload
   /// bodies received.
   public func streamingInputCall(
-    requests: GRPCAsyncRequestStream<Grpc_Testing_StreamingInputCallRequest>,
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_StreamingInputCallRequest>,
     context: GRPCAsyncServerCallContext
   ) async throws -> Grpc_Testing_StreamingInputCallResponse {
     var aggregatePayloadSize = 0
 
-    for try await request in requests {
+    for try await request in requestStream {
       if request.expectCompressed.value {
         guard context.requestMetadata.contains(name: "grpc-encoding") else {
           throw GRPCStatus(
@@ -178,7 +178,7 @@ public class TestServiceAsyncProvider: Grpc_Testing_TestServiceAsyncProvider {
   /// of size `ResponseParameter.size` bytes, as specified by its respective `ResponseParameter`s.
   /// After receiving half close and sending all responses, it closes with OK.
   public func fullDuplexCall(
-    requests: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
     responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_StreamingOutputCallResponse>,
     context: GRPCAsyncServerCallContext
   ) async throws {
@@ -187,7 +187,7 @@ public class TestServiceAsyncProvider: Grpc_Testing_TestServiceAsyncProvider {
       throw Self.echoMetadataNotImplemented
     }
 
-    for try await request in requests {
+    for try await request in requestStream {
       if request.shouldEchoStatus {
         let code = GRPCStatus.Code(rawValue: numericCast(request.responseStatus.code))
         let status = GRPCStatus(code: code ?? .unknown, message: request.responseStatus.message)
@@ -207,7 +207,7 @@ public class TestServiceAsyncProvider: Grpc_Testing_TestServiceAsyncProvider {
   ///
   /// See: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
   public func halfDuplexCall(
-    requests: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
+    requestStream: GRPCAsyncRequestStream<Grpc_Testing_StreamingOutputCallRequest>,
     responseStream: GRPCAsyncResponseStreamWriter<Grpc_Testing_StreamingOutputCallResponse>,
     context: GRPCAsyncServerCallContext
   ) async throws {

+ 6 - 4
Sources/protoc-gen-grpc-swift/Generator-Server+AsyncAwait.swift

@@ -59,7 +59,7 @@ extension Generator {
 
     case .clientStreaming:
       arguments = [
-        "requests: \(Types.requestStream(of: self.methodInputName))",
+        "requestStream: \(Types.requestStream(of: self.methodInputName))",
         "context: \(Types.serverContext)",
       ]
       returnType = self.methodOutputName
@@ -74,7 +74,7 @@ extension Generator {
 
     case .bidirectionalStreaming:
       arguments = [
-        "requests: \(Types.requestStream(of: self.methodInputName))",
+        "requestStream: \(Types.requestStream(of: self.methodInputName))",
         "responseStream: \(Types.responseStreamWriter(of: self.methodOutputName))",
         "context: \(Types.serverContext)",
       ]
@@ -145,13 +145,15 @@ extension Generator {
                 self.println("wrapping: self.\(functionName)(request:context:)")
 
               case .clientStreaming:
-                self.println("wrapping: self.\(functionName)(requests:context:)")
+                self.println("wrapping: self.\(functionName)(requestStream:context:)")
 
               case .serverStreaming:
                 self.println("wrapping: self.\(functionName)(request:responseStream:context:)")
 
               case .bidirectionalStreaming:
-                self.println("wrapping: self.\(functionName)(requests:responseStream:context:)")
+                self.println(
+                  "wrapping: self.\(functionName)(requestStream:responseStream:context:)"
+                )
               }
             }
           }

+ 3 - 3
Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift

@@ -123,7 +123,7 @@ final class AsyncIntegrationTests: GRPCTestCase {
       let initialMetadata = try await expand.initialMetadata
       initialMetadata.assertFirst("200", forName: ":status")
 
-      let responses = try await expand.responses.map { $0.text }.collect()
+      let responses = try await expand.responseStream.map { $0.text }.collect()
       XCTAssertEqual(responses, [
         "Swift echo expand (0): boyle",
         "Swift echo expand (1): jeffers",
@@ -154,7 +154,7 @@ final class AsyncIntegrationTests: GRPCTestCase {
     XCTAsyncTest {
       let update = self.echo.makeUpdateCall()
 
-      var responseIterator = update.responses.map { $0.text }.makeAsyncIterator()
+      var responseIterator = update.responseStream.map { $0.text }.makeAsyncIterator()
 
       for (i, name) in ["boyle", "jeffers", "holt"].enumerated() {
         try await update.requestStream.send(.with { $0.text = name })
@@ -200,7 +200,7 @@ final class AsyncIntegrationTests: GRPCTestCase {
     XCTAsyncTest {
       let update = self.echo.makeUpdateCall()
       try await update.requestStream.send(.with { $0.text = "hello" })
-      _ = try await update.responses.first(where: { _ in true })
+      _ = try await update.responseStream.first(where: { _ in true })
       XCTAssertNoThrow(try self.server.close().wait())
       self.server = nil // So that tearDown() does not call close() again.
       try await update.requestStream.finish()

+ 2 - 2
Tests/GRPCTests/AsyncAwaitSupport/InterceptorsAsyncTests.swift

@@ -123,7 +123,7 @@ class InterceptorsAsyncTests: GRPCTestCase {
 
   func testMakingCallServerStreaming() { XCTAsyncTest {
     let call = self.echo.makeExpandCall(.with { $0.text = "hello" }, callOptions: .init())
-    for try await response in call.responses {
+    for try await response in call.responseStream {
       // Expand splits on spaces, so we only expect one response.
       await assertThat(response, .is(.with { $0.text = "hello :)0( dnapxe ohce tfiwS" }))
     }
@@ -156,7 +156,7 @@ class InterceptorsAsyncTests: GRPCTestCase {
     try await call.requestStream.finish()
 
     var count = 0
-    for try await response in call.responses {
+    for try await response in call.responseStream {
       switch count {
       case 0:
         await assertThat(response, .is(.with { $0.text = "1 2 :)0( etadpu ohce tfiwS" }))

+ 4 - 4
Tests/GRPCTests/GRPCAsyncClientCallTests.swift

@@ -117,7 +117,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
 
     await assertThat(try await expand.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
 
-    let numResponses = try await expand.responses.map { _ in 1 }.reduce(0, +)
+    let numResponses = try await expand.responseStream.map { _ in 1 }.reduce(0, +)
 
     await assertThat(numResponses, .is(.equalTo(3)))
     await assertThat(try await expand.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
@@ -137,7 +137,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
     }
     try await update.requestStream.finish()
 
-    let numResponses = try await update.responses.map { _ in 1 }.reduce(0, +)
+    let numResponses = try await update.responseStream.map { _ in 1 }.reduce(0, +)
 
     await assertThat(numResponses, .is(.equalTo(3)))
     await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
@@ -154,7 +154,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
 
     await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
 
-    var responseStreamIterator = update.responses.makeAsyncIterator()
+    var responseStreamIterator = update.responseStream.makeAsyncIterator()
     for word in ["boyle", "jeffers", "holt"] {
       try await update.requestStream.send(.with { $0.text = word })
       await assertThat(try await responseStreamIterator.next(), .is(.notNil()))
@@ -192,7 +192,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase {
       }
       // Get responses in a separate task.
       taskGroup.addTask {
-        for try await _ in update.responses {
+        for try await _ in update.responseStream {
           await counter.incrementResponses()
         }
       }