|
|
@@ -130,42 +130,75 @@ extension ClientRPCExecutor {
|
|
|
) async -> ClientResponse.Stream<Output> {
|
|
|
let context = ClientInterceptorContext(descriptor: method)
|
|
|
|
|
|
- return await Self._intercept(
|
|
|
- request: request,
|
|
|
- context: context,
|
|
|
- interceptors: interceptors
|
|
|
- ) { request, context in
|
|
|
- // Let the server know this is a retry.
|
|
|
- var metadata = request.metadata
|
|
|
- if attempt > 1 {
|
|
|
- metadata.previousRPCAttempts = attempt &- 1
|
|
|
- }
|
|
|
-
|
|
|
- var response = await streamProcessor.execute(
|
|
|
- request: ClientRequest.Stream<[UInt8]>(metadata: metadata) { writer in
|
|
|
- try await request.producer(.serializing(into: writer, with: serializer))
|
|
|
- },
|
|
|
- method: context.descriptor,
|
|
|
+ if interceptors.isEmpty {
|
|
|
+ return await Self._runRPC(
|
|
|
+ request: request,
|
|
|
+ context: context,
|
|
|
+ attempt: attempt,
|
|
|
+ serializer: serializer,
|
|
|
+ deserializer: deserializer,
|
|
|
+ streamProcessor: streamProcessor,
|
|
|
stream: stream
|
|
|
)
|
|
|
+ } else {
|
|
|
+ return await Self._intercept(
|
|
|
+ request: request,
|
|
|
+ context: context,
|
|
|
+ interceptors: interceptors
|
|
|
+ ) { request, context in
|
|
|
+ return await Self._runRPC(
|
|
|
+ request: request,
|
|
|
+ context: context,
|
|
|
+ attempt: attempt,
|
|
|
+ serializer: serializer,
|
|
|
+ deserializer: deserializer,
|
|
|
+ streamProcessor: streamProcessor,
|
|
|
+ stream: stream
|
|
|
+ )
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // Attach the number of previous attempts, it can be useful information for callers.
|
|
|
- if attempt > 1 {
|
|
|
- switch response.accepted {
|
|
|
- case .success(var contents):
|
|
|
- contents.metadata.previousRPCAttempts = attempt &- 1
|
|
|
- response.accepted = .success(contents)
|
|
|
+ @inlinable
|
|
|
+ static func _runRPC<Transport: ClientTransport, Input: Sendable, Output: Sendable>(
|
|
|
+ request: ClientRequest.Stream<Input>,
|
|
|
+ context: ClientInterceptorContext,
|
|
|
+ attempt: Int,
|
|
|
+ serializer: some MessageSerializer<Input>,
|
|
|
+ deserializer: some MessageDeserializer<Output>,
|
|
|
+ streamProcessor: ClientStreamExecutor<Transport>,
|
|
|
+ stream: RPCStream<Transport.Inbound, Transport.Outbound>
|
|
|
+ ) async -> ClientResponse.Stream<Output> {
|
|
|
+ // Let the server know this is a retry.
|
|
|
+ var metadata = request.metadata
|
|
|
+ if attempt > 1 {
|
|
|
+ metadata.previousRPCAttempts = attempt &- 1
|
|
|
+ }
|
|
|
|
|
|
- case .failure(var error):
|
|
|
- error.metadata.previousRPCAttempts = attempt &- 1
|
|
|
- response.accepted = .failure(error)
|
|
|
- }
|
|
|
- }
|
|
|
+ var response = await streamProcessor.execute(
|
|
|
+ request: ClientRequest.Stream<[UInt8]>(metadata: metadata) { writer in
|
|
|
+ try await request.producer(.serializing(into: writer, with: serializer))
|
|
|
+ },
|
|
|
+ method: context.descriptor,
|
|
|
+ stream: stream
|
|
|
+ )
|
|
|
|
|
|
- return response.map { bytes in
|
|
|
- try deserializer.deserialize(bytes)
|
|
|
+ // Attach the number of previous attempts, it can be useful information for callers.
|
|
|
+ if attempt > 1 {
|
|
|
+ switch response.accepted {
|
|
|
+ case .success(var contents):
|
|
|
+ contents.metadata.previousRPCAttempts = attempt &- 1
|
|
|
+ response.accepted = .success(contents)
|
|
|
+
|
|
|
+ case .failure(var error):
|
|
|
+ error.metadata.previousRPCAttempts = attempt &- 1
|
|
|
+ response.accepted = .failure(error)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return response.map { bytes in
|
|
|
+ try deserializer.deserialize(bytes)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@inlinable
|
|
|
@@ -173,7 +206,7 @@ extension ClientRPCExecutor {
|
|
|
request: ClientRequest.Stream<Input>,
|
|
|
context: ClientInterceptorContext,
|
|
|
interceptors: [any ClientInterceptor],
|
|
|
- finally: @escaping @Sendable (
|
|
|
+ finally: @Sendable (
|
|
|
_ request: ClientRequest.Stream<Input>,
|
|
|
_ context: ClientInterceptorContext
|
|
|
) async -> ClientResponse.Stream<Output>
|
|
|
@@ -191,7 +224,7 @@ extension ClientRPCExecutor {
|
|
|
request: ClientRequest.Stream<Input>,
|
|
|
context: ClientInterceptorContext,
|
|
|
iterator: Array<any ClientInterceptor>.Iterator,
|
|
|
- finally: @escaping @Sendable (
|
|
|
+ finally: @Sendable (
|
|
|
_ request: ClientRequest.Stream<Input>,
|
|
|
_ context: ClientInterceptorContext
|
|
|
) async -> ClientResponse.Stream<Output>
|