Channel.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  21. /// A gRPC Channel
  22. public class Channel {
  23. /// Pointer to underlying C representation
  24. private let underlyingChannel: UnsafeMutableRawPointer
  25. /// Completion queue for channel call operations
  26. private let completionQueue: CompletionQueue
  27. /// Timeout for new calls
  28. public var timeout: TimeInterval = 600.0
  29. /// Default host to use for new calls
  30. public var host: String
  31. /// Connectivity state observers
  32. private var observers: [ConnectivityObserver] = []
  33. /// Initializes a gRPC channel
  34. ///
  35. /// - Parameter address: the address of the server to be called
  36. /// - Parameter secure: if true, use TLS
  37. public init(address: String, secure: Bool = true) {
  38. host = address
  39. if secure {
  40. underlyingChannel = cgrpc_channel_create_secure(address, roots_pem(), nil)
  41. } else {
  42. underlyingChannel = cgrpc_channel_create(address)
  43. }
  44. completionQueue = CompletionQueue(
  45. underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
  46. completionQueue.run() // start a loop that watches the channel's completion queue
  47. }
  48. /// Initializes a gRPC channel
  49. ///
  50. /// - Parameter address: the address of the server to be called
  51. /// - Parameter certificates: a PEM representation of certificates to use
  52. /// - Parameter host: an optional hostname override
  53. public init(address: String, certificates: String, host: String?) {
  54. self.host = address
  55. underlyingChannel = cgrpc_channel_create_secure(address, certificates, host)
  56. completionQueue = CompletionQueue(
  57. underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
  58. completionQueue.run() // start a loop that watches the channel's completion queue
  59. }
  60. deinit {
  61. cgrpc_channel_destroy(underlyingChannel)
  62. completionQueue.shutdown()
  63. }
  64. /// Constructs a Call object to make a gRPC API call
  65. ///
  66. /// - Parameter method: the gRPC method name for the call
  67. /// - Parameter host: the gRPC host name for the call. If unspecified, defaults to the Client host
  68. /// - Parameter timeout: a timeout value in seconds
  69. /// - Returns: a Call object that can be used to perform the request
  70. public func makeCall(_ method: String, host: String = "") -> Call {
  71. let host = (host == "") ? self.host : host
  72. let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
  73. return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
  74. }
  75. public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
  76. return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
  77. }
  78. public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) {
  79. var observer = observers.first(where: { $0.state == sourceState })
  80. if observer == nil {
  81. let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect)
  82. observers.append(newObserver)
  83. observer = newObserver
  84. }
  85. observer?.callbacks.append(callback)
  86. observer?.polling = true
  87. }
  88. }
  89. private extension Channel {
  90. class ConnectivityObserver: Equatable {
  91. let state: ConnectivityState
  92. let queue: CompletionQueue
  93. let underlyingChannel: UnsafeMutableRawPointer
  94. let underlyingCompletionQueue: UnsafeMutableRawPointer
  95. private(set) var tryToConnect: Bool
  96. var callbacks: [(ConnectivityState) -> ()] = []
  97. private var lastState: ConnectivityState
  98. var polling: Bool = false {
  99. didSet {
  100. if polling == true && oldValue == false {
  101. run()
  102. }
  103. }
  104. }
  105. init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) {
  106. self.state = state
  107. self.underlyingChannel = underlyingChannel
  108. self.tryToConnect = tryToConnect
  109. self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
  110. self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
  111. self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
  112. }
  113. deinit {
  114. queue.shutdown()
  115. }
  116. private func run() {
  117. DispatchQueue.global().async { [weak self] in
  118. guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return }
  119. while self.polling {
  120. guard !self.callbacks.isEmpty && !self.tryToConnect else {
  121. self.polling = false
  122. break
  123. }
  124. defer { self.tryToConnect = false }
  125. let deadline: TimeInterval = 0.2
  126. cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
  127. let event = self.queue.wait(timeout: deadline)
  128. if event.success == 1 || self.tryToConnect {
  129. let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0))
  130. guard newState != self.lastState else { continue }
  131. defer { self.lastState = newState }
  132. if self.lastState == self.state {
  133. self.callbacks.forEach({ $0(newState) })
  134. }
  135. }
  136. }
  137. }
  138. }
  139. static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool {
  140. return lhs.state == rhs.state
  141. }
  142. }
  143. }
  144. extension Channel {
  145. public enum ConnectivityState {
  146. /// Channel has just been initialized
  147. case initialized
  148. /// Channel is idle
  149. case idle
  150. /// Channel is connecting
  151. case connecting
  152. /// Channel is ready for work
  153. case ready
  154. /// Channel has seen a failure but expects to recover
  155. case transientFailure
  156. /// Channel has seen a failure that it cannot recover from
  157. case shutdown
  158. /// Channel connectivity state is unknown
  159. case unknown
  160. fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
  161. switch value {
  162. case GRPC_CHANNEL_INIT:
  163. return .initialized
  164. case GRPC_CHANNEL_IDLE:
  165. return .idle
  166. case GRPC_CHANNEL_CONNECTING:
  167. return .connecting
  168. case GRPC_CHANNEL_READY:
  169. return .ready
  170. case GRPC_CHANNEL_TRANSIENT_FAILURE:
  171. return .transientFailure
  172. case GRPC_CHANNEL_SHUTDOWN:
  173. return .shutdown
  174. default:
  175. return .unknown
  176. }
  177. }
  178. fileprivate var underlyingState: grpc_connectivity_state? {
  179. switch self {
  180. case .initialized:
  181. return GRPC_CHANNEL_INIT
  182. case .idle:
  183. return GRPC_CHANNEL_IDLE
  184. case .connecting:
  185. return GRPC_CHANNEL_CONNECTING
  186. case .ready:
  187. return GRPC_CHANNEL_READY
  188. case .transientFailure:
  189. return GRPC_CHANNEL_TRANSIENT_FAILURE
  190. case .shutdown:
  191. return GRPC_CHANNEL_SHUTDOWN
  192. default:
  193. return nil
  194. }
  195. }
  196. }
  197. }