Browse Source

Shift the responsibility for draining and destroying a completion queue to the queue itself. This is needed because it appears that otherwise, the underlying completion queue gets deallocated during its spinloop, which it doesn't like.

Daniel Alm 7 years ago
parent
commit
dc451e4984

+ 3 - 0
Sources/CgRPC/shim/cgrpc.h

@@ -110,6 +110,9 @@ void grpc_shutdown(void);
 const char *grpc_version_string(void);
 const char *grpc_g_stands_for(void);
 
+void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
+void grpc_completion_queue_destroy(cgrpc_completion_queue *cq);
+
 // helper
 void cgrpc_free_copied_string(char *string);
 

+ 0 - 2
Sources/CgRPC/shim/channel.c

@@ -71,8 +71,6 @@ void cgrpc_channel_destroy(cgrpc_channel *c) {
   c->channel = NULL;
 
   grpc_completion_queue_shutdown(c->completion_queue);
-  cgrpc_completion_queue_drain(c->completion_queue);
-  grpc_completion_queue_destroy(c->completion_queue);
   free(c);
 }
 

+ 0 - 2
Sources/CgRPC/shim/handler.c

@@ -33,8 +33,6 @@ cgrpc_handler *cgrpc_handler_create_with_server(cgrpc_server *server) {
 
 void cgrpc_handler_destroy(cgrpc_handler *h) {
   grpc_completion_queue_shutdown(h->completion_queue);
-  cgrpc_completion_queue_drain(h->completion_queue);
-  grpc_completion_queue_destroy(h->completion_queue);
   grpc_metadata_array_destroy(&(h->request_metadata_recv));
   grpc_call_details_destroy(&(h->call_details));
   if (h->server_call) {

+ 0 - 2
Sources/CgRPC/shim/server.c

@@ -77,8 +77,6 @@ void cgrpc_server_destroy(cgrpc_server *server) {
   server->server = NULL;
 
   grpc_completion_queue_shutdown(server->completion_queue);
-  cgrpc_completion_queue_drain(server->completion_queue);
-  grpc_completion_queue_destroy(server->completion_queue);
 }
 
 void cgrpc_server_start(cgrpc_server *server) {

+ 5 - 0
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -77,6 +77,11 @@ class CompletionQueue {
     self.underlyingCompletionQueue = underlyingCompletionQueue
     self.name = name
   }
+  
+  deinit {
+    cgrpc_completion_queue_drain(underlyingCompletionQueue)
+    grpc_completion_queue_destroy(underlyingCompletionQueue)
+  }
 
   /// Waits for an operation group to complete
   ///

+ 5 - 5
Sources/SwiftGRPC/Core/Server.swift

@@ -66,8 +66,7 @@ public class Server {
     cgrpc_server_start(underlyingServer)
     // run the server on a new background thread
     dispatchQueue.async {
-      var running = true
-      while running {
+      spinloop: while true {
         do {
           let handler = Handler(underlyingServer: self.underlyingServer)
           try handler.requestCall(tag: Server.handlerCallTag)
@@ -97,16 +96,17 @@ public class Server {
                 }
               }
             } else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
-              running = false // exit the loop
+              break spinloop
             }
           } else if event.type == .queueTimeout {
             // everything is fine
+            continue
           } else if event.type == .queueShutdown {
-            running = false
+            break spinloop
           }
         } catch {
           print("server call error: \(error)")
-          running = false
+          break spinloop
         }
       }
       self.onCompletion?()

+ 3 - 1
Tests/SwiftGRPCTests/EchoTests.swift

@@ -53,10 +53,12 @@ extension EchoTests {
   func testUnaryLotsOfRequests() {
     // No need to spam the log with 50k lines.
     server.shouldLogRequests = false
+    // Sending that many requests at once can sometimes trip things up, it seems.
+    client.timeout = 5.0
     let clockStart = clock()
     let numberOfRequests = 50_000
     for i in 0..<numberOfRequests {
-      if i % 1_000 == 0 {
+      if i % 1_000 == 0 && i > 0 {
         print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
       }
       XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text)

+ 78 - 49
Tests/SwiftGRPCTests/GRPCTests.swift

@@ -46,7 +46,8 @@ class gRPCTests: XCTestCase {
 
 let address = "localhost:8085"
 let host = "example.com"
-let clientText = "hello, server!"
+let evenClientText = "hello, server!"
+let oddClientText = "hello, server, please fail!"
 let serverText = "hello, client!"
 let initialClientMetadata =
   [
@@ -87,6 +88,10 @@ let helloBiDiStream = "/hello.bidi-stream"
 let oddStatusMessage = "OK"
 let evenStatusMessage = "some other status message"
 
+// Parsing very large messages as String is very inefficient,
+// so we avoid it anything above this threshold.
+let sizeThresholdForReturningDataVerbatim = 10_000
+
 func runTest(useSSL: Bool) {
   gRPC.initialize()
 
@@ -147,62 +152,78 @@ func runClient(useSSL: Bool) throws {
   }
 
   channel.host = host
+  let largeMessage = Data(repeating: 88 /* 'X' */, count: 4_000_000)
   for _ in 0..<10 {
     // Send several calls to each server we spin up, to ensure that each individual server can handle many requests.
     try callUnary(channel: channel)
     try callServerStream(channel: channel)
     try callBiDiStream(channel: channel)
   }
+  // Test sending a large message.
+  try callUnaryIndividual(channel: channel, message: largeMessage, shouldSucceed: true)
+  try callUnaryIndividual(channel: channel, message: largeMessage, shouldSucceed: true)
 }
 
 func callUnary(channel: Channel) throws {
-  let message = clientText.data(using: .utf8)
-
+  let evenMessage = evenClientText.data(using: .utf8)!
+  let oddMessage = oddClientText.data(using: .utf8)!
   for i in 0..<steps {
-    let sem = DispatchSemaphore(value: 0)
-    let method = hello
-    let call = channel.makeCall(method)
-    let metadata = Metadata(initialClientMetadata)
-    try call.start(.unary, metadata: metadata, message: message) {
-      response in
-      // verify the basic response from the server
-      XCTAssertEqual(response.statusCode, .ok)
-      XCTAssertEqual(response.statusMessage, (i % 2  == 0) ? evenStatusMessage : oddStatusMessage)
-
-      // verify the message from the server
-      if (i % 2) == 0 {
-        if let resultData = response.resultData {
+    try callUnaryIndividual(channel: channel,
+                            message: (i % 2) == 0 ? evenMessage : oddMessage,
+                            shouldSucceed: (i % 2) == 0)
+  }
+}
+
+func callUnaryIndividual(channel: Channel, message: Data, shouldSucceed: Bool) throws {
+  let sem = DispatchSemaphore(value: 0)
+  let method = hello
+  let call = channel.makeCall(method)
+  let metadata = Metadata(initialClientMetadata)
+  try call.start(.unary, metadata: metadata, message: message) {
+    response in
+    // verify the basic response from the server
+    XCTAssertEqual(response.statusCode, .ok)
+    XCTAssertEqual(response.statusMessage, shouldSucceed ? evenStatusMessage : oddStatusMessage)
+    
+    //print("response.resultData?.count", response.resultData?.count)
+    
+    // verify the message from the server
+    if shouldSucceed {
+      if let resultData = response.resultData {
+        if resultData.count >= sizeThresholdForReturningDataVerbatim {
+          XCTAssertEqual(message, resultData)
+        } else {
           let messageString = String(data: resultData, encoding: .utf8)
           XCTAssertEqual(messageString, serverText)
-        } else {
-          XCTFail("callUnary response missing")
         }
-      }
-      
-      // verify the initial metadata from the server
-      if let initialMetadata = response.initialMetadata {
-        verify_metadata(initialMetadata, expected: initialServerMetadata)
-      } else {
-        XCTFail("callUnary initial metadata missing")
-      }
-      
-      // verify the trailing metadata from the server
-      if let trailingMetadata = response.trailingMetadata {
-        verify_metadata(trailingMetadata, expected: trailingServerMetadata)
       } else {
-        XCTFail("callUnary trailing metadata missing")
+        XCTFail("callUnary response missing")
       }
-
-      // report completion
-      sem.signal()
     }
-    // wait for the call to complete
-    _ = sem.wait()
+    
+    // verify the initial metadata from the server
+    if let initialMetadata = response.initialMetadata {
+      verify_metadata(initialMetadata, expected: initialServerMetadata)
+    } else {
+      XCTFail("callUnary initial metadata missing")
+    }
+    
+    // verify the trailing metadata from the server
+    if let trailingMetadata = response.trailingMetadata {
+      verify_metadata(trailingMetadata, expected: trailingServerMetadata)
+    } else {
+      XCTFail("callUnary trailing metadata missing")
+    }
+    
+    // report completion
+    sem.signal()
   }
+  // wait for the call to complete
+  _ = sem.wait()
 }
 
 func callServerStream(channel: Channel) throws {
-  let message = clientText.data(using: .utf8)
+  let message = evenClientText.data(using: .utf8)
   let metadata = Metadata(initialClientMetadata)
 
   let sem = DispatchSemaphore(value: 0)
@@ -300,14 +321,13 @@ func callBiDiStream(channel: Channel) throws {
 }
 
 func runServer(server: Server) throws -> DispatchSemaphore {
-  var requestCount = 0
   let sem = DispatchSemaphore(value: 0)
   server.run { requestHandler in
     do {
       if let method = requestHandler.method {
         switch method {
         case hello:
-          try handleUnary(requestHandler: requestHandler, requestCount: requestCount)
+          try handleUnary(requestHandler: requestHandler)
         case helloServerStream:
           try handleServerStream(requestHandler: requestHandler)
         case helloBiDiStream:
@@ -316,8 +336,6 @@ func runServer(server: Server) throws -> DispatchSemaphore {
           XCTFail("Invalid method \(method)")
         }
       }
-
-      requestCount += 1
     } catch {
       XCTFail("error \(error)")
     }
@@ -330,33 +348,44 @@ func runServer(server: Server) throws -> DispatchSemaphore {
   return sem
 }
 
-func handleUnary(requestHandler: Handler, requestCount: Int) throws {
+func handleUnary(requestHandler: Handler) throws {
   XCTAssertEqual(requestHandler.host, host)
   XCTAssertEqual(requestHandler.method, hello)
   let initialMetadata = requestHandler.requestMetadata
   verify_metadata(initialMetadata, expected: initialClientMetadata)
   let initialMetadataToSend = Metadata(initialServerMetadata)
+  let receiveSem = DispatchSemaphore(value: 0)
+  var inputMessage: Data?
   try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
     if let messageData = $0 {
-      let messageString = String(data: messageData, encoding: .utf8)
-      XCTAssertEqual(messageString, clientText)
+      inputMessage = messageData
+      if messageData.count < sizeThresholdForReturningDataVerbatim {
+        let messageString = String(data: messageData, encoding: .utf8)!
+        XCTAssertTrue(messageString == evenClientText || messageString == oddClientText,
+                      "handleUnary unexpected message string \(messageString)")
+      }
     } else {
       XCTFail("handleUnary message missing")
     }
+    receiveSem.signal()
   }
+  receiveSem.wait()
 
   // We need to return status OK in both cases, as it seems like the server might never send out the last few messages
   // once it has been asked to send a non-OK status. Alternatively, we could send a non-OK status here, but then we
   // would need to sleep for a few milliseconds before sending the non-OK status.
-  let replyMessage = serverText.data(using: .utf8)!
-  if (requestCount % 2) == 0 {
-    let trailingMetadataToSend = Metadata(trailingServerMetadata)
+  let replyMessage = (inputMessage == nil || inputMessage!.count < sizeThresholdForReturningDataVerbatim)
+    ? serverText.data(using: .utf8)!
+    : inputMessage!
+  let trailingMetadataToSend = Metadata(trailingServerMetadata)
+  if let inputMessage = inputMessage,
+    inputMessage.count >= sizeThresholdForReturningDataVerbatim
+      || inputMessage == evenClientText.data(using: .utf8)! {
     try requestHandler.sendResponse(message: replyMessage,
                                     status: ServerStatus(code: .ok,
                                                          message: evenStatusMessage,
                                                          trailingMetadata: trailingMetadataToSend))
   } else {
-    let trailingMetadataToSend = Metadata(trailingServerMetadata)
     try requestHandler.sendStatus(ServerStatus(code: .ok,
                                                message: oddStatusMessage,
                                                trailingMetadata: trailingMetadataToSend))
@@ -373,7 +402,7 @@ func handleServerStream(requestHandler: Handler) throws {
   try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
     if let messageData = $0 {
       let messageString = String(data: messageData, encoding: .utf8)
-      XCTAssertEqual(messageString, clientText)
+      XCTAssertEqual(messageString, evenClientText)
     } else {
       XCTFail("handleServerStream message missing")
     }