瀏覽代碼

Fix an ObjC memory consumption increase as the app runs. (#914)

* Fix an ObjC memory consumption increase as the app runs.

This reflects https://github.com/grpc/grpc/pull/21798 in gRPC-ObjC.

* Fix spinloop handling.

* Fix `autoreleasepool` not being available on Linux.

* Fix object lifetime in ChannelArgumentTests.
Daniel Alm 5 年之前
父節點
當前提交
dd598a1c33

+ 47 - 31
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -19,6 +19,17 @@
 #endif
 import Foundation
 
+/// Execute the given closure and ensure we release all auto pools if needed.
+@inlinable
+internal func withAutoReleasePool<T>(_ execute: () throws -> T) rethrows -> T {
+  #if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
+  return try autoreleasepool {
+    try execute()
+  }
+  #else
+  return try execute()
+  #endif
+}
 /// A type indicating the kind of event returned by the completion queue
 enum CompletionType {
   case queueShutdown
@@ -121,39 +132,44 @@ class CompletionQueue {
     }
     let spinloopThreadQueue = DispatchQueue(label: threadLabel)
     spinloopThreadQueue.async {
-      spinloop: while true {
-        let event = cgrpc_completion_queue_get_next_event(self.underlyingCompletionQueue, 600)
-        switch event.type {
-        case GRPC_OP_COMPLETE:
-          let tag = Int(bitPattern:cgrpc_event_tag(event))
-          self.operationGroupsMutex.lock()
-          let operationGroup = self.operationGroups[tag]
-          self.operationGroupsMutex.unlock()
-          if let operationGroup = operationGroup {
-            // call the operation group completion handler
-            operationGroup.success = (event.success == 1)
-            operationGroup.completion?(operationGroup)
-            self.operationGroupsMutex.synchronize {
-              self.operationGroups[tag] = nil
+      var spinloopActive = true
+      while spinloopActive {
+        withAutoReleasePool {
+          let event = cgrpc_completion_queue_get_next_event(self.underlyingCompletionQueue, 600)
+          switch event.type {
+          case GRPC_OP_COMPLETE:
+            let tag = Int(bitPattern:cgrpc_event_tag(event))
+            self.operationGroupsMutex.lock()
+            let operationGroup = self.operationGroups[tag]
+            self.operationGroupsMutex.unlock()
+            if let operationGroup = operationGroup {
+              // call the operation group completion handler
+              operationGroup.success = (event.success == 1)
+              operationGroup.completion?(operationGroup)
+              self.operationGroupsMutex.synchronize {
+                self.operationGroups[tag] = nil
+              }
+            } else {
+              print("CompletionQueue.runToCompletion error: operation group with tag \(tag) not found")
             }
-          } else {
-            print("CompletionQueue.runToCompletion error: operation group with tag \(tag) not found")
-          }
-        case GRPC_QUEUE_SHUTDOWN:
-          self.operationGroupsMutex.lock()
-          let currentOperationGroups = self.operationGroups
-          self.operationGroups = [:]
-          self.operationGroupsMutex.unlock()
-          
-          for operationGroup in currentOperationGroups.values {
-            operationGroup.success = false
-            operationGroup.completion?(operationGroup)
+          case GRPC_QUEUE_SHUTDOWN:
+            self.operationGroupsMutex.lock()
+            let currentOperationGroups = self.operationGroups
+            self.operationGroups = [:]
+            self.operationGroupsMutex.unlock()
+
+            for operationGroup in currentOperationGroups.values {
+              operationGroup.success = false
+              operationGroup.completion?(operationGroup)
+            }
+            spinloopActive = false
+            return
+          case GRPC_QUEUE_TIMEOUT:
+            return
+          default:
+            spinloopActive = false
+            return
           }
-          break spinloop
-        case GRPC_QUEUE_TIMEOUT:
-          continue spinloop
-        default:
-          break spinloop
         }
       }
       // when the queue stops running, call the queue completion handler

+ 41 - 36
Sources/SwiftGRPC/Core/Server.swift

@@ -85,46 +85,51 @@ public class Server {
         var handler = Handler(underlyingServer: self.underlyingServer)
         // Tell gRPC to store the next call's information in this handler object.
         try handler.requestCall(tag: Server.handlerCallTag)
-        spinloop: while true {
-          // block while waiting for an incoming request
-          let event = self.completionQueue.wait(timeout: self.loopTimeout)
+        var spinloopActive = true
+        while spinloopActive {
+          try withAutoReleasePool {
+            // block while waiting for an incoming request
+            let event = self.completionQueue.wait(timeout: self.loopTimeout)
 
-          if event.type == .complete {
-            if event.tag == Server.handlerCallTag {
-              // run the handler and remove it when it finishes
-              if event.success != 0 {
-                // hold onto the handler while it runs
-                var strongHandlerReference: Handler?
-                strongHandlerReference = handler
-                // To prevent the "Variable 'strongHandlerReference' was written to, but never read" warning.
-                _ = strongHandlerReference
-                // this will start the completion queue on a new thread
-                handler.completionQueue.runToCompletion {
-                  // release the handler when it finishes
-                  strongHandlerReference = nil
-                }
-                
-                // Dispatch the handler function on a separate thread.
-                let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
-                // Needs to be copied, because we will change the value of `handler` right after this.
-                let handlerCopy = handler
-                handlerDispatchThreadQueue.async {
-                  handlerFunction(handlerCopy)
+            if event.type == .complete {
+              if event.tag == Server.handlerCallTag {
+                // run the handler and remove it when it finishes
+                if event.success != 0 {
+                  // hold onto the handler while it runs
+                  var strongHandlerReference: Handler?
+                  strongHandlerReference = handler
+                  // To prevent the "Variable 'strongHandlerReference' was written to, but never read" warning.
+                  _ = strongHandlerReference
+                  // this will start the completion queue on a new thread
+                  handler.completionQueue.runToCompletion {
+                    // release the handler when it finishes
+                    strongHandlerReference = nil
+                  }
+
+                  // Dispatch the handler function on a separate thread.
+                  let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
+                  // Needs to be copied, because we will change the value of `handler` right after this.
+                  let handlerCopy = handler
+                  handlerDispatchThreadQueue.async {
+                    handlerFunction(handlerCopy)
+                  }
                 }
-              }
 
-              // This handler has now been "used up" for the current call; replace it with a fresh one for the next
-              // loop iteration.
-              handler = Handler(underlyingServer: self.underlyingServer)
-              try handler.requestCall(tag: Server.handlerCallTag)
-            } else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
-              break spinloop
+                // This handler has now been "used up" for the current call; replace it with a fresh one for the next
+                // loop iteration.
+                handler = Handler(underlyingServer: self.underlyingServer)
+                try handler.requestCall(tag: Server.handlerCallTag)
+              } else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
+                spinloopActive = false
+                return
+              }
+            } else if event.type == .queueTimeout {
+              // Everything is fine, just start over *while continuing to use the existing handler*.
+              return
+            } else if event.type == .queueShutdown {
+              spinloopActive = false
+              return
             }
-          } else if event.type == .queueTimeout {
-            // Everything is fine, just start over *while continuing to use the existing handler*.
-            continue
-          } else if event.type == .queueShutdown {
-            break spinloop
           }
         }
       } catch {

+ 10 - 5
Tests/SwiftGRPCTests/ChannelArgumentTests.swift

@@ -62,27 +62,32 @@ class ChannelArgumentTests: BasicEchoTestCase {
 extension ChannelArgumentTests {
   func testArgumentKey() {
     let argument = Channel.Argument.defaultAuthority("default")
-    XCTAssertEqual(String(cString: argument.toCArg().wrapped.key), "grpc.default_authority")
+    let cArg = argument.toCArg()  // Needs to be stored in a variable to ensure it doesn't get deallocated too early.
+    XCTAssertEqual(String(cString: cArg.wrapped.key), "grpc.default_authority")
   }
 
   func testStringArgument() {
     let argument = Channel.Argument.primaryUserAgent("Primary/0.1")
-    XCTAssertEqual(String(cString: argument.toCArg().wrapped.value.string), "Primary/0.1")
+    let cArg = argument.toCArg()
+    XCTAssertEqual(String(cString: cArg.wrapped.value.string), "Primary/0.1")
   }
 
   func testIntegerArgument() {
     let argument = Channel.Argument.http2MaxPingsWithoutData(5)
-    XCTAssertEqual(argument.toCArg().wrapped.value.integer, 5)
+    let cArg = argument.toCArg()
+    XCTAssertEqual(cArg.wrapped.value.integer, 5)
   }
 
   func testBoolArgument() {
     let argument = Channel.Argument.keepAlivePermitWithoutCalls(true)
-    XCTAssertEqual(argument.toCArg().wrapped.value.integer, 1)
+    let cArg = argument.toCArg()
+    XCTAssertEqual(cArg.wrapped.value.integer, 1)
   }
 
   func testTimeIntervalArgument() {
     let argument = Channel.Argument.keepAliveTime(2.5)
-    XCTAssertEqual(argument.toCArg().wrapped.value.integer, 2500) // in ms
+    let cArg = argument.toCArg()
+    XCTAssertEqual(cArg.wrapped.value.integer, 2500) // in ms
   }
 }