2
0
View source

Drop user context when closing client interceptor pipeline (#1279)

Motivation:

Interceptor pipelines hold arrays of interceptor contexts. Each context
maintains a reference back to the pipeline. However, the client
interceptor pipeline never broke this reference cycle when the pipeline
was closed, which means that using client interceptors introduces a
memory leak.

Modifications:

- Explicitly remove all user contexts when the client interceptor pipeline is
  torn down (the server pipeline already does this, so it is unaffected)
- Add allocation tests so we can see the overhead of using interceptors
  (and, more importantly, whether allocations are freed)

Result:

Using client interceptors no longer leaks.
George Barnett, 4 years ago
parent
commit
5ddbb6174a

+ 6 - 0
.github/workflows/ci.yaml

@@ -57,6 +57,8 @@ jobs:
               MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
               MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 204000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 211000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 211000
           - image: swift:5.3-focal
             env:
               MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 504000
@@ -65,6 +67,8 @@ jobs:
               MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
               MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 205000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 212000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 212000
           - image: swift:5.2-bionic
             env:
               MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 515000
@@ -73,6 +77,8 @@ jobs:
               MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
               MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
               MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 206000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 213000
+              MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 213000
     name: Performance Tests on ${{ matrix.image }}
     runs-on: ubuntu-latest
     container:

+ 88 - 2
Performance/allocations/tests/shared/Common.swift

@@ -19,10 +19,11 @@ import NIO
 func makeEchoServer(
   group: EventLoopGroup,
   host: String = "127.0.0.1",
-  port: Int = 0
+  port: Int = 0,
+  interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil
 ) -> EventLoopFuture<Server> {
   return Server.insecure(group: group)
-    .withServiceProviders([MinimalEchoProvider()])
+    .withServiceProviders([MinimalEchoProvider(interceptors: interceptors)])
     .bind(host: host, port: port)
 }
 
@@ -34,3 +35,88 @@ func makeClientConnection(
   return ClientConnection.insecure(group: group)
     .connect(host: host, port: port)
 }
+
+func makeEchoClientInterceptors(count: Int) -> Echo_EchoClientInterceptorFactoryProtocol? {
+  let factory = EchoClientInterceptors()
+  for _ in 0 ..< count {
+    factory.register { NoOpEchoClientInterceptor() }
+  }
+  return factory
+}
+
+func makeEchoServerInterceptors(count: Int) -> Echo_EchoServerInterceptorFactoryProtocol? {
+  let factory = EchoServerInterceptors()
+  for _ in 0 ..< count {
+    factory.register { NoOpEchoServerInterceptor() }
+  }
+  return factory
+}
+
+final class EchoClientInterceptors: Echo_EchoClientInterceptorFactoryProtocol {
+  internal typealias Factory = () -> ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>
+  private var factories: [Factory] = []
+
+  internal init(_ factories: Factory...) {
+    self.factories = factories
+  }
+
+  internal func register(_ factory: @escaping Factory) {
+    self.factories.append(factory)
+  }
+
+  private func makeInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.factories.map { $0() }
+  }
+
+  func makeGetInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+
+  func makeExpandInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+
+  func makeCollectInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+
+  func makeUpdateInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+}
+
+internal final class EchoServerInterceptors: Echo_EchoServerInterceptorFactoryProtocol {
+  internal typealias Factory = () -> ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
+  private var factories: [Factory] = []
+
+  internal init(_ factories: Factory...) {
+    self.factories = factories
+  }
+
+  internal func register(_ factory: @escaping Factory) {
+    self.factories.append(factory)
+  }
+
+  private func makeInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.factories.map { $0() }
+  }
+
+  func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+
+  func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+
+  func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+
+  func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
+    return self.makeInterceptors()
+  }
+}
+
+final class NoOpEchoClientInterceptor: ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse> {}
+final class NoOpEchoServerInterceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {}

+ 29 - 3
Performance/allocations/tests/test_unary_1k_ping_pong.swift

@@ -23,15 +23,31 @@ class UnaryPingPongBenchmark: Benchmark {
   private var group: EventLoopGroup!
   private var server: Server!
   private var client: ClientConnection!
+  private let clientInterceptors: Echo_EchoClientInterceptorFactoryProtocol?
+  private let serverInterceptors: Echo_EchoServerInterceptorFactoryProtocol?
 
-  init(rpcs: Int, request: String) {
+  init(
+    rpcs: Int,
+    request: String,
+    clientInterceptors: Int = 0,
+    serverInterceptors: Int = 0
+  ) {
     self.rpcs = rpcs
     self.request = .with { $0.text = request }
+    self.clientInterceptors = clientInterceptors > 0
+      ? makeEchoClientInterceptors(count: clientInterceptors)
+      : nil
+    self.serverInterceptors = serverInterceptors > 0
+      ? makeEchoServerInterceptors(count: serverInterceptors)
+      : nil
   }
 
   func setUp() throws {
     self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
-    self.server = try makeEchoServer(group: self.group).wait()
+    self.server = try makeEchoServer(
+      group: self.group,
+      interceptors: self.serverInterceptors
+    ).wait()
     self.client = makeClientConnection(
       group: self.group,
       port: self.server.channel.localAddress!.port!
@@ -45,7 +61,7 @@ class UnaryPingPongBenchmark: Benchmark {
   }
 
   func run() throws -> Int {
-    let echo = Echo_EchoClient(channel: self.client)
+    let echo = Echo_EchoClient(channel: self.client, interceptors: self.clientInterceptors)
     var responseLength = 0
 
     for _ in 0 ..< self.rpcs {
@@ -63,4 +79,14 @@ func run(identifier: String) {
     let benchmark = UnaryPingPongBenchmark(rpcs: 1000, request: "")
     return try! benchmark.runOnce()
   }
+
+  measure(identifier: identifier + "_interceptors_server") {
+    let benchmark = UnaryPingPongBenchmark(rpcs: 1000, request: "", serverInterceptors: 5)
+    return try! benchmark.runOnce()
+  }
+
+  measure(identifier: identifier + "_interceptors_client") {
+    let benchmark = UnaryPingPongBenchmark(rpcs: 1000, request: "", clientInterceptors: 5)
+    return try! benchmark.runOnce()
+  }
 }

+ 3 - 0
Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

@@ -443,6 +443,9 @@ extension ClientInterceptorPipeline {
     self._scheduledClose?.cancel()
     self._scheduledClose = nil
 
+    // Drop the contexts since they reference us.
+    self._userContexts.removeAll()
+
     // Cancel the transport.
     self._onCancel?(nil)
 

+ 5 - 1
Sources/GRPCPerformanceTests/Benchmarks/MinimalEchoProvider.swift

@@ -19,7 +19,11 @@ import NIO
 /// The echo provider that comes with the example does some string processing, we'll avoid some of
 /// that here so we're looking at the right things.
 public class MinimalEchoProvider: Echo_EchoProvider {
-  public let interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil
+  public let interceptors: Echo_EchoServerInterceptorFactoryProtocol?
+
+  public init(interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil) {
+    self.interceptors = interceptors
+  }
 
   public func get(
     request: Echo_EchoRequest,