Browse Source

Expose APIs to customise NIOTS bootstraps (#2223)

`swift-nio-transport-services` added support for customising the
`NWParameters` used in the underlying connections in
https://github.com/apple/swift-nio-transport-services/pull/230.

This PR adds API in `grpc-swift` v1 to allow users to customise the
bootstraps used when creating gRPC connections using NIOTS. This will
allow them to, for example, customise their `NWParameters` using the new
APIs in `swift-nio-transport-services`.
Gus Cairo 9 months ago
parent
commit
3bbd57ba37

+ 1 - 1
Package.swift

@@ -40,7 +40,7 @@ let packageDependencies: [Package.Dependency] = [
   ),
   .package(
     url: "https://github.com/apple/swift-nio-transport-services.git",
-    from: "1.15.0"
+    from: "1.24.0"
   ),
   .package(
     url: "https://github.com/apple/swift-nio-extras.git",

+ 19 - 0
Sources/GRPC/ClientConnection.swift

@@ -33,6 +33,10 @@ import Foundation
 import NIOSSL
 #endif
 
+#if canImport(Network)
+import Network
+#endif
+
 /// Provides a single, managed connection to a server which is guaranteed to always use the same
 /// `EventLoop`.
 ///
@@ -469,6 +473,21 @@ extension ClientConnection {
     @preconcurrency
     public var debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
 
+    #if canImport(Network)
+    /// A closure allowing to customise the `NWParameters` used when establishing a connection using `NIOTransportServices`.
+    @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
+    public var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
+      get {
+        self._nwParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
+      }
+      set {
+        self._nwParametersConfigurator = newValue
+      }
+    }
+
+    private var _nwParametersConfigurator: (any Sendable)?
+    #endif
+
     #if canImport(NIOSSL)
     /// Create a `Configuration` with some pre-defined defaults. Prefer using
     /// `ClientConnection.secure(group:)` to build a connection secured with TLS or

+ 65 - 0
Sources/GRPC/ConnectionManagerChannelProvider.swift

@@ -22,6 +22,10 @@ import NIOTransportServices
 import NIOSSL
 #endif
 
+#if canImport(Network)
+import Network
+#endif
+
 @usableFromInline
 internal protocol ConnectionManagerChannelProvider {
   /// Make an `EventLoopFuture<Channel>`.
@@ -72,6 +76,52 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider {
   @usableFromInline
   internal var debugChannelInitializer: Optional<(Channel) -> EventLoopFuture<Void>>
 
+  #if canImport(Network)
+  @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
+  @usableFromInline
+  internal var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
+    get {
+      self._nwParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
+    }
+    set {
+      self._nwParametersConfigurator = newValue
+    }
+  }
+
+  private var _nwParametersConfigurator: (any Sendable)?
+  #endif
+
+  #if canImport(Network)
+  @inlinable
+  @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
+  internal init(
+    connectionTarget: ConnectionTarget,
+    connectionKeepalive: ClientConnectionKeepalive,
+    connectionIdleTimeout: TimeAmount,
+    tlsMode: TLSMode,
+    tlsConfiguration: GRPCTLSConfiguration?,
+    httpTargetWindowSize: Int,
+    httpMaxFrameSize: Int,
+    errorDelegate: ClientErrorDelegate?,
+    debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?,
+    nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
+  ) {
+    self.init(
+      connectionTarget: connectionTarget,
+      connectionKeepalive: connectionKeepalive,
+      connectionIdleTimeout: connectionIdleTimeout,
+      tlsMode: tlsMode,
+      tlsConfiguration: tlsConfiguration,
+      httpTargetWindowSize: httpTargetWindowSize,
+      httpMaxFrameSize: httpMaxFrameSize,
+      errorDelegate: errorDelegate,
+      debugChannelInitializer: debugChannelInitializer
+    )
+
+    self.nwParametersConfigurator = nwParametersConfigurator
+  }
+  #endif
+
   @inlinable
   internal init(
     connectionTarget: ConnectionTarget,
@@ -133,6 +183,12 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider {
       errorDelegate: configuration.errorDelegate,
       debugChannelInitializer: configuration.debugChannelInitializer
     )
+
+    #if canImport(Network)
+    if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *) {
+      self.nwParametersConfigurator = configuration.nwParametersConfigurator
+    }
+    #endif
   }
 
   private var serverHostname: String? {
@@ -222,6 +278,15 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider {
       _ = bootstrap.connectTimeout(connectTimeout)
     }
 
+    #if canImport(Network)
+    if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *),
+      let configurator = self.nwParametersConfigurator,
+      let transportServicesBootstrap = bootstrap as? NIOTSConnectionBootstrap
+    {
+      _ = transportServicesBootstrap.configureNWParameters(configurator)
+    }
+    #endif
+
     return bootstrap.connect(to: self.connectionTarget)
   }
 }

