|
|
@@ -18,6 +18,16 @@ import CgRPC
|
|
|
#endif
|
|
|
import Foundation
|
|
|
|
|
|
+/// Used to hold weak references to objects since `NSHashTable<T>.weakObjects()` isn't available on Linux.
|
|
|
+/// If/when this type becomes available on Linux, this should be replaced.
|
|
|
+private final class WeakReference<T: AnyObject> {
|
|
|
+ private(set) weak var value: T?
|
|
|
+
|
|
|
+ init(value: T) {
|
|
|
+ self.value = value
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// A gRPC Channel
|
|
|
public class Channel {
|
|
|
private let mutex = Mutex()
|
|
|
@@ -25,8 +35,12 @@ public class Channel {
|
|
|
private let underlyingChannel: UnsafeMutableRawPointer
|
|
|
/// Completion queue for channel call operations
|
|
|
private let completionQueue: CompletionQueue
|
|
|
+ /// Weak references to API calls using this channel that are in-flight
|
|
|
+ private var activeCalls = [WeakReference<Call>]()
|
|
|
/// Observer for connectivity state changes. Created lazily if needed
|
|
|
private var connectivityObserver: ConnectivityObserver?
|
|
|
+ /// Whether the gRPC channel has been shut down
|
|
|
+ private var hasBeenShutdown = false
|
|
|
|
|
|
/// Timeout for new calls
|
|
|
public var timeout: TimeInterval = 600.0
|
|
|
@@ -34,44 +48,45 @@ public class Channel {
|
|
|
/// Default host to use for new calls
|
|
|
public var host: String
|
|
|
|
|
|
+ /// Errors that may be thrown by the channel
|
|
|
+ enum Error: Swift.Error {
|
|
|
+ /// Action cannot be performed because the channel has already been shut down
|
|
|
+ case alreadyShutdown
|
|
|
+ /// Failed to create a new call within the gRPC stack
|
|
|
+ case callCreationFailed
|
|
|
+ }
|
|
|
+
|
|
|
/// Initializes a gRPC channel
|
|
|
///
|
|
|
/// - Parameter address: the address of the server to be called
|
|
|
/// - Parameter secure: if true, use TLS
|
|
|
/// - Parameter arguments: list of channel configuration options
|
|
|
- public init(address: String, secure: Bool = true, arguments: [Argument] = []) {
|
|
|
+ public convenience init(address: String, secure: Bool = true, arguments: [Argument] = []) {
|
|
|
gRPC.initialize()
|
|
|
- host = address
|
|
|
- let argumentWrappers = arguments.map { $0.toCArg() }
|
|
|
|
|
|
- underlyingChannel = withExtendedLifetime(argumentWrappers) {
|
|
|
+ let argumentWrappers = arguments.map { $0.toCArg() }
|
|
|
+ self.init(host: address, underlyingChannel: withExtendedLifetime(argumentWrappers) {
|
|
|
var argumentValues = argumentWrappers.map { $0.wrapped }
|
|
|
if secure {
|
|
|
return cgrpc_channel_create_secure(address, kRootCertificates, nil, nil, &argumentValues, Int32(arguments.count))
|
|
|
} else {
|
|
|
return cgrpc_channel_create(address, &argumentValues, Int32(arguments.count))
|
|
|
}
|
|
|
- }
|
|
|
- completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
|
|
|
- completionQueue.run() // start a loop that watches the channel's completion queue
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
/// Initializes a gRPC channel
|
|
|
///
|
|
|
/// - Parameter address: the address of the server to be called
|
|
|
/// - Parameter arguments: list of channel configuration options
|
|
|
- public init(googleAddress: String, arguments: [Argument] = []) {
|
|
|
+ public convenience init(googleAddress: String, arguments: [Argument] = []) {
|
|
|
gRPC.initialize()
|
|
|
- host = googleAddress
|
|
|
- let argumentWrappers = arguments.map { $0.toCArg() }
|
|
|
|
|
|
- underlyingChannel = withExtendedLifetime(argumentWrappers) {
|
|
|
+ let argumentWrappers = arguments.map { $0.toCArg() }
|
|
|
+ self.init(host: googleAddress, underlyingChannel: withExtendedLifetime(argumentWrappers) {
|
|
|
var argumentValues = argumentWrappers.map { $0.wrapped }
|
|
|
return cgrpc_channel_create_google(googleAddress, &argumentValues, Int32(arguments.count))
|
|
|
- }
|
|
|
-
|
|
|
- completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
|
|
|
- completionQueue.run() // start a loop that watches the channel's completion queue
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
/// Initializes a gRPC channel
|
|
|
@@ -81,25 +96,31 @@ public class Channel {
|
|
|
/// - Parameter clientCertificates: a PEM representation of the client certificates to use
|
|
|
/// - Parameter clientKey: a PEM representation of the client key to use
|
|
|
/// - Parameter arguments: list of channel configuration options
|
|
|
- public init(address: String, certificates: String = kRootCertificates, clientCertificates: String? = nil, clientKey: String? = nil, arguments: [Argument] = []) {
|
|
|
+ public convenience init(address: String, certificates: String = kRootCertificates, clientCertificates: String? = nil, clientKey: String? = nil, arguments: [Argument] = []) {
|
|
|
gRPC.initialize()
|
|
|
- host = address
|
|
|
- let argumentWrappers = arguments.map { $0.toCArg() }
|
|
|
|
|
|
- underlyingChannel = withExtendedLifetime(argumentWrappers) {
|
|
|
+ let argumentWrappers = arguments.map { $0.toCArg() }
|
|
|
+ self.init(host: address, underlyingChannel: withExtendedLifetime(argumentWrappers) {
|
|
|
var argumentValues = argumentWrappers.map { $0.wrapped }
|
|
|
return cgrpc_channel_create_secure(address, certificates, clientCertificates, clientKey, &argumentValues, Int32(arguments.count))
|
|
|
- }
|
|
|
- completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
|
|
|
- completionQueue.run() // start a loop that watches the channel's completion queue
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- deinit {
|
|
|
+ /// Shut down the channel. No new calls may be made using this channel after it is shut down. Any in-flight calls using this channel will be canceled
|
|
|
+ public func shutdown() {
|
|
|
self.mutex.synchronize {
|
|
|
+ guard !self.hasBeenShutdown else { return }
|
|
|
+
|
|
|
+ self.hasBeenShutdown = true
|
|
|
self.connectivityObserver?.shutdown()
|
|
|
+ cgrpc_channel_destroy(self.underlyingChannel)
|
|
|
+ self.completionQueue.shutdown()
|
|
|
+ self.activeCalls.forEach { $0.value?.cancel() }
|
|
|
}
|
|
|
- cgrpc_channel_destroy(self.underlyingChannel)
|
|
|
- self.completionQueue.shutdown()
|
|
|
+ }
|
|
|
+
|
|
|
+ deinit {
|
|
|
+ self.shutdown()
|
|
|
}
|
|
|
|
|
|
/// Constructs a Call object to make a gRPC API call
|
|
|
@@ -108,11 +129,21 @@ public class Channel {
|
|
|
/// - Parameter host: the gRPC host name for the call. If unspecified, defaults to the Client host
|
|
|
/// - Parameter timeout: a timeout value in seconds
|
|
|
/// - Returns: a Call object that can be used to perform the request
|
|
|
- public func makeCall(_ method: String, host: String = "", timeout: TimeInterval? = nil) -> Call {
|
|
|
- let host = host.isEmpty ? self.host : host
|
|
|
- let timeout = timeout ?? self.timeout
|
|
|
- let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
|
|
|
- return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
|
|
|
+ public func makeCall(_ method: String, host: String? = nil, timeout: TimeInterval? = nil) throws -> Call {
|
|
|
+ self.mutex.lock()
|
|
|
+ defer { self.mutex.unlock() }
|
|
|
+
|
|
|
+ guard !self.hasBeenShutdown else {
|
|
|
+ throw Error.alreadyShutdown
|
|
|
+ }
|
|
|
+
|
|
|
+ guard let underlyingCall = cgrpc_channel_create_call(
|
|
|
+ self.underlyingChannel, method, host ?? self.host, timeout ?? self.timeout)
|
|
|
+ else { throw Error.callCreationFailed }
|
|
|
+
|
|
|
+ let call = Call(underlyingCall: underlyingCall, owned: true, completionQueue: self.completionQueue)
|
|
|
+ self.activeCalls.append(WeakReference(value: call))
|
|
|
+ return call
|
|
|
}
|
|
|
|
|
|
/// Check the current connectivity state
|
|
|
@@ -139,4 +170,29 @@ public class Channel {
|
|
|
observer.addConnectivityObserver(callback: callback)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // MARK: - Private
|
|
|
+
|
|
|
+ private init(host: String, underlyingChannel: UnsafeMutableRawPointer) {
|
|
|
+ self.host = host
|
|
|
+ self.underlyingChannel = underlyingChannel
|
|
|
+ self.completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel),
|
|
|
+ name: "Client")
|
|
|
+
|
|
|
+ self.completionQueue.run()
|
|
|
+ self.scheduleActiveCallCleanUp()
|
|
|
+ }
|
|
|
+
|
|
|
+ private func scheduleActiveCallCleanUp() {
|
|
|
+ DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 10.0) { [weak self] in
|
|
|
+ self?.cleanUpActiveCalls()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private func cleanUpActiveCalls() {
|
|
|
+ self.mutex.synchronize {
|
|
|
+ self.activeCalls = self.activeCalls.filter { $0.value != nil }
|
|
|
+ }
|
|
|
+ self.scheduleActiveCallCleanUp()
|
|
|
+ }
|
|
|
}
|