Channel.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. #endif
  19. import Foundation
  20. /// Used to hold weak references to objects since `NSHashTable<T>.weakObjects()` isn't available on Linux.
  21. /// If/when this type becomes available on Linux, this should be replaced.
  22. private final class WeakReference<T: AnyObject> {
  23. private(set) weak var value: T?
  24. init(value: T) {
  25. self.value = value
  26. }
  27. }
  28. /// A gRPC Channel
  29. public class Channel {
  30. private let mutex = Mutex()
  31. /// Pointer to underlying C representation
  32. private let underlyingChannel: UnsafeMutableRawPointer
  33. /// Completion queue for channel call operations
  34. private let completionQueue: CompletionQueue
  35. /// Weak references to API calls using this channel that are in-flight
  36. private var activeCalls = [WeakReference<Call>]()
  37. /// Observer for connectivity state changes. Created lazily if needed
  38. private var connectivityObserver: ConnectivityObserver?
  39. /// Whether the gRPC channel has been shut down
  40. private var hasBeenShutdown = false
  41. /// Timeout for new calls
  42. public var timeout: TimeInterval = 600.0
  43. /// Default host to use for new calls
  44. public var host: String
  45. /// Errors that may be thrown by the channel
  46. enum Error: Swift.Error {
  47. /// Action cannot be performed because the channel has already been shut down
  48. case alreadyShutdown
  49. /// Failed to create a new call within the gRPC stack
  50. case callCreationFailed
  51. }
  52. /// Initializes a gRPC channel
  53. ///
  54. /// - Parameter address: the address of the server to be called
  55. /// - Parameter secure: if true, use TLS
  56. /// - Parameter arguments: list of channel configuration options
  57. public convenience init(address: String, secure: Bool = true, arguments: [Argument] = []) {
  58. gRPC.initialize()
  59. let argumentWrappers = arguments.map { $0.toCArg() }
  60. self.init(host: address, underlyingChannel: withExtendedLifetime(argumentWrappers) {
  61. var argumentValues = argumentWrappers.map { $0.wrapped }
  62. if secure {
  63. return cgrpc_channel_create_secure(address, kRootCertificates, nil, nil, &argumentValues, Int32(arguments.count))
  64. } else {
  65. return cgrpc_channel_create(address, &argumentValues, Int32(arguments.count))
  66. }
  67. })
  68. }
  69. /// Initializes a gRPC channel
  70. ///
  71. /// - Parameter address: the address of the server to be called
  72. /// - Parameter arguments: list of channel configuration options
  73. public convenience init(googleAddress: String, arguments: [Argument] = []) {
  74. gRPC.initialize()
  75. let argumentWrappers = arguments.map { $0.toCArg() }
  76. self.init(host: googleAddress, underlyingChannel: withExtendedLifetime(argumentWrappers) {
  77. var argumentValues = argumentWrappers.map { $0.wrapped }
  78. return cgrpc_channel_create_google(googleAddress, &argumentValues, Int32(arguments.count))
  79. })
  80. }
  81. /// Initializes a gRPC channel
  82. ///
  83. /// - Parameter address: the address of the server to be called
  84. /// - Parameter certificates: a PEM representation of certificates to use.
  85. /// - Parameter clientCertificates: a PEM representation of the client certificates to use
  86. /// - Parameter clientKey: a PEM representation of the client key to use
  87. /// - Parameter arguments: list of channel configuration options
  88. public convenience init(address: String, certificates: String = kRootCertificates, clientCertificates: String? = nil, clientKey: String? = nil, arguments: [Argument] = []) {
  89. gRPC.initialize()
  90. let argumentWrappers = arguments.map { $0.toCArg() }
  91. self.init(host: address, underlyingChannel: withExtendedLifetime(argumentWrappers) {
  92. var argumentValues = argumentWrappers.map { $0.wrapped }
  93. return cgrpc_channel_create_secure(address, certificates, clientCertificates, clientKey, &argumentValues, Int32(arguments.count))
  94. })
  95. }
  96. /// Shut down the channel. No new calls may be made using this channel after it is shut down. Any in-flight calls using this channel will be canceled
  97. public func shutdown() {
  98. self.mutex.synchronize {
  99. guard !self.hasBeenShutdown else { return }
  100. self.hasBeenShutdown = true
  101. self.connectivityObserver?.shutdown()
  102. cgrpc_channel_destroy(self.underlyingChannel)
  103. self.completionQueue.shutdown()
  104. self.activeCalls.forEach { $0.value?.cancel() }
  105. }
  106. }
  107. deinit {
  108. self.shutdown()
  109. }
  110. /// Constructs a Call object to make a gRPC API call
  111. ///
  112. /// - Parameter method: the gRPC method name for the call
  113. /// - Parameter host: the gRPC host name for the call. If unspecified, defaults to the Client host
  114. /// - Parameter timeout: a timeout value in seconds
  115. /// - Returns: a Call object that can be used to perform the request
  116. public func makeCall(_ method: String, host: String? = nil, timeout: TimeInterval? = nil) throws -> Call {
  117. self.mutex.lock()
  118. defer { self.mutex.unlock() }
  119. guard !self.hasBeenShutdown else {
  120. throw Error.alreadyShutdown
  121. }
  122. guard let underlyingCall = cgrpc_channel_create_call(
  123. self.underlyingChannel, method, host ?? self.host, timeout ?? self.timeout)
  124. else { throw Error.callCreationFailed }
  125. let call = Call(underlyingCall: underlyingCall, owned: true, completionQueue: self.completionQueue)
  126. self.activeCalls.append(WeakReference(value: call))
  127. return call
  128. }
  129. /// Check the current connectivity state
  130. ///
  131. /// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle
  132. /// - Returns: a ConnectivityState value representing the current connectivity state of the channel
  133. public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
  134. return ConnectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
  135. }
  136. /// Subscribe to connectivity state changes
  137. ///
  138. /// - Parameter callback: block executed every time a new connectivity state is detected
  139. public func addConnectivityObserver(callback: @escaping (ConnectivityState) -> Void) {
  140. self.mutex.synchronize {
  141. let observer: ConnectivityObserver
  142. if let existingObserver = self.connectivityObserver {
  143. observer = existingObserver
  144. } else {
  145. observer = ConnectivityObserver(underlyingChannel: self.underlyingChannel)
  146. self.connectivityObserver = observer
  147. }
  148. observer.addConnectivityObserver(callback: callback)
  149. }
  150. }
  151. // MARK: - Private
  152. private init(host: String, underlyingChannel: UnsafeMutableRawPointer) {
  153. self.host = host
  154. self.underlyingChannel = underlyingChannel
  155. self.completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel),
  156. name: "Client")
  157. self.completionQueue.run()
  158. self.scheduleActiveCallCleanUp()
  159. }
  160. private func scheduleActiveCallCleanUp() {
  161. DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 10.0) { [weak self] in
  162. self?.cleanUpActiveCalls()
  163. }
  164. }
  165. private func cleanUpActiveCalls() {
  166. self.mutex.synchronize {
  167. self.activeCalls = self.activeCalls.filter { $0.value != nil }
  168. }
  169. self.scheduleActiveCallCleanUp()
  170. }
  171. }