|
|
@@ -17,9 +17,10 @@
|
|
|
internal import Atomics
|
|
|
internal import DequeModule
|
|
|
package import GRPCCore
|
|
|
+private import Synchronization
|
|
|
|
|
|
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
|
|
|
-package struct GRPCChannel: ClientTransport {
|
|
|
+package final class GRPCChannel: ClientTransport {
|
|
|
private enum Input: Sendable {
|
|
|
/// Close the channel, if possible.
|
|
|
case close
|
|
|
@@ -43,7 +44,7 @@ package struct GRPCChannel: ClientTransport {
|
|
|
private let resolver: NameResolver
|
|
|
|
|
|
/// The state of the channel.
|
|
|
- private let state: LockedValueBox<StateMachine>
|
|
|
+ private let state: Mutex<StateMachine>
|
|
|
|
|
|
/// The maximum number of times to attempt to create a stream per RPC.
|
|
|
///
|
|
|
@@ -68,8 +69,8 @@ package struct GRPCChannel: ClientTransport {
|
|
|
private let defaultServiceConfig: ServiceConfig
|
|
|
|
|
|
// These are both read frequently and updated infrequently so may be a bottleneck.
|
|
|
- private let _methodConfig: LockedValueBox<MethodConfigs>
|
|
|
- private let _retryThrottle: LockedValueBox<RetryThrottle?>
|
|
|
+ private let _methodConfig: Mutex<MethodConfigs>
|
|
|
+ private let _retryThrottle: Mutex<RetryThrottle?>
|
|
|
|
|
|
package init(
|
|
|
resolver: NameResolver,
|
|
|
@@ -78,7 +79,7 @@ package struct GRPCChannel: ClientTransport {
|
|
|
defaultServiceConfig: ServiceConfig
|
|
|
) {
|
|
|
self.resolver = resolver
|
|
|
- self.state = LockedValueBox(StateMachine())
|
|
|
+ self.state = Mutex(StateMachine())
|
|
|
self._connectivityState = AsyncStream.makeStream()
|
|
|
self.input = AsyncStream.makeStream()
|
|
|
self.connector = connector
|
|
|
@@ -94,10 +95,10 @@ package struct GRPCChannel: ClientTransport {
|
|
|
self.defaultServiceConfig = defaultServiceConfig
|
|
|
|
|
|
let throttle = defaultServiceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
|
|
|
- self._retryThrottle = LockedValueBox(throttle)
|
|
|
+ self._retryThrottle = Mutex(throttle)
|
|
|
|
|
|
let methodConfig = MethodConfigs(serviceConfig: defaultServiceConfig)
|
|
|
- self._methodConfig = LockedValueBox(methodConfig)
|
|
|
+ self._methodConfig = Mutex(methodConfig)
|
|
|
}
|
|
|
|
|
|
/// The connectivity state of the channel.
|
|
|
@@ -107,7 +108,7 @@ package struct GRPCChannel: ClientTransport {
|
|
|
|
|
|
/// Returns a throttle which gRPC uses to determine whether retries can be executed.
|
|
|
package var retryThrottle: RetryThrottle? {
|
|
|
- self._retryThrottle.withLockedValue { $0 }
|
|
|
+ self._retryThrottle.withLock { $0 }
|
|
|
}
|
|
|
|
|
|
/// Returns the configuration for a given method.
|
|
|
@@ -115,12 +116,12 @@ package struct GRPCChannel: ClientTransport {
|
|
|
/// - Parameter descriptor: The method to lookup configuration for.
|
|
|
/// - Returns: Configuration for the method, if it exists.
|
|
|
package func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
|
|
|
- self._methodConfig.withLockedValue { $0[descriptor] }
|
|
|
+ self._methodConfig.withLock { $0[descriptor] }
|
|
|
}
|
|
|
|
|
|
/// Establishes and maintains a connection to the remote destination.
|
|
|
package func connect() async {
|
|
|
- self.state.withLockedValue { $0.start() }
|
|
|
+ self.state.withLock { $0.start() }
|
|
|
self._connectivityState.continuation.yield(.idle)
|
|
|
|
|
|
await withDiscardingTaskGroup { group in
|
|
|
@@ -146,7 +147,7 @@ package struct GRPCChannel: ClientTransport {
|
|
|
// When the channel is closed gracefully, the task group running the load balancer mustn't
|
|
|
// be cancelled (otherwise in-flight RPCs would fail), but the push based resolver will
|
|
|
// continue indefinitely. Store its handle and cancel it on close when closing the channel.
|
|
|
- self.state.withLockedValue { state in
|
|
|
+ self.state.withLock { state in
|
|
|
state.setNameResolverTaskHandle(handle)
|
|
|
}
|
|
|
|
|
|
@@ -270,7 +271,7 @@ extension GRPCChannel {
|
|
|
options: CallOptions
|
|
|
) async -> MakeStreamResult {
|
|
|
let waitForReady = options.waitForReady ?? true
|
|
|
- switch self.state.withLockedValue({ $0.makeStream(waitForReady: waitForReady) }) {
|
|
|
+ switch self.state.withLock({ $0.makeStream(waitForReady: waitForReady) }) {
|
|
|
case .useLoadBalancer(let loadBalancer):
|
|
|
return await self.makeStream(
|
|
|
descriptor: descriptor,
|
|
|
@@ -327,7 +328,7 @@ extension GRPCChannel {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- let enqueued = self.state.withLockedValue { state in
|
|
|
+ let enqueued = self.state.withLock { state in
|
|
|
state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
|
|
|
}
|
|
|
|
|
|
@@ -338,7 +339,7 @@ extension GRPCChannel {
|
|
|
}
|
|
|
}
|
|
|
} onCancel: {
|
|
|
- let continuation = self.state.withLockedValue { state in
|
|
|
+ let continuation = self.state.withLock { state in
|
|
|
state.dequeueContinuation(id: id)
|
|
|
}
|
|
|
|
|
|
@@ -350,7 +351,7 @@ extension GRPCChannel {
|
|
|
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
|
|
|
extension GRPCChannel {
|
|
|
private func handleClose(in group: inout DiscardingTaskGroup) {
|
|
|
- switch self.state.withLockedValue({ $0.close() }) {
|
|
|
+ switch self.state.withLock({ $0.close() }) {
|
|
|
case .close(let current, let next, let resolver, let continuations):
|
|
|
resolver?.cancel()
|
|
|
current.close()
|
|
|
@@ -383,10 +384,10 @@ extension GRPCChannel {
|
|
|
case .success(let config):
|
|
|
// Update per RPC configuration.
|
|
|
let methodConfig = MethodConfigs(serviceConfig: config)
|
|
|
- self._methodConfig.withLockedValue { $0 = methodConfig }
|
|
|
+ self._methodConfig.withLock { $0 = methodConfig }
|
|
|
|
|
|
let retryThrottle = config.retryThrottling.map { RetryThrottle(policy: $0) }
|
|
|
- self._retryThrottle.withLockedValue { $0 = retryThrottle }
|
|
|
+ self._retryThrottle.withLock { $0 = retryThrottle }
|
|
|
|
|
|
// Update the load balancer.
|
|
|
self.updateLoadBalancer(serviceConfig: config, endpoints: result.endpoints, in: &group)
|
|
|
@@ -446,7 +447,7 @@ extension GRPCChannel {
|
|
|
let loadBalancerConfig = configFromServiceConfig ?? .pickFirst(.init(shuffleAddressList: false))
|
|
|
switch loadBalancerConfig {
|
|
|
case .roundRobin:
|
|
|
- onUpdatePolicy = self.state.withLockedValue { state in
|
|
|
+ onUpdatePolicy = self.state.withLock { state in
|
|
|
state.changeLoadBalancerKind(to: loadBalancerConfig) {
|
|
|
let loadBalancer = RoundRobinLoadBalancer(
|
|
|
connector: self.connector,
|
|
|
@@ -463,7 +464,7 @@ extension GRPCChannel {
|
|
|
endpoints[0].addresses.shuffle()
|
|
|
}
|
|
|
|
|
|
- onUpdatePolicy = self.state.withLockedValue { state in
|
|
|
+ onUpdatePolicy = self.state.withLock { state in
|
|
|
state.changeLoadBalancerKind(to: loadBalancerConfig) {
|
|
|
let loadBalancer = PickFirstLoadBalancer(
|
|
|
connector: self.connector,
|
|
|
@@ -527,7 +528,7 @@ extension GRPCChannel {
|
|
|
) async {
|
|
|
switch event {
|
|
|
case .connectivityStateChanged(let connectivityState):
|
|
|
- let actions = self.state.withLockedValue { state in
|
|
|
+ let actions = self.state.withLock { state in
|
|
|
state.loadBalancerStateChanged(to: connectivityState, id: loadBalancerID)
|
|
|
}
|
|
|
|