2
0
Эх сурвалжийг харах

Add remote and local address metadata to loggers (#1195)

Motivation:

In some cases having the local and remote address for a connection in
log messages can make debugging easier.

Modifications:

- Add the local and remote address to logs in the idle handler, error
  handler and client transport
- Adopt `GRPCLogger` a little more widely to avoid having to explicitly
  set the `source`

Result:

More logging metadata
George Barnett 4 жил өмнө
parent
commit
9013e4e1ad

+ 9 - 9
.travis.yml

@@ -37,34 +37,34 @@ jobs:
       script: ./.travis-script.sh -t -a # tests with tsan, run allocation tests
       env:
         - SWIFT_VERSION=5.4
-        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests=513000
-        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request=225000
+        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests=515000
+        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request=227000
         - MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests=112000
         - MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request=67000
         - MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request=63000
-        - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong=214000
+        - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong=216000
     - <<: *tests
       name: "Unit Tests: Ubuntu 18.04 (Swift 5.3)"
       script: ./.travis-script.sh -t -a # test with tsan, run allocation tests
       env:
         - SWIFT_VERSION=5.3.3
-        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests=513000
-        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request=225000
+        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests=515000
+        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request=227000
         - MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests=112000
         - MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request=67000
         - MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request=63000
-        - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong=214000
+        - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong=216000
     - <<: *tests
       name: "Unit Tests: Ubuntu 18.04 (Swift 5.2)"
       script: ./.travis-script.sh -a # run allocation tests
       env:
         - SWIFT_VERSION=5.2.5
-        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests=524000
-        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request=227000
+        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests=526000
+        - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request=229000
         - MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests=112000
         - MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request=67000
         - MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request=63000
-        - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong=215000
+        - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong=217000
     - <<: *tests
       name: "Unit Tests: Xcode 12.2"
       os: osx

+ 9 - 4
Sources/GRPC/DelegatingErrorHandler.swift

@@ -21,18 +21,23 @@ import NIOSSL
 /// A channel handler which allows caught errors to be passed to a `ClientErrorDelegate`. This
 /// handler is intended to be used in the client channel pipeline after the HTTP/2 stream
 /// multiplexer to handle errors which occur on the underlying connection.
-class DelegatingErrorHandler: ChannelInboundHandler {
+internal final class DelegatingErrorHandler: ChannelInboundHandler {
   typealias InboundIn = Any
 
-  private let logger: Logger
+  private var logger: Logger
   private let delegate: ClientErrorDelegate?
 
-  init(logger: Logger, delegate: ClientErrorDelegate?) {
+  internal init(logger: Logger, delegate: ClientErrorDelegate?) {
     self.logger = logger
     self.delegate = delegate
   }
 
-  func errorCaught(context: ChannelHandlerContext, error: Error) {
+  internal func channelActive(context: ChannelHandlerContext) {
+    self.logger.addIPAddressMetadata(local: context.localAddress, remote: context.remoteAddress)
+    context.fireChannelActive()
+  }
+
+  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
     // We can ignore unclean shutdown since gRPC is self-terminated and therefore not prone to
     // truncation attacks.
     //

+ 2 - 2
Sources/GRPC/GRPCClientChannelHandler.swift

@@ -284,7 +284,7 @@ public enum GRPCCallType {
 /// }
 /// ```
 internal final class GRPCClientChannelHandler {
-  private let logger: Logger
+  private let logger: GRPCLogger
   private var stateMachine: GRPCClientStateMachine
 
   /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
@@ -292,7 +292,7 @@ internal final class GRPCClientChannelHandler {
   /// - Parameters:
   ///   - callType: Type of RPC call being made.
   ///   - logger: Logger.
-  internal init(callType: GRPCCallType, logger: Logger) {
+  internal init(callType: GRPCCallType, logger: GRPCLogger) {
     self.logger = logger
     switch callType {
     case .unary:

+ 5 - 5
Sources/GRPC/GRPCIdleHandler.swift

@@ -36,9 +36,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
   /// The mode we're operating in.
   private let mode: Mode
 
-  /// A logger.
-  private let logger: Logger
-
   private var context: ChannelHandlerContext?
 
   /// The mode of operation: the client tracks additional connection state in the connection
@@ -78,7 +75,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
       maximumPingsWithoutData: configuration.maximumPingsWithoutData,
       minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData
     )
-    self.logger = logger
   }
 
   init(
@@ -99,7 +95,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
       minimumReceivedPingIntervalWithoutData: configuration.minimumReceivedPingIntervalWithoutData,
       maximumPingStrikes: configuration.maximumPingStrikes
     )
-    self.logger = logger
   }
 
   private func sendGoAway(lastStreamID streamID: HTTP2StreamID) {
@@ -245,6 +240,11 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
   }
 
   func channelActive(context: ChannelHandlerContext) {
+    self.stateMachine.logger.addIPAddressMetadata(
+      local: context.localAddress,
+      remote: context.remoteAddress
+    )
+
     // No state machine action here.
     switch self.mode {
     case let .client(connectionManager, multiplexer):

+ 1 - 1
Sources/GRPC/GRPCIdleHandlerStateMachine.swift

@@ -240,7 +240,7 @@ struct GRPCIdleHandlerStateMachine {
   private var state: State
 
   /// A logger.
-  private let logger: Logger
+  internal var logger: Logger
 
   /// Create a new state machine.
   init(role: Role, logger: Logger) {

+ 42 - 0
Sources/GRPC/GRPCLogger.swift

@@ -14,10 +14,12 @@
  * limitations under the License.
  */
 import Logging
+import NIO
 
 /// Wraps `Logger` to always provide the source as "GRPC".
 ///
 /// See https://github.com/apple/swift-log/issues/145 for rationale.
+@usableFromInline
 internal struct GRPCLogger {
   private var logger: Logger
 
@@ -71,6 +73,46 @@ internal struct GRPCLogger {
       line: line
     )
   }
+
+  internal func notice(
+    _ message: @autoclosure () -> Logger.Message,
+    metadata: @autoclosure () -> Logger.Metadata? = nil,
+    file: String = #file,
+    function: String = #function,
+    line: UInt = #line
+  ) {
+    self.logger.notice(
+      message(),
+      metadata: metadata(),
+      source: "GRPC",
+      file: file,
+      function: function,
+      line: line
+    )
+  }
+
+  internal func warning(
+    _ message: @autoclosure () -> Logger.Message,
+    metadata: @autoclosure () -> Logger.Metadata? = nil,
+    file: String = #file,
+    function: String = #function,
+    line: UInt = #line
+  ) {
+    self.logger.warning(
+      message(),
+      metadata: metadata(),
+      source: "GRPC",
+      file: file,
+      function: function,
+      line: line
+    )
+  }
+}
+
+extension GRPCLogger {
+  internal mutating func addIPAddressMetadata(local: SocketAddress?, remote: SocketAddress?) {
+    self.logger.addIPAddressMetadata(local: local, remote: remote)
+  }
 }
 
 extension Logger {

+ 1 - 1
Sources/GRPC/Interceptor/ClientInterceptorContext.swift

@@ -36,7 +36,7 @@ public struct ClientInterceptorContext<Request, Response> {
 
   /// A logger.
   public var logger: Logger {
-    return self._pipeline.logger
+    return self._pipeline.logger.unwrapped
   }
 
   /// The type of the RPC, e.g. "unary".

+ 6 - 5
Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

@@ -60,9 +60,8 @@ import NIOHTTP2
 @usableFromInline
 internal final class ClientInterceptorPipeline<Request, Response> {
   /// A logger.
-  internal var logger: Logger {
-    return self.details.options.logger
-  }
+  @usableFromInline
+  internal var logger: GRPCLogger
 
   /// The `EventLoop` this RPC is being executed on.
   @usableFromInline
@@ -135,6 +134,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   internal init(
     eventLoop: EventLoop,
     details: CallDetails,
+    logger: GRPCLogger,
     interceptors: [ClientInterceptor<Request, Response>],
     errorDelegate: ClientErrorDelegate?,
     onError: @escaping (Error) -> Void,
@@ -144,6 +144,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ) {
     self.eventLoop = eventLoop
     self.details = details
+    self.logger = logger
 
     self._errorDelegate = errorDelegate
     self._onError = onError
@@ -284,13 +285,13 @@ internal final class ClientInterceptorPipeline<Request, Response> {
       unwrappedError = errorContext.error
       self._errorDelegate?.didCatchError(
         errorContext.error,
-        logger: self.logger,
+        logger: self.logger.unwrapped,
         file: errorContext.file,
         line: errorContext.line
       )
     } else {
       unwrappedError = error
-      self._errorDelegate?.didCatchErrorWithoutContext(error, logger: self.logger)
+      self._errorDelegate?.didCatchErrorWithoutContext(error, logger: self.logger.unwrapped)
     }
 
     // Emit the unwrapped error.

+ 14 - 16
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -68,9 +68,7 @@ internal final class ClientTransport<Request, Response> {
   internal let callDetails: CallDetails
 
   /// A logger.
-  internal var logger: Logger {
-    return self.callDetails.options.logger
-  }
+  internal var logger: GRPCLogger
 
   /// Is the call streaming requests?
   private var isStreamingRequests: Bool {
@@ -116,11 +114,14 @@ internal final class ClientTransport<Request, Response> {
   ) {
     self.callEventLoop = eventLoop
     self.callDetails = details
+    let logger = GRPCLogger(wrapping: details.options.logger)
+    self.logger = logger
     self.serializer = serializer
     self.deserializer = deserializer
     self._pipeline = ClientInterceptorPipeline(
       eventLoop: eventLoop,
       details: details,
+      logger: logger,
       interceptors: interceptors,
       errorDelegate: errorDelegate,
       onError: onError,
@@ -315,9 +316,12 @@ extension ClientTransport {
   /// On-loop implementation of `transportActivated(channel:)`.
   private func _transportActivated(channel: Channel) {
     self.callEventLoop.assertInEventLoop()
-    self.logger.debug("activated stream channel", source: "GRPC")
 
     if self.state.activate() {
+      self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
+
+      self._pipeline?.logger = self.logger
+      self.logger.debug("activated stream channel")
       self.channel = channel
       self.unbuffer()
     } else {
@@ -831,7 +835,7 @@ extension ClientTransport {
     self.logger.debug("buffering request part", metadata: [
       "request_part": "\(part.name)",
       "call_state": self.stateForLogging,
-    ], source: "GRPC")
+    ])
     self.writeBuffer.append(.init(request: part, promise: promise))
   }
 
@@ -848,7 +852,7 @@ extension ClientTransport {
 
     self.logger.debug("unbuffering request parts", metadata: [
       "request_parts": "\(self.writeBuffer.count)",
-    ], source: "GRPC")
+    ])
 
     // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
     // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
@@ -858,7 +862,7 @@ extension ClientTransport {
       while let write = self.writeBuffer.popFirst() {
         self.logger.debug("unbuffering request part", metadata: [
           "request_part": "\(write.request.name)",
-        ], source: "GRPC")
+        ])
 
         if !shouldFlush {
           shouldFlush = self.shouldFlush(after: write.request)
@@ -875,13 +879,9 @@ extension ClientTransport {
     }
 
     if self.writeBuffer.isEmpty {
-      self.logger.debug("request buffer drained", source: "GRPC")
+      self.logger.debug("request buffer drained")
     } else {
-      self.logger.notice(
-        "unbuffering aborted",
-        metadata: ["call_state": self.stateForLogging],
-        source: "GRPC"
-      )
+      self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
     }
 
     // We're unbuffered. What now?
@@ -896,9 +896,7 @@ extension ClientTransport {
   /// Fails any promises that come with buffered writes with `error`.
   /// - Parameter error: The `Error` to fail promises with.
   private func failBufferedWrites(with error: Error) {
-    self.logger.debug("failing buffered writes", metadata: [
-      "call_state": self.stateForLogging,
-    ], source: "GRPC")
+    self.logger.debug("failing buffered writes", metadata: ["call_state": self.stateForLogging])
 
     while let write = self.writeBuffer.popFirst() {
       write.promise?.fail(error)

+ 12 - 1
Sources/GRPC/Logger.swift

@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 import Logging
+import NIO
 
 /// Keys for `Logger` metadata.
 enum MetadataKey {
@@ -21,7 +22,6 @@ enum MetadataKey {
   static let connectionID = "grpc_connection_id"
 
   static let eventLoop = "event_loop"
-  static let remoteAddress = "remote_address"
 
   static let h2StreamID = "h2_stream_id"
   static let h2ActiveStreams = "h2_active_streams"
@@ -32,3 +32,14 @@ enum MetadataKey {
 
   static let error = "error"
 }
+
+extension Logger {
+  internal mutating func addIPAddressMetadata(local: SocketAddress?, remote: SocketAddress?) {
+    if let local = local?.ipAddress {
+      self[metadataKey: "grpc.conn.addr_local"] = "\(local)"
+    }
+    if let remote = remote?.ipAddress {
+      self[metadataKey: "grpc.conn.addr_remote"] = "\(remote)"
+    }
+  }
+}

+ 4 - 2
Sources/GRPC/Server.swift

@@ -107,8 +107,10 @@ public final class Server {
       .childChannelInitializer { channel in
         var configuration = configuration
         configuration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
-        configuration.logger[metadataKey: MetadataKey.remoteAddress] = channel.remoteAddress
-          .map { "\($0)" } ?? "n/a"
+        configuration.logger.addIPAddressMetadata(
+          local: channel.localAddress,
+          remote: channel.remoteAddress
+        )
 
         do {
           let sync = channel.pipeline.syncOperations

+ 1 - 1
Sources/GRPC/_EmbeddedThroughput.swift

@@ -28,7 +28,7 @@ extension EmbeddedChannel {
     responseType: Response.Type = Response.self
   ) -> EventLoopFuture<Void> {
     return self.pipeline.addHandlers([
-      GRPCClientChannelHandler(callType: callType, logger: logger),
+      GRPCClientChannelHandler(callType: callType, logger: GRPCLogger(wrapping: logger)),
       GRPCClientCodecHandler(
         serializer: ProtobufSerializer<Request>(),
         deserializer: ProtobufDeserializer<Response>()

+ 3 - 1
Tests/GRPCTests/ClientInterceptorPipelineTests.swift

@@ -38,9 +38,11 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     onRequestPart: @escaping (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void,
     onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
   ) -> ClientInterceptorPipeline<Request, Response> {
+    let callDetails = details ?? self.makeCallDetails()
     return ClientInterceptorPipeline(
       eventLoop: self.embeddedEventLoop,
-      details: details ?? self.makeCallDetails(),
+      details: callDetails,
+      logger: callDetails.options.logger.wrapped,
       interceptors: interceptors,
       errorDelegate: errorDelegate,
       onError: onError,

+ 1 - 1
Tests/GRPCTests/GRPCClientChannelHandlerTests.swift

@@ -35,7 +35,7 @@ class GRPCClientChannelHandlerTests: GRPCTestCase {
   func doTestDataFrameWithEndStream(dataContainsMessage: Bool) throws {
     let handler = GRPCClientChannelHandler(
       callType: .unary,
-      logger: self.clientLogger
+      logger: GRPCLogger(wrapping: self.clientLogger)
     )
 
     let channel = EmbeddedChannel(handler: handler)

+ 4 - 1
Tests/GRPCTests/GRPCStatusCodeTests.swift

@@ -29,7 +29,10 @@ class GRPCStatusCodeTests: GRPCTestCase {
   override func setUp() {
     super.setUp()
 
-    let handler = GRPCClientChannelHandler(callType: .unary, logger: self.logger)
+    let handler = GRPCClientChannelHandler(
+      callType: .unary,
+      logger: GRPCLogger(wrapping: self.logger)
+    )
     self.channel = EmbeddedChannel(handler: handler)
   }