Browse Source

Echo app non-streaming server works with new API

Tim Burks 9 years ago
parent
commit
a9d8c631f3

+ 2 - 2
Examples/Echo/Swift/Echo/AppDelegate.swift

@@ -41,7 +41,7 @@ class AppDelegate: NSObject, NSApplicationDelegate {
 
   func applicationDidFinishLaunching(_ aNotification: Notification) {
     gRPC.initialize()
-    //startServer(address:"localhost:8081")
+    startServer(address:"localhost:8081")
   }
 
   func log(_ message: String) {
@@ -58,7 +58,7 @@ class AppDelegate: NSObject, NSApplicationDelegate {
       server.start()
 
       while(true) {
-        let (callError, completionType, requestHandler) = server.getNextRequest(timeout:1.0)
+        let (callError, completionType, requestHandler) = server.getNextRequest(timeout:600.0)
         if (callError != GRPC_CALL_OK) {
           self.log("Call error \(callError)")
           self.log("------------------------------")

+ 2 - 2
Examples/Echo/Swift/Echo/EchoViewController.swift

@@ -86,7 +86,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
 
     if (self.streamingButton.intValue == 0) {
       client = Client(address:address)
-      self.client!.completionQueue.run()
+      self.client!.completionQueue.run() {}
 
       DispatchQueue.global().async {
         // build the message
@@ -126,7 +126,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     else {
       if (!streaming) {
         client = Client(address:address)
-        self.client!.completionQueue.run()
+        self.client!.completionQueue.run() {}
 
         call = client?.createCall(host: "foo.test.google.fr",
                                   method: "/echo.Echo/Update",

+ 1 - 0
Packages/CgRPC/Sources/cgrpc.h

@@ -145,6 +145,7 @@ cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s);
 grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq,
                                                  double timeout);
 void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
+void cgrpc_completion_queue_shutdown(cgrpc_completion_queue *cq);
 
 // server request handlers
 cgrpc_handler *cgrpc_handler_create_with_server(cgrpc_server *server);

+ 5 - 0
Packages/CgRPC/Sources/completion_queue.c

@@ -49,3 +49,8 @@ void cgrpc_completion_queue_drain(grpc_completion_queue *cq) {
     ev = grpc_completion_queue_next(cq, cgrpc_deadline_in_seconds_from_now(5), NULL);
   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
 }
+
+void cgrpc_completion_queue_shutdown(cgrpc_completion_queue *cq) {
+  grpc_completion_queue_shutdown(cq);
+}
+

+ 1 - 0
Packages/gRPC/Sources/Client.swift

@@ -49,6 +49,7 @@ public class Client {
   public init(address: String) {
     c = cgrpc_client_create(address)
     completionQueue = CompletionQueue(cq:cgrpc_client_completion_queue(c))
+    completionQueue.name = "Client"
   }
 
   deinit {

+ 7 - 2
Packages/gRPC/Sources/CompletionQueue.swift

@@ -37,6 +37,8 @@
 /// A gRPC Completion Queue
 public class CompletionQueue {
 
+  var name : String!
+
   /// Pointer to underlying C representation
   var cq : UnsafeMutableRawPointer!
 
@@ -58,7 +60,7 @@ public class CompletionQueue {
     return cgrpc_completion_queue_get_next_event(cq, timeout);
   }
 
-  public func run() {
+  public func run(completion:@escaping () -> Void) {
     DispatchQueue.global().async {
       var running = true
       while (running) {
@@ -80,7 +82,10 @@ public class CompletionQueue {
           break
         }
       }
-      print("exiting completion queue loop")
+      DispatchQueue.main.async {
+        print("exiting completion queue loop " + self.name)
+        completion()
+      }
     }
   }
 }

+ 9 - 1
Packages/gRPC/Sources/Handler.swift

@@ -41,7 +41,7 @@ public class Handler {
   var h: UnsafeMutableRawPointer!
 
   /// Completion queue for handler response operations
-  var completionQueue: CompletionQueue
+  public var completionQueue: CompletionQueue
 
   /// Metadata received with the request
   public var requestMetadata: Metadata
@@ -53,6 +53,7 @@ public class Handler {
     self.h = h;
     self.requestMetadata = Metadata()
     self.completionQueue = CompletionQueue(cq:cgrpc_handler_get_completion_queue(h))
+    self.completionQueue.name = "Handler"
   }
 
   deinit {
@@ -141,8 +142,15 @@ public class Handler {
         operation_sendMessage])
     {(call_error) in
       print("server response complete")
+      print("finished with handler \(self)")
+      self.shutdown()
     }
     self.completionQueue.operationGroups[operations.tag] = operations
     _ = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
   }
+
+
+  func shutdown() {
+    cgrpc_completion_queue_shutdown(completionQueue.cq)
+  }
 }

+ 11 - 0
Packages/gRPC/Sources/Server.swift

@@ -43,12 +43,17 @@ public class Server {
   /// Completion queue used for server operations
   var completionQueue: CompletionQueue
 
+  /// Active handlers
+  var handlers : NSMutableSet!
+
   /// Initializes a Server
   ///
   /// - Parameter address: the address where the server will listen
   public init(address:String) {
     s = cgrpc_server_create(address)
     completionQueue = CompletionQueue(cq:cgrpc_server_get_completion_queue(s))
+    completionQueue.name = "Server " + address
+    handlers = NSMutableSet()
   }
 
   deinit {
@@ -71,6 +76,12 @@ public class Server {
     } else {
       let event = self.completionQueue.waitForCompletion(timeout:timeout)
       if (event.type == GRPC_OP_COMPLETE) {
+
+        handler.completionQueue.run() {
+          self.handlers.remove(handler)
+        }
+        self.handlers.add(handler)
+
         return (GRPC_CALL_OK, GRPC_OP_COMPLETE, handler)
       } else {
         return (GRPC_CALL_OK, event.type, nil)