// Channel.swift

  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  /// A gRPC channel.
  ///
  /// Wraps the underlying C channel, runs a completion queue for the calls created
  /// on it, and lets clients observe connectivity state changes.
  public class Channel {
    /// Pointer to underlying C representation
    private let underlyingChannel: UnsafeMutableRawPointer

    /// Completion queue for channel call operations
    private let completionQueue: CompletionQueue

    /// Timeout for new calls, used when `makeCall` is not given an explicit timeout
    public var timeout: TimeInterval = 600.0

    /// Default host to use for new calls
    public var host: String

    /// Connectivity state observers created by `subscribe(callback:)`
    private var connectivityObservers: [ConnectivityObserver] = []

    /// Initializes a gRPC channel
    ///
    /// - Parameter address: the address of the server to be called
    /// - Parameter secure: if true, use TLS
    /// - Parameter arguments: list of channel configuration options
    public init(address: String, secure: Bool = true, arguments: [Argument] = []) {
      gRPC.initialize()
      host = address
      let argumentWrappers = arguments.map { $0.toCArg() }
      // The wrappers are otherwise last used when `argumentValues` is derived, so Swift
      // would be free to release them before the C call reads the argument array.
      // Presumably the wrappers own the C memory behind `wrapped` (TODO confirm); keep
      // them alive until channel creation has returned.
      underlyingChannel = withExtendedLifetime(argumentWrappers) { () -> UnsafeMutableRawPointer in
        var argumentValues = argumentWrappers.map { $0.wrapped }
        if secure {
          return cgrpc_channel_create_secure(address, roots_pem(), nil, nil, &argumentValues, Int32(arguments.count))
        } else {
          return cgrpc_channel_create(address, &argumentValues, Int32(arguments.count))
        }
      }
      completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
      completionQueue.run() // start a loop that watches the channel's completion queue
    }

    /// Initializes a gRPC channel using `cgrpc_channel_create_google`
    ///
    /// - Parameter googleAddress: the address of the server to be called
    /// - Parameter arguments: list of channel configuration options
    public init(googleAddress: String, arguments: [Argument] = []) {
      gRPC.initialize()
      host = googleAddress
      let argumentWrappers = arguments.map { $0.toCArg() }
      // Keep the wrappers alive across the C call (see `init(address:secure:arguments:)`).
      underlyingChannel = withExtendedLifetime(argumentWrappers) { () -> UnsafeMutableRawPointer in
        var argumentValues = argumentWrappers.map { $0.wrapped }
        return cgrpc_channel_create_google(googleAddress, &argumentValues, Int32(arguments.count))
      }
      completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
      completionQueue.run() // start a loop that watches the channel's completion queue
    }

    /// Initializes a secure gRPC channel with explicit certificates
    ///
    /// - Parameter address: the address of the server to be called
    /// - Parameter certificates: a PEM representation of certificates to use
    /// - Parameter clientCertificates: a PEM representation of the client certificates to use
    /// - Parameter clientKey: a PEM representation of the client key to use
    /// - Parameter arguments: list of channel configuration options
    public init(address: String, certificates: String, clientCertificates: String? = nil, clientKey: String? = nil, arguments: [Argument] = []) {
      gRPC.initialize()
      host = address
      let argumentWrappers = arguments.map { $0.toCArg() }
      // Keep the wrappers alive across the C call (see `init(address:secure:arguments:)`).
      underlyingChannel = withExtendedLifetime(argumentWrappers) { () -> UnsafeMutableRawPointer in
        var argumentValues = argumentWrappers.map { $0.wrapped }
        return cgrpc_channel_create_secure(address, certificates, clientCertificates, clientKey, &argumentValues, Int32(arguments.count))
      }
      completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
      completionQueue.run() // start a loop that watches the channel's completion queue
    }

    deinit {
      // Stop the observers before destroying the channel they watch.
      // NOTE(review): `ConnectivityObserver.shutdown()` does not wait for its background
      // loop to exit — confirm the loop cannot touch the channel after it is destroyed.
      connectivityObservers.forEach { $0.shutdown() }
      cgrpc_channel_destroy(underlyingChannel)
      completionQueue.shutdown()
    }

    /// Constructs a Call object to make a gRPC API call
    ///
    /// - Parameter method: the gRPC method name for the call
    /// - Parameter host: the gRPC host name for the call. If unspecified (empty), defaults to the channel's `host`
    /// - Parameter timeout: a timeout value in seconds. If nil, defaults to the channel's `timeout`
    /// - Returns: a Call object that can be used to perform the request
    public func makeCall(_ method: String, host: String = "", timeout: TimeInterval? = nil) -> Call {
      let resolvedHost = host.isEmpty ? self.host : host
      let resolvedTimeout = timeout ?? self.timeout
      // The force-unwrap is kept from the original: this method has no way to report a
      // failed call creation to its caller without changing its signature.
      let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, resolvedHost, resolvedTimeout)!
      return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
    }

    /// Check the current connectivity state
    ///
    /// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle
    /// - Returns: a ConnectivityState value representing the current connectivity state of the channel
    public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
      return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
    }

    /// Subscribe to connectivity state changes
    ///
    /// Each subscription creates its own observer, seeded with the channel's current state.
    ///
    /// - Parameter callback: block executed every time a new connectivity state is detected
    public func subscribe(callback: @escaping (ConnectivityState) -> Void) {
      connectivityObservers.append(ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback))
    }
  }
  private extension Channel {
    /// Watches a channel's connectivity state on a background dispatch queue and
    /// invokes a callback whenever the observed state differs from the last one seen.
    class ConnectivityObserver {
      /// Swift wrapper around `underlyingCompletionQueue`, used to wait for watch events
      private let completionQueue: CompletionQueue
      /// The C channel being watched (not owned by the observer)
      private let underlyingChannel: UnsafeMutableRawPointer
      /// The C completion queue created for this observer's watch requests
      private let underlyingCompletionQueue: UnsafeMutableRawPointer
      /// Block invoked with each newly detected state
      private let callback: (ConnectivityState) -> Void
      /// Most recently observed state; after `init` it is only touched by the polling loop
      private var lastState: ConnectivityState
      /// Set once `shutdown()` has been called; guarded by `stateMutex`
      private var hasBeenShutdown = false
      private let stateMutex: Mutex = Mutex()

      /// Creates the observer and immediately starts its polling loop.
      ///
      /// - Parameter underlyingChannel: the C channel to watch
      /// - Parameter currentState: the state to compare the first observation against
      /// - Parameter callback: block executed every time a new connectivity state is detected
      init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) {
        self.underlyingChannel = underlyingChannel
        self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
        self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
        self.callback = callback
        self.lastState = currentState
        run()
      }

      deinit {
        shutdown()
      }

      /// Starts the polling loop on a dedicated dispatch queue.
      ///
      /// The loop's closure captures `self` strongly, so the observer stays alive
      /// until the loop returns.
      private func run() {
        let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread")
        spinloopThreadQueue.async {
          while true {
            guard (self.stateMutex.synchronize{ !self.hasBeenShutdown }) else {
              return
            }
            // A state without a C counterpart (`.unknown`) cannot be watched; stop observing.
            guard let underlyingState = self.lastState.underlyingState else { return }

            // Ask for a notification when the state changes from `lastState`, then
            // wait on the observer's queue for at most the same deadline.
            let deadline: TimeInterval = 0.2
            cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
            let event = self.completionQueue.wait(timeout: deadline)

            // Re-check after the wait so no callback fires once shutdown has been requested.
            guard (self.stateMutex.synchronize{ !self.hasBeenShutdown }) else {
              return
            }

            switch event.type {
            case .complete:
              let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
              if newState != self.lastState {
                self.callback(newState)
              }
              self.lastState = newState
            case .queueTimeout:
              continue
            case .queueShutdown:
              return
            default:
              continue
            }
          }
        }
      }

      /// Stops the polling loop and shuts down the observer's completion queue.
      ///
      /// Safe to call more than once: `Channel.deinit` calls it explicitly and the
      /// observer's own `deinit` calls it again, so only the first call shuts the
      /// completion queue down.
      func shutdown() {
        let wasAlreadyShutdown = stateMutex.synchronize { () -> Bool in
          let previous = hasBeenShutdown
          hasBeenShutdown = true
          return previous
        }
        guard !wasAlreadyShutdown else { return }
        completionQueue.shutdown()
      }
    }
  }
  extension Channel {
    /// Connectivity states a channel can report.
    public enum ConnectivityState {
      /// Channel has just been initialized
      case initialized
      /// Channel is idle
      case idle
      /// Channel is connecting
      case connecting
      /// Channel is ready for work
      case ready
      /// Channel has seen a failure but expects to recover
      case transientFailure
      /// Channel has seen a failure that it cannot recover from
      case shutdown
      /// Channel connectivity state is unknown
      case unknown

      /// Maps a C connectivity state to its Swift case; unrecognized values become `.unknown`.
      fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
        switch value {
        case GRPC_CHANNEL_INIT: return .initialized
        case GRPC_CHANNEL_IDLE: return .idle
        case GRPC_CHANNEL_CONNECTING: return .connecting
        case GRPC_CHANNEL_READY: return .ready
        case GRPC_CHANNEL_TRANSIENT_FAILURE: return .transientFailure
        case GRPC_CHANNEL_SHUTDOWN: return .shutdown
        default: return .unknown
        }
      }

      /// The C connectivity state for this case, or nil for `.unknown`, which has no C counterpart.
      fileprivate var underlyingState: grpc_connectivity_state? {
        switch self {
        case .initialized: return GRPC_CHANNEL_INIT
        case .idle: return GRPC_CHANNEL_IDLE
        case .connecting: return GRPC_CHANNEL_CONNECTING
        case .ready: return GRPC_CHANNEL_READY
        case .transientFailure: return GRPC_CHANNEL_TRANSIENT_FAILURE
        case .shutdown: return GRPC_CHANNEL_SHUTDOWN
        case .unknown: return nil
        }
      }
    }
  }