Browse Source

Fix several completion queue leaks in tests; ensure that cancelling a session server-side always shuts down the corresponding completion queue.

Also add the completion queue's name to its thread label.
Daniel Alm 7 years ago
parent
commit
e00e1e8e80

+ 12 - 7
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -67,7 +67,9 @@ class CompletionQueue {
   /// Mutex for synchronizing access to operationGroups
   private let operationGroupsMutex: Mutex = Mutex()
   
-  private var hasBeenShutdown = false
+  private var _hasBeenShutdown = false
+  
+  public var hasBeenShutdown: Bool { return operationGroupsMutex.synchronize { _hasBeenShutdown } }
 
   /// Initializes a CompletionQueue
   ///
@@ -80,7 +82,7 @@ class CompletionQueue {
   
   deinit {
     operationGroupsMutex.synchronize {
-      hasBeenShutdown = true
+      _hasBeenShutdown = true
     }
     cgrpc_completion_queue_shutdown(underlyingCompletionQueue)
     cgrpc_completion_queue_drain(underlyingCompletionQueue)
@@ -101,7 +103,7 @@ class CompletionQueue {
   /// - Parameter operationGroup: the operation group to handle.
   func register(_ operationGroup: OperationGroup, onSuccess: () throws -> Void) throws {
     try operationGroupsMutex.synchronize {
-      guard !hasBeenShutdown
+      guard !_hasBeenShutdown
         else { throw CallError.completionQueueShutdown }
       operationGroups[operationGroup.tag] = operationGroup
       try onSuccess()
@@ -113,7 +115,11 @@ class CompletionQueue {
   /// - Parameter completion: a completion handler that is called when the queue stops running
   func runToCompletion(completion: (() -> Void)?) {
     // run the completion queue on a new background thread
-    let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
+    var threadLabel = "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread"
+    if let name = self.name {
+      threadLabel.append(" (\(name))")
+    }
+    let spinloopThreadQueue = DispatchQueue(label: threadLabel)
     spinloopThreadQueue.async {
       spinloop: while true {
         let event = cgrpc_completion_queue_get_next_event(self.underlyingCompletionQueue, 600)
@@ -147,7 +153,6 @@ class CompletionQueue {
         case GRPC_QUEUE_TIMEOUT:
           continue spinloop
         default:
-          print("CompletionQueue.runToCompletion error: unknown event type \(event.type)")
           break spinloop
         }
       }
@@ -165,9 +170,9 @@ class CompletionQueue {
   func shutdown() {
     var needsShutdown = false
     operationGroupsMutex.synchronize {
-      if !hasBeenShutdown {
+      if !_hasBeenShutdown {
         needsShutdown = true
-        hasBeenShutdown = true
+        _hasBeenShutdown = true
       }
     }
     if needsShutdown {

+ 1 - 0
Sources/SwiftGRPC/Runtime/ServerSession.swift

@@ -57,6 +57,7 @@ open class ServerSessionBase: ServerSession {
   
   public func cancel() {
     call.cancel()
+    handler.shutdown()
   }
   
   func sendInitialMetadataAndWait() throws {

+ 19 - 14
Sources/SwiftGRPC/Runtime/ServiceServer.swift

@@ -76,33 +76,38 @@ open class ServiceServer {
       
       do {
         do {
-          if let responseStatus = try strongSelf.handleMethod(unwrappedMethod, handler: handler) {
+          if let responseStatus = try strongSelf.handleMethod(unwrappedMethod, handler: handler),
+            !handler.completionQueue.hasBeenShutdown {
             // The handler wants us to send the status for them; do that.
             // But first, ensure that all outgoing messages have been enqueued, to avoid ending the stream prematurely:
             handler.call.messageQueueEmpty.wait()
             try handler.sendStatus(responseStatus)
           }
         } catch _ as HandleMethodError {
-          // The method is not implemented by the service - send a status saying so.
-          try handler.call.perform(OperationGroup(
-            call: handler.call,
-            operations: [
-              .sendInitialMetadata(Metadata()),
-              .receiveCloseOnServer,
-              .sendStatusFromServer(.unimplemented, "unknown method " + unwrappedMethod, Metadata())
-          ]) { _ in
-            handler.shutdown()
-          })
+          if !handler.completionQueue.hasBeenShutdown {
+            // The method is not implemented by the service - send a status saying so.
+            try handler.call.perform(OperationGroup(
+              call: handler.call,
+              operations: [
+                .sendInitialMetadata(Metadata()),
+                .receiveCloseOnServer,
+                .sendStatusFromServer(.unimplemented, "unknown method " + unwrappedMethod, Metadata())
+            ]) { _ in
+              handler.shutdown()
+            })
+          }
         }
       } catch {
         // The individual sessions' `run` methods (which are called by `self.handleMethod`) only throw errors if
         // they encountered an error that has not also been "seen" by the actual request handler implementation.
         // Therefore, this error is "really unexpected" and  should be logged here - there's nowhere else to log it otherwise.
-        print("ServiceServer error: \(error)")
+        print("ServiceServer unexpected error handling method '\(unwrappedMethod)': \(error)")
         do {
-          try handler.sendStatus((error as? ServerStatus) ?? .processingError)
+          if !handler.completionQueue.hasBeenShutdown {
+            try handler.sendStatus((error as? ServerStatus) ?? .processingError)
+          }
         } catch {
-          print("ServiceServer error sending status: \(error)")
+          print("ServiceServer unexpected error handling method '\(unwrappedMethod)'; sending status failed as well: \(error)")
           handler.shutdown()
         }
       }

+ 3 - 3
Tests/SwiftGRPCTests/ServerTimeoutTests.swift

@@ -26,17 +26,17 @@ fileprivate class TimingOutEchoProvider: Echo_EchoProvider {
   
   func expand(request: Echo_EchoRequest, session: Echo_EchoExpandSession) throws -> ServerStatus? {
     Thread.sleep(forTimeInterval: 0.2)
-    return nil
+    return .ok
   }
   
   func collect(session: Echo_EchoCollectSession) throws -> Echo_EchoResponse? {
     Thread.sleep(forTimeInterval: 0.2)
-    return nil
+    return Echo_EchoResponse()
   }
   
   func update(session: Echo_EchoUpdateSession) throws -> ServerStatus? {
     Thread.sleep(forTimeInterval: 0.2)
-    return nil
+    return .ok
   }
 }
 

+ 10 - 5
Tests/SwiftGRPCTests/ServiceClientTests.swift

@@ -18,25 +18,30 @@ import Foundation
 import XCTest
 
 class ServiceClientTests: BasicEchoTestCase {
-  private lazy var sharedChannel = Channel(address: address, secure: false)
+  private var sharedChannel: Channel?
 
   override func setUp() {
     super.setUp()
     sharedChannel = Channel(address: address, secure: false)
   }
+  
+  override func tearDown() {
+    sharedChannel = nil
+    super.tearDown()
+  }
 
   func testSharingChannelBetweenClientsUnaryAsync() {
     let firstCallExpectation = expectation(description: "First call completes successfully")
     let secondCallExpectation = expectation(description: "Second call completes successfully")
 
     do {
-      let client1 = Echo_EchoServiceClient(channel: sharedChannel)
+      let client1 = Echo_EchoServiceClient(channel: sharedChannel!)
       try _ = client1.get(Echo_EchoRequest(text: "foo")) { _, callResult in
         XCTAssertEqual(.ok, callResult.statusCode)
         firstCallExpectation.fulfill()
       }
 
-      let client2 = Echo_EchoServiceClient(channel: sharedChannel)
+      let client2 = Echo_EchoServiceClient(channel: sharedChannel!)
       try _ = client2.get(Echo_EchoRequest(text: "foo")) { _, callResult in
         XCTAssertEqual(.ok, callResult.statusCode)
         secondCallExpectation.fulfill()
@@ -50,7 +55,7 @@ class ServiceClientTests: BasicEchoTestCase {
 
   func testSharedChannelStillWorksAfterFirstUnaryClientCompletes() {
     do {
-      let client1 = Echo_EchoServiceClient(channel: sharedChannel)
+      let client1 = Echo_EchoServiceClient(channel: sharedChannel!)
       let response1 = try client1.get(Echo_EchoRequest(text: "foo")).text
       XCTAssertEqual("Swift echo get: foo", response1)
     } catch let error {
@@ -58,7 +63,7 @@ class ServiceClientTests: BasicEchoTestCase {
     }
 
     do {
-      let client2 = Echo_EchoServiceClient(channel: sharedChannel)
+      let client2 = Echo_EchoServiceClient(channel: sharedChannel!)
       let response2 = try client2.get(Echo_EchoRequest(text: "foo")).text
       XCTAssertEqual("Swift echo get: foo", response2)
     } catch let error {