ソースを参照

Add a NIOPosix based client transport (#1909)

Motivation:

We have all the pieces in place for a client transport, we can now
create one.

Modifications:

- Add a NIOPosix based client transport

Result:

Can create a NIOPosix based client transport
George Barnett 1 年間 前
コミット
706023bc9c

+ 0 - 8
Package.swift

@@ -335,13 +335,6 @@ extension Target {
     ]
   )
 
-  static let grpcHTTP2TransportNIOTransportServicesTests: Target = .testTarget(
-    name: "GRPCHTTP2TransportNIOTransportServicesTests",
-    dependencies: [
-      .grpcHTTP2TransportNIOTransportServices
-    ]
-  )
-
   static let grpcCodeGenTests: Target = .testTarget(
     name: "GRPCCodeGenTests",
     dependencies: [
@@ -749,7 +742,6 @@ let package = Package(
     .grpcCodeGenTests,
     .grpcInterceptorsTests,
     .grpcHTTP2CoreTests,
-    .grpcHTTP2TransportNIOTransportServicesTests,
     .grpcProtobufTests,
     .grpcProtobufCodeGenTests,
     .inProcessInteroperabilityTests

+ 4 - 7
Sources/GRPCHTTP2TransportNIOPosix/GRPCHTTP2TransportNIOPosix.swift

@@ -174,11 +174,11 @@ extension HTTP2ServerTransport.Posix {
 extension NIOCore.SocketAddress {
   fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
     if let ipv4 = socketAddress.ipv4 {
-      self = try Self(ipAddress: ipv4.host, port: ipv4.port)
+      self = try Self(ipv4)
     } else if let ipv6 = socketAddress.ipv6 {
-      self = try Self(ipAddress: ipv6.host, port: ipv6.port)
+      self = try Self(ipv6)
     } else if let unixDomainSocket = socketAddress.unixDomainSocket {
-      self = try Self(unixDomainSocketPath: unixDomainSocket.path)
+      self = try Self(unixDomainSocket)
     } else {
       throw RPCError(
         code: .internalError,
@@ -197,10 +197,7 @@ extension ServerBootstrap {
   ) async throws -> NIOAsyncChannel<Output, Never> {
     if let virtualSocket = address.virtualSocket {
       return try await self.bind(
-        to: VsockAddress(
-          cid: VsockAddress.ContextID(rawValue: virtualSocket.contextID.rawValue),
-          port: VsockAddress.Port(rawValue: virtualSocket.port.rawValue)
-        ),
+        to: VsockAddress(virtualSocket),
         childChannelInitializer: childChannelInitializer
       )
     } else {

+ 204 - 0
Sources/GRPCHTTP2TransportNIOPosix/HTTP2ClientTransport+Posix.swift

@@ -0,0 +1,204 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+@_spi(Package) import GRPCHTTP2Core
+import NIOCore
+import NIOPosix
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension HTTP2ClientTransport {
+  /// A `ClientTransport` using HTTP/2 built on top of `NIOPosix`.
+  ///
+  /// This transport builds on top of SwiftNIO's Posix networking layer and is suitable for use
+  /// on Linux and Darwin based platform (macOS, iOS, etc.) However, it's *strongly* recommended
+  /// that if you are targeting Darwin platforms then you should use the `NIOTS` variant of
+  /// the `HTTP2ClientTransport`.
+  ///
+  /// To use this transport you need to provide a 'target' to connect to which will be resolved
+  /// by an appropriate resolver from the resolver registry. By default the resolver registry can
+  /// resolve DNS targets, IPv4 and IPv6 targets, Unix domain socket targets, and Virtual Socket
+  /// targets. If you use a custom target you must also provide an appropriately configured
+  /// registry.
+  ///
+  /// You can control various aspects of connection creation, management and RPC behavior via the
+  /// ``Config``. Load balancing policies and other RPC specific behavior can be configured via
+  /// the ``ServiceConfig`` (if it isn't provided by a resolver).
+  ///
+  /// Beyond creating the transport you don't need to interact with it directly, instead, pass it
+  /// to a `GRPCClient`:
+  ///
+  /// ```swift
+  /// try await withThrowingDiscardingTaskGroup {
+  ///   let transport = try HTTP2ClientTransport.Posix(target: .dns(host: "example.com"))
+  ///   let client = GRPCClient(transport: transport)
+  ///   group.addTask {
+  ///     try await client.run()
+  ///   }
+  ///
+  ///   // ...
+  /// }
+  /// ```
+  public struct Posix: ClientTransport {
+    private let channel: GRPCChannel
+
+    /// Creates a new Posix based HTTP/2 client transport.
+    ///
+    /// - Parameters:
+    ///   - target: A target to resolve.
+    ///   - resolverRegistry: A registry of resolver factories.
+    ///   - config: Configuration for the transport.
+    ///   - serviceConfig: Service config controlling how the transport should establish and
+    ///       load-balance connections.
+    ///   - eventLoopGroup: The underlying NIO `EventLoopGroup` to run connections on. This must
+    ///       be a `MultiThreadedEventLoopGroup` or an `EventLoop` from
+    ///       a `MultiThreadedEventLoopGroup`.
+    /// - Throws: When no suitable resolver could be found for the `target`.
+    public init(
+      target: any ResolvableTarget,
+      resolverRegistry: NameResolverRegistry = .defaults,
+      config: Config = .defaults,
+      serviceConfig: ServiceConfig = ServiceConfig(),
+      eventLoopGroup: any EventLoopGroup = .singletonMultiThreadedEventLoopGroup
+    ) throws {
+      guard let resolver = resolverRegistry.makeResolver(for: target) else {
+        throw RuntimeError(
+          code: .transportError,
+          message: """
+            No suitable resolvers to resolve '\(target)'. You must make sure that the resolver \
+            registry has a suitable name resolver factory registered for the given target.
+            """
+        )
+      }
+
+      // Configure a connector.
+      self.channel = GRPCChannel(
+        resolver: resolver,
+        connector: Connector(eventLoopGroup: eventLoopGroup, config: config),
+        config: GRPCChannel.Config(posix: config),
+        defaultServiceConfig: serviceConfig
+      )
+    }
+
+    public var retryThrottle: RetryThrottle? {
+      self.channel.retryThrottle
+    }
+
+    public func connect() async {
+      await self.channel.connect()
+    }
+
+    public func configuration(forMethod descriptor: MethodDescriptor) -> MethodConfig? {
+      self.channel.configuration(forMethod: descriptor)
+    }
+
+    public func close() {
+      self.channel.close()
+    }
+
+    public func withStream<T>(
+      descriptor: MethodDescriptor,
+      options: CallOptions,
+      _ closure: (RPCStream<Inbound, Outbound>) async throws -> T
+    ) async throws -> T {
+      try await self.channel.withStream(descriptor: descriptor, options: options, closure)
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension HTTP2ClientTransport.Posix {
+  struct Connector: HTTP2Connector {
+    private let config: HTTP2ClientTransport.Posix.Config
+    private let eventLoopGroup: any EventLoopGroup
+
+    init(eventLoopGroup: any EventLoopGroup, config: HTTP2ClientTransport.Posix.Config) {
+      self.eventLoopGroup = eventLoopGroup
+      self.config = config
+    }
+
+    func establishConnection(
+      to address: GRPCHTTP2Core.SocketAddress
+    ) async throws -> HTTP2Connection {
+      let (channel, multiplexer) = try await ClientBootstrap(
+        group: self.eventLoopGroup
+      ).connect(to: address) { channel in
+        channel.eventLoop.makeCompletedFuture {
+          try channel.pipeline.syncOperations.configureGRPCClientPipeline(
+            channel: channel,
+            config: GRPCChannel.Config(posix: self.config)
+          )
+        }
+      }
+
+      return HTTP2Connection(channel: channel, multiplexer: multiplexer, isPlaintext: false)
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension HTTP2ClientTransport.Posix {
+  public struct Config: Sendable {
+    /// Configuration for HTTP/2 connections.
+    public var http2: HTTP2ClientTransport.Config.HTTP2
+
+    /// Configuration for backoff used when establishing a connection.
+    public var backoff: HTTP2ClientTransport.Config.Backoff
+
+    /// Configuration for connection management.
+    public var connection: HTTP2ClientTransport.Config.Connection
+
+    /// Compression configuration.
+    public var compression: HTTP2ClientTransport.Config.Compression
+
+    /// Creates a new connection configuration.
+    ///
+    /// See also ``defaults``.
+    public init(
+      http2: HTTP2ClientTransport.Config.HTTP2,
+      backoff: HTTP2ClientTransport.Config.Backoff,
+      connection: HTTP2ClientTransport.Config.Connection,
+      compression: HTTP2ClientTransport.Config.Compression
+    ) {
+      self.http2 = http2
+      self.connection = connection
+      self.backoff = backoff
+      self.compression = compression
+    }
+
+    /// Default values.
+    public static var defaults: Self {
+      Self(
+        http2: .defaults,
+        backoff: .defaults,
+        connection: .defaults,
+        compression: .defaults
+      )
+    }
+  }
+}
+
+@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
+extension GRPCChannel.Config {
+  init(posix: HTTP2ClientTransport.Posix.Config) {
+    self.init(
+      http2: posix.http2,
+      backoff: posix.backoff,
+      connection: posix.connection,
+      compression: posix.compression
+    )
+  }
+}

+ 69 - 0
Sources/GRPCHTTP2TransportNIOPosix/NIOClientBootstrap+SocketAddress.swift

@@ -0,0 +1,69 @@
+/*
+ * Copyright 2024, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import GRPCCore
+import GRPCHTTP2Core
+import NIOCore
+import NIOPosix
+
+extension ClientBootstrap {
+  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+  func connect<Result>(
+    to address: GRPCHTTP2Core.SocketAddress,
+    _ configure: @Sendable @escaping (Channel) -> EventLoopFuture<Result>
+  ) async throws -> Result {
+    if let ipv4 = address.ipv4 {
+      return try await self.connect(to: NIOCore.SocketAddress(ipv4), channelInitializer: configure)
+    } else if let ipv6 = address.ipv6 {
+      return try await self.connect(to: NIOCore.SocketAddress(ipv6), channelInitializer: configure)
+    } else if let uds = address.unixDomainSocket {
+      return try await self.connect(to: NIOCore.SocketAddress(uds), channelInitializer: configure)
+    } else if let vsock = address.virtualSocket {
+      return try await self.connect(to: VsockAddress(vsock), channelInitializer: configure)
+    } else {
+      throw RuntimeError(
+        code: .transportError,
+        message: """
+          Unhandled socket address '\(address)', this is a gRPC Swift bug. Please file an issue \
+          against the project.
+          """
+      )
+    }
+  }
+}
+
+extension NIOCore.SocketAddress {
+  init(_ address: GRPCHTTP2Core.SocketAddress.IPv4) throws {
+    try self.init(ipAddress: address.host, port: address.port)
+  }
+
+  init(_ address: GRPCHTTP2Core.SocketAddress.IPv6) throws {
+    try self.init(ipAddress: address.host, port: address.port)
+  }
+
+  init(_ address: GRPCHTTP2Core.SocketAddress.UnixDomainSocket) throws {
+    try self.init(unixDomainSocketPath: address.path)
+  }
+}
+
+extension NIOPosix.VsockAddress {
+  init(_ address: GRPCHTTP2Core.SocketAddress.VirtualSocket) {
+    self.init(
+      cid: ContextID(rawValue: address.contextID.rawValue),
+      port: Port(rawValue: address.port.rawValue)
+    )
+  }
+}

+ 0 - 17
Tests/GRPCHTTP2TransportNIOPosixTests/GRPCHTTP2TransportNIOPosixTests.swift

@@ -1,17 +0,0 @@
-/*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Add tests to this package.

+ 0 - 17
Tests/GRPCHTTP2TransportNIOTransportServicesTests/GRPCHTTP2TransportNIOTransportServicesTests.swift

@@ -1,17 +0,0 @@
-/*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Add tests to this package.