Browse Source

Add remote peer info (#36)

Motivation:

The core package added remote peer info to the server context. We should
provide that info from connected channels.

Modification:

- Set the remote peer info
- Add tests

Result:

More insight into remote peer
George Barnett 1 year ago
parent
commit
6379b7109a

+ 44 - 3
Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift

@@ -191,6 +191,40 @@ package final class CommonHTTP2ServerTransport<
     }
   }
 
+  private func peerInfo(channel: any Channel) -> String {
+    guard let remote = channel.remoteAddress else {
+      return "<unknown>"
+    }
+
+    switch remote {
+    case .v4(let address):
+      // '!' is safe, v4 always has a port.
+      return "ipv4:\(address.host):\(remote.port!)"
+
+    case .v6(let address):
+      // '!' is safe, v6 always has a port.
+      return "ipv6:[\(address.host)]:\(remote.port!)"
+
+    case .unixDomainSocket:
+      // The pathname will be on the local address.
+      guard let local = channel.localAddress else {
+        // UDS but no local address; this shouldn't ever happen but at least note the transport
+        // as being UDS.
+        return "unix:<unknown>"
+      }
+
+      switch local {
+      case .unixDomainSocket:
+        // '!' is safe, UDS always has a path.
+        return "unix:\(local.pathname!)"
+
+      case .v4, .v6:
+        // Remote address is UDS but local isn't. This shouldn't ever happen.
+        return "unix:<unknown>"
+      }
+    }
+  }
+
   private func handleConnection(
     _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
     multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
@@ -199,6 +233,7 @@ package final class CommonHTTP2ServerTransport<
       _ context: ServerContext
     ) async -> Void
   ) async throws {
+    let peer = self.peerInfo(channel: connection.channel)
     try await connection.executeThenClose { inbound, _ in
       await withDiscardingTaskGroup { group in
         group.addTask {
@@ -213,7 +248,12 @@ package final class CommonHTTP2ServerTransport<
         do {
           for try await (stream, descriptor) in multiplexer.inbound {
             group.addTask {
-              await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
+              await self.handleStream(
+                stream,
+                handler: streamHandler,
+                descriptor: descriptor,
+                peer: peer
+              )
             }
           }
         } catch {
@@ -229,7 +269,8 @@ package final class CommonHTTP2ServerTransport<
       _ stream: RPCStream<Inbound, Outbound>,
       _ context: ServerContext
     ) async -> Void,
-    descriptor: EventLoopFuture<MethodDescriptor>
+    descriptor: EventLoopFuture<MethodDescriptor>,
+    peer: String
   ) async {
     // It's okay to ignore these errors:
     // - If we get an error because the http2Stream failed to close, then there's nothing we can do
@@ -267,7 +308,7 @@ package final class CommonHTTP2ServerTransport<
           )
         )
 
-        let context = ServerContext(descriptor: descriptor, cancellation: handle)
+        let context = ServerContext(descriptor: descriptor, peer: peer, cancellation: handle)
         await streamHandler(rpcStream, context)
       }
     }

+ 6 - 0
Sources/GRPCNIOTransportHTTP2Posix/HTTP2ServerTransport+Posix.swift

@@ -247,6 +247,12 @@ extension ServerBootstrap {
         to: VsockAddress(virtualSocket),
         childChannelInitializer: childChannelInitializer
       )
+    } else if let uds = address.unixDomainSocket {
+      return try await self.bind(
+        unixDomainSocketPath: uds.path,
+        cleanupExistingSocketFile: true,
+        childChannelInitializer: childChannelInitializer
+      )
     } else {
       return try await self.bind(
         to: NIOCore.SocketAddress(address),

+ 16 - 0
Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift

@@ -105,4 +105,20 @@ internal struct ControlClient {
       handler: body
     )
   }
+
+  internal func peerInfo<R>(
+    options: GRPCCore.CallOptions = .defaults,
+    _ body: @Sendable @escaping (
+      _ response: GRPCCore.ClientResponse<String>
+    ) async throws -> R = { try $0.message }
+  ) async throws -> R where R: Sendable {
+    try await self.client.unary(
+      request: ClientRequest(message: ""),
+      descriptor: MethodDescriptor(fullyQualifiedService: "Control", method: "PeerInfo"),
+      serializer: JSONSerializer(),
+      deserializer: JSONDeserializer(),
+      options: options,
+      handler: body
+    )
+  }
 }

