瀏覽代碼

Allow the connectivity state delegate queue to be specified (#849)

Motivation:

We create a queue on which the connectivity state delegate is called; we
should let users specify the queue to use.

Modifications:

- Allow the queue to be set in configuration / in the builder

Result:

Users can choose the DispatchQueue their connectivity state delegate is
executed on.
George Barnett 5 年之前
父節點
當前提交
7e7fdb183e

+ 8 - 0
Sources/GRPC/ClientConnection.swift

@@ -234,6 +234,10 @@ extension ClientConnection {
     /// A delegate which is called when the connectivity state is changed.
     public var connectivityStateDelegate: ConnectivityStateDelegate?
 
+    /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
+    /// provided but the queue is `nil` then one will be created by gRPC.
+    public var connectivityStateDelegateQueue: DispatchQueue?
+
     /// TLS configuration for this connection. `nil` if TLS is not desired.
     public var tls: TLS?
 
@@ -264,6 +268,8 @@ extension ClientConnection {
     /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
     ///     on debug builds.
     /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
+    /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
+    ///     `connectivityStateDelegate`.
     /// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
     /// - Parameter connectionBackoff: The connection backoff configuration to use.
     /// - Parameter messageEncoding: Message compression configuration, defaults to no compression.
@@ -273,6 +279,7 @@ extension ClientConnection {
       eventLoopGroup: EventLoopGroup,
       errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
       connectivityStateDelegate: ConnectivityStateDelegate? = nil,
+      connectivityStateDelegateQueue: DispatchQueue? = nil,
       tls: Configuration.TLS? = nil,
       connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
       connectionIdleTimeout: TimeAmount = .minutes(5),
@@ -282,6 +289,7 @@ extension ClientConnection {
       self.eventLoopGroup = eventLoopGroup
       self.errorDelegate = errorDelegate
       self.connectivityStateDelegate = connectivityStateDelegate
+      self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
       self.tls = tls
       self.connectionBackoff = connectionBackoff
       self.connectionIdleTimeout = connectionIdleTimeout

+ 4 - 1
Sources/GRPC/ConnectionManager.swift

@@ -212,7 +212,10 @@ internal class ConnectionManager {
     let eventLoop = configuration.eventLoopGroup.next()
     self.eventLoop = eventLoop
     self.state = .idle(IdleState(configuration: configuration))
-    self.monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
+    self.monitor = ConnectivityStateMonitor(
+      delegate: configuration.connectivityStateDelegate,
+      queue: configuration.connectivityStateDelegateQueue
+    )
 
     self.channelProvider = channelProvider
 

+ 3 - 2
Sources/GRPC/ConnectivityState.swift

@@ -66,9 +66,10 @@ public class ConnectivityStateMonitor {
   /// Creates a new connectivity state monitor.
   ///
   /// - Parameter delegate: A delegate to call when the connectivity state changes.
-  init(delegate: ConnectivityStateDelegate?) {
+  /// - Parameter queue: The `DispatchQueue` on which the delegate will be called.
+  init(delegate: ConnectivityStateDelegate?, queue: DispatchQueue?) {
     self._delegate = delegate
-    self.delegateCallbackQueue = DispatchQueue(label: "io.grpc.connectivity")
+    self.delegateCallbackQueue = queue ?? DispatchQueue(label: "io.grpc.connectivity")
   }
 
   /// The current state of connectivity.

+ 12 - 3
Sources/GRPC/GRPCChannel/GRPCChannelBuilder.swift

@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import Dispatch
 import NIO
 import NIOSSL
 
@@ -36,6 +37,7 @@ extension ClientConnection {
     private var connectionBackoffIsEnabled = true
     private var errorDelegate: ClientErrorDelegate?
     private var connectivityStateDelegate: ConnectivityStateDelegate?
+    private var connectivityStateDelegateQueue: DispatchQueue?
     private var connectionIdleTimeout: TimeAmount = .minutes(5)
     private var httpTargetWindowSize: Int = 65535
 
@@ -49,6 +51,7 @@ extension ClientConnection {
         eventLoopGroup: self.group,
         errorDelegate: self.errorDelegate,
         connectivityStateDelegate: self.connectivityStateDelegate,
+        connectivityStateDelegateQueue: self.connectivityStateDelegateQueue,
         tls: self.maybeTLS,
         connectionBackoff: self.connectionBackoffIsEnabled ? self.connectionBackoff : nil,
         connectionIdleTimeout: self.connectionIdleTimeout,
@@ -164,10 +167,16 @@ extension ClientConnection.Builder {
 }
 
 extension ClientConnection.Builder {
-  /// Sets the client connectivity state delegate.
-  @discardableResult
-  public func withConnectivityStateDelegate(_ delegate: ConnectivityStateDelegate?) -> Self {
+  /// Sets the client connectivity state delegate and the `DispatchQueue` on which the delegate
+  /// should be called. If no `queue` is provided then gRPC will create a `DispatchQueue` on which
+  /// to run the delegate.
+  @discardableResult
+  public func withConnectivityStateDelegate(
+    _ delegate: ConnectivityStateDelegate?,
+    executingOn queue: DispatchQueue? = nil
+  ) -> Self {
     self.connectivityStateDelegate = delegate
+    self.connectivityStateDelegateQueue = queue
     return self
   }
 }

+ 1 - 1
Tests/GRPCTests/ConnectivityStateMonitorTests.swift

@@ -33,7 +33,7 @@ class ConnectivityStateMonitorTests: GRPCTestCase {
       ])
     }
 
-    let monitor = ConnectivityStateMonitor(delegate: recorder)
+    let monitor = ConnectivityStateMonitor(delegate: recorder, queue: nil)
     monitor.delegate = recorder
 
     monitor.updateState(to: .connecting, logger: self.logger)