ChannelConnectivityObserver.swift 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  extension Channel {
    /// Provides an interface for observing the connectivity of a given channel.
    ///
    /// Creating an observer immediately starts a background loop (see `run()`) that repeatedly
    /// watches the channel's connectivity state and invokes every registered callback whenever the
    /// state differs from the previously observed one.
    ///
    /// The loop's closure references `self` without a capture list, so the observer is kept alive
    /// until the loop exits. Call `shutdown()` explicitly to stop observing.
    final class ConnectivityObserver {
      /// Guards `callbacks` and `hasBeenShutdown`, and serializes watch registration on the
      /// completion queue against `shutdown()`.
      private let mutex = Mutex()
      /// Wrapper around `underlyingCompletionQueue` used to wait for events and to shut the queue down.
      private let completionQueue: CompletionQueue
      /// The channel whose connectivity state is being observed.
      private let underlyingChannel: UnsafeMutableRawPointer
      /// Completion queue created by this observer; connectivity watches are registered on it.
      private let underlyingCompletionQueue: UnsafeMutableRawPointer
      /// Callbacks to invoke on every observed state change. Access only while holding `mutex`.
      private var callbacks = [(ConnectivityState) -> Void]()
      /// Set once by `shutdown()`. Access only while holding `mutex`.
      private var hasBeenShutdown = false

      /// Creates an observer for `underlyingChannel` and starts the observation loop.
      ///
      /// - Parameter underlyingChannel: Pointer to the channel to observe.
      init(underlyingChannel: UnsafeMutableRawPointer) {
        self.underlyingChannel = underlyingChannel
        self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
        self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue,
                                               name: "Connectivity State")
        self.run()
      }

      deinit {
        self.shutdown()
      }

      /// Registers a callback that is invoked with the new state each time the observation loop
      /// detects a connectivity state change.
      ///
      /// - Parameter callback: Closure called from the observer's background queue.
      func addConnectivityObserver(callback: @escaping (ConnectivityState) -> Void) {
        self.mutex.synchronize {
          self.callbacks.append(callback)
        }
      }

      /// Stops observing and shuts down the completion queue. Calling this more than once is a no-op.
      func shutdown() {
        self.mutex.synchronize {
          guard !self.hasBeenShutdown else { return }
          self.hasBeenShutdown = true
          self.completionQueue.shutdown()
        }
      }

      // MARK: - Private

      /// Starts the background loop that watches the channel and notifies the callbacks.
      ///
      /// The loop ends when the observer has been shut down, when the completion queue reports
      /// `.queueShutdown`, or when the last observed state has no `underlyingState`.
      private func run() {
        let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread")
        var lastState = ConnectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
        spinloopThreadQueue.async {
          while true {
            guard let underlyingState = lastState.underlyingState else { return }

            // The same value is used as the watch deadline and as the wait timeout below.
            let deadline: TimeInterval = 0.2

            // Check the shutdown flag and register the watch in one critical section. `shutdown()`
            // holds the same mutex while shutting the completion queue down, so a watch can never
            // be added to a queue that has already been shut down.
            let didRegisterWatch = self.mutex.synchronize { () -> Bool in
              guard !self.hasBeenShutdown else { return false }
              cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue,
                                                     underlyingState, deadline, nil)
              return true
            }
            guard didRegisterWatch else { return }

            let event = self.completionQueue.wait(timeout: deadline)

            // The observer may have been shut down while waiting; don't notify anybody in that case.
            guard (self.mutex.synchronize { !self.hasBeenShutdown }) else {
              return
            }

            switch event.type {
            case .complete:
              let newState = ConnectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
              guard newState != lastState else { continue }

              // Snapshot the callbacks under the lock, but invoke them outside of it so that a
              // callback may itself call back into this observer.
              let callbacks = self.mutex.synchronize { Array(self.callbacks) }
              lastState = newState
              callbacks.forEach { callback in callback(newState) }

            case .queueShutdown:
              return

            default:
              // Any other event type (presumably a wait timeout): watch again.
              continue
            }
          }
        }
      }
    }
  }