+ 39 - 0
Sources/GRPC/ConnectionPool/GRPCChannelPool.swift

@@ -19,6 +19,10 @@ import NIOPosix
 
 import struct Foundation.UUID
 
+#if canImport(Network)
+import Network
+#endif
+
 public enum GRPCChannelPool {
   /// Make a new ``GRPCChannel`` on which calls may be made to gRPC services.
   ///
@@ -191,6 +195,12 @@ extension GRPCChannelPool {
         return SwiftLogNoOpLogHandler()
       }
     )
+
+    #if canImport(Network)
+    /// `TransportServices` related configuration. This will be ignored unless an appropriate event loop group
+    /// (e.g. `NIOTSEventLoopGroup`) is used.
+    public var transportServices: TransportServices = .defaults
+    #endif
   }
 }
 
@@ -299,6 +309,35 @@ extension GRPCChannelPool.Configuration {
   }
 }
 
+#if canImport(Network)
+extension GRPCChannelPool.Configuration {
+  public struct TransportServices: Sendable {
+    /// Default transport services configuration.
+    public static let defaults = Self()
+
+    @inlinable
+    public static func with(_ configure: (inout Self) -> Void) -> Self {
+      var configuration = Self.defaults
+      configure(&configuration)
+      return configuration
+    }
+
+    /// A closure allowing to customise the `NWParameters` used when establishing a connection using `NIOTransportServices`.
+    @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
+    public var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
+      get {
+        self._nwParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
+      }
+      set {
+        self._nwParametersConfigurator = newValue
+      }
+    }
+
+    private var _nwParametersConfigurator: (any Sendable)?
+  }
+}
+#endif  // canImport(Network)
+
 /// The ID of a connection in the connection pool.
 public struct GRPCConnectionID: Hashable, Sendable, CustomStringConvertible {
   private enum Value: Sendable, Hashable {

+ 31 - 1
Sources/GRPC/ConnectionPool/PooledChannel.swift

@@ -79,7 +79,36 @@ internal final class PooledChannel: GRPCChannel {
 
     self._scheme = scheme
 
-    let provider = DefaultChannelProvider(
+    let provider: DefaultChannelProvider
+    #if canImport(Network)
+    if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *) {
+      provider = DefaultChannelProvider(
+        connectionTarget: configuration.target,
+        connectionKeepalive: configuration.keepalive,
+        connectionIdleTimeout: configuration.idleTimeout,
+        tlsMode: tlsMode,
+        tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
+        httpTargetWindowSize: configuration.http2.targetWindowSize,
+        httpMaxFrameSize: configuration.http2.maxFrameSize,
+        errorDelegate: configuration.errorDelegate,
+        debugChannelInitializer: configuration.debugChannelInitializer,
+        nwParametersConfigurator: configuration.transportServices.nwParametersConfigurator
+      )
+    } else {
+      provider = DefaultChannelProvider(
+        connectionTarget: configuration.target,
+        connectionKeepalive: configuration.keepalive,
+        connectionIdleTimeout: configuration.idleTimeout,
+        tlsMode: tlsMode,
+        tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
+        httpTargetWindowSize: configuration.http2.targetWindowSize,
+        httpMaxFrameSize: configuration.http2.maxFrameSize,
+        errorDelegate: configuration.errorDelegate,
+        debugChannelInitializer: configuration.debugChannelInitializer
+      )
+    }
+    #else
+    provider = DefaultChannelProvider(
       connectionTarget: configuration.target,
       connectionKeepalive: configuration.keepalive,
       connectionIdleTimeout: configuration.idleTimeout,
@@ -90,6 +119,7 @@ internal final class PooledChannel: GRPCChannel {
       errorDelegate: configuration.errorDelegate,
       debugChannelInitializer: configuration.debugChannelInitializer
     )
+    #endif
 
     self._pool = PoolManager.makeInitializedPoolManager(
       using: configuration.eventLoopGroup,

+ 42 - 0
Sources/GRPC/Server.swift

@@ -127,6 +127,20 @@ public final class Server: @unchecked Sendable {
         _ = transportServicesBootstrap.tlsOptions(from: tlsConfiguration)
       }
     }
+
+    if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *),
+      let configurator = configuration.listenerNWParametersConfigurator,
+      let transportServicesBootstrap = bootstrap as? NIOTSListenerBootstrap
+    {
+      _ = transportServicesBootstrap.configureNWParameters(configurator)
+    }
+
+    if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *),
+      let configurator = configuration.childChannelNWParametersConfigurator,
+      let transportServicesBootstrap = bootstrap as? NIOTSListenerBootstrap
+    {
+      _ = transportServicesBootstrap.configureChildNWParameters(configurator)
+    }
     #endif  // canImport(Network)
 
     return
@@ -384,6 +398,34 @@ extension Server {
     /// the need to recalculate this dictionary each time we receive an rpc.
     internal var serviceProvidersByName: [Substring: CallHandlerProvider]
 
+    #if canImport(Network)
+    /// A closure allowing to customise the listener's `NWParameters` used when establishing a connection using `NIOTransportServices`.
+    @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
+    public var listenerNWParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
+      get {
+        self._listenerNWParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
+      }
+      set {
+        self._listenerNWParametersConfigurator = newValue
+      }
+    }
+
+    private var _listenerNWParametersConfigurator: (any Sendable)?
+
+    /// A closure allowing to customise the child channels' `NWParameters` used when establishing connections using `NIOTransportServices`.
+    @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
+    public var childChannelNWParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
+      get {
+        self._childChannelNWParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
+      }
+      set {
+        self._childChannelNWParametersConfigurator = newValue
+      }
+    }
+
+    private var _childChannelNWParametersConfigurator: (any Sendable)?
+    #endif
+
     /// CORS configuration for gRPC-Web support.
     public var webCORS = Configuration.CORS()
 

+ 52 - 0
Tests/GRPCTests/ConnectionManagerTests.swift

@@ -22,6 +22,12 @@ import XCTest
 
 @testable import GRPC
 
+#if canImport(Network)
+import NIOConcurrencyHelpers
+import NIOTransportServices
+import Network
+#endif
+
 class ConnectionManagerTests: GRPCTestCase {
   private let loop = EmbeddedEventLoop()
   private let recorder = RecordingConnectivityDelegate()
@@ -1412,6 +1418,52 @@ extension ConnectionManagerTests {
       XCTAssert(error is DoomedChannelError)
     }
   }
+
+  #if canImport(Network)
+  func testDefaultChannelProvider_NWParametersConfigurator() throws {
+    // For this test, we want an actual connection to be established, since otherwise the parameters
+    // configurator won't be run: NIOTS will only apply the parameters on the NWConnection at the
+    // point of activating it.
+
+    // Start a server
+    let serverConfig = Server.Configuration.default(
+      target: .hostAndPort("localhost", 0),
+      eventLoopGroup: NIOTSEventLoopGroup.singleton,
+      serviceProviders: []
+    )
+    let server = try Server.start(configuration: serverConfig).wait()
+    defer {
+      try? server.close().wait()
+    }
+
+    // Create a connection manager, and configure it to increase a counter in its NWParameters
+    // configurator closure.
+    let counter = NIOLockedValueBox(0)
+    let group = NIOTSEventLoopGroup.singleton
+    var configuration = ClientConnection.Configuration.default(
+      target: .socketAddress(server.channel.localAddress!),
+      eventLoopGroup: group
+    )
+    configuration.nwParametersConfigurator = { _ in
+      counter.withLockedValue { $0 += 1 }
+    }
+    let manager = ConnectionManager(
+      configuration: configuration,
+      connectivityDelegate: self.monitor,
+      idleBehavior: .closeWhenIdleTimeout,
+      logger: self.logger
+    )
+    defer {
+      try? manager.shutdown().wait()
+    }
+
+    // Wait for the connection to be established.
+    _ = try manager.getHTTP2Multiplexer().wait()
+
+    // At this point, the configurator must have been called.
+    XCTAssertEqual(1, counter.withLockedValue({ $0 }))
+  }
+  #endif
 }
 
 internal struct Change: Hashable, CustomStringConvertible {

+ 45 - 4
Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift

@@ -24,8 +24,13 @@ import NIOPosix
 import NIOSSL
 import XCTest
 
+#if canImport(Network)
+import Network
+import NIOTransportServices
+#endif
+
 final class GRPCChannelPoolTests: GRPCTestCase {
-  private var group: MultiThreadedEventLoopGroup!
+  private var group: (any EventLoopGroup)!
   private var server: Server?
   private var channel: GRPCChannel?
 
@@ -50,8 +55,26 @@ final class GRPCChannelPoolTests: GRPCTestCase {
     super.tearDown()
   }
 
-  private func configureEventLoopGroup(threads: Int = System.coreCount) {
-    self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
+  private enum TestEventLoopGroupType {
+    case multiThreadedEventLoopGroup
+    case transportServicesEventLoopGroup
+  }
+
+  private func configureEventLoopGroup(
+    threads: Int = System.coreCount,
+    eventLoopGroupType: TestEventLoopGroupType = .multiThreadedEventLoopGroup
+  ) {
+    switch eventLoopGroupType {
+    case .multiThreadedEventLoopGroup:
+      self.group = MultiThreadedEventLoopGroup(numberOfThreads: threads)
+
+    case .transportServicesEventLoopGroup:
+      #if canImport(Network)
+      self.group = NIOTSEventLoopGroup(loopCount: threads)
+      #else
+      fatalError("NIOTS is not available on this platform.")
+      #endif
+    }
   }
 
   private func makeServerBuilder(withTLS: Bool) -> Server.Builder {
@@ -108,9 +131,10 @@ final class GRPCChannelPoolTests: GRPCTestCase {
   private func setUpClientAndServer(
     withTLS tls: Bool,
     threads: Int = System.coreCount,
+    eventLoopGroupType: TestEventLoopGroupType = .multiThreadedEventLoopGroup,
     _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
   ) {
-    self.configureEventLoopGroup(threads: threads)
+    self.configureEventLoopGroup(threads: threads, eventLoopGroupType: eventLoopGroupType)
     self.startServer(withTLS: tls)
     self.startChannel(withTLS: tls) {
       // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't
@@ -618,5 +642,22 @@ final class GRPCChannelPoolTests: GRPCTestCase {
 
     XCTAssertGreaterThan(statsEvents.count, 0)
   }
+
+  #if canImport(Network)
+  func testNWParametersConfigurator() throws {
+    let counter = NIOLockedValueBox(0)
+    self.setUpClientAndServer(withTLS: false, eventLoopGroupType: .transportServicesEventLoopGroup)
+    { configuration in
+      configuration.transportServices.nwParametersConfigurator = { _ in
+        counter.withLockedValue { $0 += 1 }
+      }
+    }
+
+    // Execute an RPC to make sure a channel gets created/activated and the parameters configurator run.
+    try self.doTestUnaryRPCs(count: 1)
+
+    XCTAssertEqual(1, counter.withLockedValue({ $0 }))
+  }
+  #endif  // canImport(Network)
 }
 #endif  // canImport(NIOSSL)

+ 76 - 0
Tests/GRPCTests/ServerTests.swift

@@ -0,0 +1,76 @@
+/*
+ * Copyright 2025, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import EchoImplementation
+import EchoModel
+import GRPC
+import NIOConcurrencyHelpers
+import NIOTransportServices
+import XCTest
+
+#if canImport(Network)
+import Network
+#endif
+
+class ServerTests: GRPCTestCase {
+  #if canImport(Network)
+  func testParametersConfigurators() throws {
+    let listenerCounter = NIOLockedValueBox(0)
+    let childChannelsCounter = NIOLockedValueBox(0)
+    let group = NIOTSEventLoopGroup()
+    defer {
+      try? group.syncShutdownGracefully()
+    }
+
+    var serverConfiguration = Server.Configuration.default(
+      target: .hostAndPort("localhost", 0),
+      eventLoopGroup: group,
+      serviceProviders: []
+    )
+    serverConfiguration.listenerNWParametersConfigurator = { _ in
+      listenerCounter.withLockedValue { $0 += 1 }
+    }
+    serverConfiguration.childChannelNWParametersConfigurator = { _ in
+      childChannelsCounter.withLockedValue { $0 += 1 }
+    }
+
+    let server = try Server.start(configuration: serverConfiguration).wait()
+    defer {
+      try? server.close().wait()
+    }
+
+    // The listener channel should be up and running after starting the server
+    XCTAssertEqual(1, listenerCounter.withLockedValue({ $0 }))
+    // However we don't have any child channels set up as there are no active connections
+    XCTAssertEqual(0, childChannelsCounter.withLockedValue({ $0 }))
+
+    // Start a client and execute a request so that a connection is established.
+    let channel = try GRPCChannelPool.with(
+      target: .hostAndPort("localhost", server.channel.localAddress!.port!),
+      transportSecurity: .plaintext,
+      eventLoopGroup: group
+    )
+    defer {
+      try? channel.close().wait()
+    }
+    let echo = Echo_EchoNIOClient(channel: channel)
+    _ = try echo.get(.with { $0.text = "" }).status.wait()
+
+    // Now the configurator should have run.
+    XCTAssertEqual(1, childChannelsCounter.withLockedValue({ $0 }))
+  }
+  #endif
+}