+ 15 - 0
Tests/GRPCNIOTransportHTTP2Tests/ControlService.swift

@@ -62,6 +62,17 @@ struct ControlService: RegistrableRPCService {
         )
       }
     )
+    router.registerHandler(
+      forMethod: MethodDescriptor(fullyQualifiedService: "Control", method: "PeerInfo"),
+      deserializer: JSONDeserializer<String>(),
+      serializer: JSONSerializer<String>()
+    ) { request, context in
+      return StreamingServerResponse { response in
+        let info = try await self.peerInfo(context: context)
+        try await response.write(info)
+        return [:]
+      }
+    }
   }
 }
 
@@ -90,6 +101,10 @@ extension ControlService {
     }
   }
 
+  private func peerInfo(context: ServerContext) async throws -> String {
+    return context.peer
+  }
+
   private func handle(
     request: StreamingServerRequest<ControlInput>
   ) async throws -> StreamingServerResponse<ControlOutput> {

+ 36 - 3
Tests/GRPCNIOTransportHTTP2Tests/HTTP2TransportTests.swift

@@ -46,7 +46,8 @@ final class HTTP2TransportTests: XCTestCase {
   }
 
   func forEachTransportPair(
-    _ transport: [Transport] = .supported,
+    _ transport: [Transport] = [.init(server: .posix, client: .posix)],
+    serverAddress: SocketAddress = .ipv4(host: "127.0.0.1", port: 0),
     enableControlService: Bool = true,
     clientCompression: CompressionAlgorithm = .none,
     clientEnabledCompression: CompressionAlgorithmSet = .none,
@@ -57,6 +58,7 @@ final class HTTP2TransportTests: XCTestCase {
       try await withThrowingTaskGroup(of: Void.self) { group in
         let (server, address) = try await self.runServer(
           in: &group,
+          address: serverAddress,
           kind: pair.server,
           enableControlService: enableControlService,
           compression: serverCompression
@@ -134,6 +136,7 @@ final class HTTP2TransportTests: XCTestCase {
 
   private func runServer(
     in group: inout ThrowingTaskGroup<Void, any Error>,
+    address: SocketAddress,
     kind: Transport.Kind,
     enableControlService: Bool,
     compression: CompressionAlgorithmSet
@@ -144,7 +147,7 @@ final class HTTP2TransportTests: XCTestCase {
     case .posix:
       let server = GRPCServer(
         transport: .http2NIOPosix(
-          address: .ipv4(host: "127.0.0.1", port: 0),
+          address: address,
           config: .defaults(transportSecurity: .plaintext) {
             $0.compression.enabledAlgorithms = compression
           }
@@ -163,7 +166,7 @@ final class HTTP2TransportTests: XCTestCase {
       #if canImport(Network)
       let server = GRPCServer(
         transport: .http2NIOTS(
-          address: .ipv4(host: "127.0.0.1", port: 0),
+          address: address,
           config: .defaults(transportSecurity: .plaintext) {
             $0.compression.enabledAlgorithms = compression
           }
@@ -1614,6 +1617,36 @@ final class HTTP2TransportTests: XCTestCase {
       return "respect-my-authority"
     }
   }
+
+  func testPeerInfoIPv4() async throws {
+    try await self.forEachTransportPair(
+      serverAddress: .ipv4(host: "127.0.0.1", port: 0)
+    ) { control, _, _ in
+      let peerInfo = try await control.peerInfo()
+      let matches = peerInfo.matches(of: /ipv4:127.0.0.1:\d+/)
+      XCTAssertNotNil(matches)
+    }
+  }
+
+  func testPeerInfoIPv6() async throws {
+    try await self.forEachTransportPair(
+      serverAddress: .ipv6(host: "::1", port: 0)
+    ) { control, _, _ in
+      let peerInfo = try await control.peerInfo()
+      let matches = peerInfo.matches(of: /ipv6:[::1]:\d+/)
+      XCTAssertNotNil(matches)
+    }
+  }
+
+  func testPeerInfoUDS() async throws {
+    let path = "peer-info-uds"
+    try await self.forEachTransportPair(
+      serverAddress: .unixDomainSocket(path: path)
+    ) { control, _, _ in
+      let peerInfo = try await control.peerInfo()
+      XCTAssertEqual(peerInfo, "unix:peer-info-uds")
+    }
+  }
 }
 
 extension [HTTP2TransportTests.Transport] {