| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- /*
- * Copyright 2016, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #if SWIFT_PACKAGE
- import CgRPC
- import Dispatch
- #endif
- import Foundation
- /// A gRPC Channel
- public class Channel {
- /// Pointer to underlying C representation
- private let underlyingChannel: UnsafeMutableRawPointer
- /// Completion queue for channel call operations
- private let completionQueue: CompletionQueue
- /// Timeout for new calls
- public var timeout: TimeInterval = 600.0
- /// Default host to use for new calls
- public var host: String
- /// Connectivity state observers
- private var connectivityObservers: [ConnectivityObserver] = []
- /// Initializes a gRPC channel
- ///
- /// - Parameter address: the address of the server to be called
- /// - Parameter secure: if true, use TLS
- /// - Parameter arguments: list of channel configuration options
- public init(address: String, secure: Bool = true, arguments: [Argument] = []) {
- gRPC.initialize()
- host = address
- let argumentWrappers = arguments.map { $0.toCArg() }
- underlyingChannel = withExtendedLifetime(argumentWrappers) {
- var argumentValues = argumentWrappers.map { $0.wrapped }
- if secure {
- return cgrpc_channel_create_secure(address, roots_pem(), nil, nil, &argumentValues, Int32(arguments.count))
- } else {
- return cgrpc_channel_create(address, &argumentValues, Int32(arguments.count))
- }
- }
- completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
- completionQueue.run() // start a loop that watches the channel's completion queue
- }
- /// Initializes a gRPC channel
- ///
- /// - Parameter address: the address of the server to be called
- /// - Parameter arguments: list of channel configuration options
- public init(googleAddress: String, arguments: [Argument] = []) {
- gRPC.initialize()
- host = googleAddress
- let argumentWrappers = arguments.map { $0.toCArg() }
-
- underlyingChannel = withExtendedLifetime(argumentWrappers) {
- var argumentValues = argumentWrappers.map { $0.wrapped }
- return cgrpc_channel_create_google(googleAddress, &argumentValues, Int32(arguments.count))
- }
- completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
- completionQueue.run() // start a loop that watches the channel's completion queue
- }
- /// Initializes a gRPC channel
- ///
- /// - Parameter address: the address of the server to be called
- /// - Parameter certificates: a PEM representation of certificates to use.
- /// - Parameter clientCertificates: a PEM representation of the client certificates to use
- /// - Parameter clientKey: a PEM representation of the client key to use
- /// - Parameter arguments: list of channel configuration options
- public init(address: String, certificates: String = roots_pem(), clientCertificates: String? = nil, clientKey: String? = nil, arguments: [Argument] = []) {
- gRPC.initialize()
- host = address
- let argumentWrappers = arguments.map { $0.toCArg() }
- underlyingChannel = withExtendedLifetime(argumentWrappers) {
- var argumentValues = argumentWrappers.map { $0.wrapped }
- return cgrpc_channel_create_secure(address, certificates, clientCertificates, clientKey, &argumentValues, Int32(arguments.count))
- }
- completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
- completionQueue.run() // start a loop that watches the channel's completion queue
- }
- deinit {
- connectivityObservers.forEach { $0.shutdown() }
- cgrpc_channel_destroy(underlyingChannel)
- completionQueue.shutdown()
- }
- /// Constructs a Call object to make a gRPC API call
- ///
- /// - Parameter method: the gRPC method name for the call
- /// - Parameter host: the gRPC host name for the call. If unspecified, defaults to the Client host
- /// - Parameter timeout: a timeout value in seconds
- /// - Returns: a Call object that can be used to perform the request
- public func makeCall(_ method: String, host: String = "", timeout: TimeInterval? = nil) -> Call {
- let host = (host == "") ? self.host : host
- let timeout = timeout ?? self.timeout
- let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
- return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
- }
- /// Check the current connectivity state
- ///
- /// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle
- /// - Returns: a ConnectivityState value representing the current connectivity state of the channel
- public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
- return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
- }
- /// Subscribe to connectivity state changes
- ///
- /// - Parameter callback: block executed every time a new connectivity state is detected
- public func subscribe(callback: @escaping (ConnectivityState) -> Void) {
- connectivityObservers.append(ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback))
- }
- }
- private extension Channel {
- class ConnectivityObserver {
- private let completionQueue: CompletionQueue
- private let underlyingChannel: UnsafeMutableRawPointer
- private let underlyingCompletionQueue: UnsafeMutableRawPointer
- private let callback: (ConnectivityState) -> Void
- private var lastState: ConnectivityState
- private var hasBeenShutdown = false
- private let stateMutex: Mutex = Mutex()
- init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) {
- self.underlyingChannel = underlyingChannel
- self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
- self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
- self.callback = callback
- self.lastState = currentState
- run()
- }
- deinit {
- shutdown()
- }
- private func run() {
- let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread")
- spinloopThreadQueue.async {
- while true {
- guard (self.stateMutex.synchronize{ !self.hasBeenShutdown }) else {
- return
- }
-
- guard let underlyingState = self.lastState.underlyingState else { return }
- let deadline: TimeInterval = 0.2
- cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
- let event = self.completionQueue.wait(timeout: deadline)
-
- guard (self.stateMutex.synchronize{ !self.hasBeenShutdown }) else {
- return
- }
- switch event.type {
- case .complete:
- let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
- if newState != self.lastState {
- self.callback(newState)
- }
- self.lastState = newState
- case .queueTimeout:
- continue
- case .queueShutdown:
- return
- default:
- continue
- }
- }
- }
- }
- func shutdown() {
- stateMutex.synchronize {
- hasBeenShutdown = true
- }
- completionQueue.shutdown()
- }
- }
- }
- extension Channel {
- public enum ConnectivityState {
- /// Channel has just been initialized
- case initialized
- /// Channel is idle
- case idle
- /// Channel is connecting
- case connecting
- /// Channel is ready for work
- case ready
- /// Channel has seen a failure but expects to recover
- case transientFailure
- /// Channel has seen a failure that it cannot recover from
- case shutdown
- /// Channel connectivity state is unknown
- case unknown
- fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
- switch value {
- case GRPC_CHANNEL_INIT:
- return .initialized
- case GRPC_CHANNEL_IDLE:
- return .idle
- case GRPC_CHANNEL_CONNECTING:
- return .connecting
- case GRPC_CHANNEL_READY:
- return .ready
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- return .transientFailure
- case GRPC_CHANNEL_SHUTDOWN:
- return .shutdown
- default:
- return .unknown
- }
- }
- fileprivate var underlyingState: grpc_connectivity_state? {
- switch self {
- case .initialized:
- return GRPC_CHANNEL_INIT
- case .idle:
- return GRPC_CHANNEL_IDLE
- case .connecting:
- return GRPC_CHANNEL_CONNECTING
- case .ready:
- return GRPC_CHANNEL_READY
- case .transientFailure:
- return GRPC_CHANNEL_TRANSIENT_FAILURE
- case .shutdown:
- return GRPC_CHANNEL_SHUTDOWN
- default:
- return nil
- }
- }
- }
- }
|