Sfoglia il codice sorgente

Server interceptors retain cycle (#1559)

Motivation:

There's a code path through the ELF-based server handlers where the
interceptor pipeline does not get `nil`'d out to break the reference
cycle between the handler and the interceptors leading to a leak. This
can happen if the RPC ends unexpectedtly.

Modifications:

- Break the retain cycle on the next event-loop tick; this gives any
  clean up code a chance to run first.
- Break the retain cycle from the interceptors when they send end.

Results:

Fewer leaks.
George Barnett 2 anni fa
parent
commit
3f7c71413a

+ 3 - 0
Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift

@@ -137,6 +137,9 @@ public final class BidirectionalStreamingServerHandler<
     case let .creatingObserver(context),
          let .observing(_, context):
       context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
+      self.context.eventLoop.execute {
+        self.interceptors = nil
+      }
 
     case .completed:
       self.interceptors = nil

+ 3 - 0
Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift

@@ -138,6 +138,9 @@ public final class ClientStreamingServerHandler<
     case let .creatingObserver(context),
          let .observing(_, context):
       context.responsePromise.fail(GRPCStatus(code: .unavailable, message: nil))
+      self.context.eventLoop.execute {
+        self.interceptors = nil
+      }
 
     case .completed:
       self.interceptors = nil

+ 3 - 0
Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift

@@ -134,6 +134,9 @@ public final class ServerStreamingServerHandler<
     case let .createdContext(context),
          let .invokedFunction(context):
       context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
+      self.context.eventLoop.execute {
+        self.interceptors = nil
+      }
 
     case .completed:
       self.interceptors = nil

+ 3 - 0
Sources/GRPC/CallHandlers/UnaryServerHandler.swift

@@ -132,6 +132,9 @@ public final class UnaryServerHandler<
     case let .createdContext(context),
          let .invokedFunction(context):
       context.responsePromise.fail(GRPCStatus(code: .unavailable, message: nil))
+      self.context.eventLoop.execute {
+        self.interceptors = nil
+      }
 
     case .completed:
       self.interceptors = nil

+ 11 - 4
Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift

@@ -49,11 +49,14 @@ internal final class ServerInterceptorPipeline<Request, Response> {
 
   /// Called when a response part has traversed the interceptor pipeline.
   @usableFromInline
-  internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
+  internal var _onResponsePart: Optional<(
+    GRPCServerResponsePart<Response>,
+    EventLoopPromise<Void>?
+  ) -> Void>
 
   /// Called when a request part has traversed the interceptor pipeline.
   @usableFromInline
-  internal let _onRequestPart: (GRPCServerRequestPart<Request>) -> Void
+  internal var _onRequestPart: Optional<(GRPCServerRequestPart<Request>) -> Void>
 
   /// The index before the first user interceptor context index. (always -1).
   @usableFromInline
@@ -185,7 +188,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
       )
 
     case self._tailIndex:
-      self._onRequestPart(part)
+      self._onRequestPart?(part)
 
     default:
       self._userContexts[index].invokeReceive(part)
@@ -245,10 +248,11 @@ internal final class ServerInterceptorPipeline<Request, Response> {
   ) {
     switch index {
     case self._headIndex:
+      let onResponsePart = self._onResponsePart
       if part.isEnd {
         self.close()
       }
-      self._onResponsePart(part, promise)
+      onResponsePart?(part, promise)
 
     case self._tailIndex:
       // The next outbound index must exist: it will be the head or a user interceptor.
@@ -269,6 +273,9 @@ internal final class ServerInterceptorPipeline<Request, Response> {
     self._isOpen = false
     // Each context hold a ref to the pipeline; break the retain cycle.
     self._userContexts.removeAll()
+    // Drop the refs to the server handler.
+    self._onRequestPart = nil
+    self._onResponsePart = nil
   }
 }