2
0
Эх сурвалжийг харах

Streaming client and server in the Echo example

Tim Burks 9 жил өмнө
parent
commit
ff5b40f486

+ 65 - 37
Examples/Echo/Swift/Echo/AppDelegate.swift

@@ -44,52 +44,80 @@ class AppDelegate: NSObject, NSApplicationDelegate {
     startServer(address:"localhost:8081")
   }
 
-  func log(_ message: String) {
-    print(message)
-  }
-
   func startServer(address:String) {
     let fileDescriptorSet = FileDescriptorSet(filename:"echo.out")
-    DispatchQueue.global().async {
-      self.log("Server Starting")
-      self.log("GRPC version " + gRPC.version())
+    print("Server Starting")
+    print("GRPC version " + gRPC.version())
 
-      let server = gRPC.Server(address:address)
-      server.start()
+    let server = gRPC.Server(address:address)
 
-      while(true) {
-        let (callError, completionType, requestHandler) = server.getNextRequest(timeout:600.0)
-        if (callError != GRPC_CALL_OK) {
-          self.log("Call error \(callError)")
-          self.log("------------------------------")
-        } else if (completionType == GRPC_OP_COMPLETE) {
-          if let requestHandler = requestHandler {
-            self.log("Received request to " + requestHandler.host()
-              + " calling " + requestHandler.method()
-              + " from " + requestHandler.caller())
+    server.run {(requestHandler) in
+      print("Received request to " + requestHandler.host()
+        + " calling " + requestHandler.method()
+        + " from " + requestHandler.caller())
 
-            requestHandler.receiveMessage(initialMetadata:Metadata())
-            {(requestBuffer) in
-              if let requestBuffer = requestBuffer,
-                let requestMessage = fileDescriptorSet.readMessage(name:"EchoRequest",
-                                                                   proto:requestBuffer.data()) {
-                requestMessage.forOneField(name:"text") {(field) in
-                  let replyMessage = fileDescriptorSet.createMessage(name:"EchoResponse")!
-                  let text = "echo " + field.string()
-                  replyMessage.addField(name:"text", value:text)
-                  requestHandler.sendResponse(message:ByteBuffer(data:replyMessage.serialize()),
-                                              trailingMetadata:Metadata())
-                }
-              }
+      if (requestHandler.method() == "/echo.Echo/Get") {
+        requestHandler.receiveMessage(initialMetadata:Metadata())
+        {(requestBuffer) in
+          if let requestBuffer = requestBuffer,
+            let requestMessage =
+            fileDescriptorSet.readMessage(name:"EchoRequest",
+                                          proto:requestBuffer.data()) {
+            requestMessage.forOneField(name:"text") {(field) in
+              let replyMessage = fileDescriptorSet.createMessage(name:"EchoResponse")!
+              let text = "echo " + field.string()
+              replyMessage.addField(name:"text", value:text)
+              requestHandler.sendResponse(message:ByteBuffer(data:replyMessage.serialize()),
+                                          trailingMetadata:Metadata())
             }
           }
-        } else if (completionType == GRPC_QUEUE_TIMEOUT) {
-          // everything is fine
-        } else if (completionType == GRPC_QUEUE_SHUTDOWN) {
-          // we should stop
         }
       }
+
+      if (requestHandler.method() == "/echo.Echo/Update") {
+        requestHandler.sendMetadata(
+          initialMetadata: Metadata(),
+          completion: {
+            self.handleMessage(
+              fileDescriptorSet: fileDescriptorSet,
+              requestHandler: requestHandler)
+            requestHandler.receiveClose() {
+              requestHandler.sendStatus(trailingMetadata: Metadata(), completion: {
+                print("status sent")
+                requestHandler.shutdown()
+              })
+            }
+          }
+        )
+
+      }
     }
   }
-}
 
+  func handleMessage(fileDescriptorSet: FileDescriptorSet,
+                     requestHandler: Handler) {
+    requestHandler.receiveMessage()
+      {(requestBuffer) in
+        if let requestBuffer = requestBuffer,
+          let requestMessage =
+          fileDescriptorSet.readMessage(name:"EchoRequest",
+                                        proto:requestBuffer.data()) {
+          requestMessage.forOneField(name:"text") {(field) in
+            let replyMessage = fileDescriptorSet.createMessage(name:"EchoResponse")!
+            let text = "echo " + field.string()
+            replyMessage.addField(name:"text", value:text)
+            requestHandler.sendResponse(
+            message:ByteBuffer(data:replyMessage.serialize())) {
+              self.handleMessage(fileDescriptorSet:fileDescriptorSet, requestHandler:requestHandler)
+            }
+          }
+        } else {
+          // an empty message means close the connection
+          requestHandler.sendStatus(trailingMetadata: Metadata(), completion: {
+            print("status sent")
+            requestHandler.shutdown()
+          })
+        }
+    }
+  }
+}

+ 6 - 7
Examples/Echo/Swift/Echo/EchoViewController.swift

@@ -104,7 +104,6 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
                                           metadata:requestMetadata)
           { (response) in
             self.log("Received status: \(response.status) " + response.statusDetails)
-
             if let responseBuffer = response.message,
               let responseMessage = self.fileDescriptorSet.readMessage(
                 name:"EchoResponse",
@@ -149,13 +148,13 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
     let operations = OperationGroup(call:call!, operations:[operation_sendInitialMetadata])
     { (event) in
+      print("client sendInitialMetadata complete with status \(event.type) \(event.tag)")
       if (event.type == GRPC_OP_COMPLETE) {
         print("call status \(event.type) \(event.tag)")
       } else {
         return
       }
     }
-
     let call_error = client!.perform(call:call!, operations:operations)
     if call_error != GRPC_CALL_OK {
       print("call error: \(call_error)")
@@ -166,7 +165,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
     let operations = OperationGroup(call:call!, operations:[operation_receiveInitialMetadata])
     { (event) in
-      print("receive initial metadata status \(event.type) \(event.tag)")
+      print("client receiveInitialMetadata complete with status \(event.type) \(event.tag)")
       let initialMetadata = operation_receiveInitialMetadata.metadata()
       for j in 0..<initialMetadata.count() {
         print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
@@ -186,7 +185,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
     let operations = OperationGroup(call:call!, operations:[operation_sendMessage])
     { (event) in
-      print("send message call status \(event.type) \(event.tag)")
+      print("client sendMessage complete with status \(event.type) \(event.tag)")
     }
     let call_error = client!.perform(call:call!, operations:operations)
     if call_error != GRPC_CALL_OK {
@@ -199,7 +198,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     let operations = OperationGroup(call:call!,
                                     operations:[operation_receiveStatus])
     { (event) in
-      print("receive status call status \(event.type) \(event.tag)")
+      print("client receiveStatus complete with status \(event.type) \(event.tag)")
       print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
     }
     let call_error = client!.perform(call:call!, operations:operations)
@@ -212,7 +211,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     let operation_receiveMessage = Operation_ReceiveMessage()
     let operations = OperationGroup(call:call!, operations:[operation_receiveMessage])
     { (event) in
-      print("call status \(event.type) \(event.tag)")
+      print("client receiveMessage complete with status \(event.type) \(event.tag)")
       if let messageBuffer = operation_receiveMessage.message() {
         let responseMessage = self.fileDescriptorSet.readMessage(
           name:"EchoResponse",
@@ -235,7 +234,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     let operation_sendCloseFromClient = Operation_SendCloseFromClient()
     let operations = OperationGroup(call:call!, operations:[operation_sendCloseFromClient])
     { (event) in
-      print("send close call status \(event.type) \(event.tag)")
+      print("client sendClose complete with status \(event.type) \(event.tag)")
       self.streaming = false
     }
     let call_error = client!.perform(call:call!, operations:operations)

+ 1 - 0
Packages/CgRPC/Sources/call.c

@@ -1,3 +1,4 @@
+
 /*
  *
  * Copyright 2016, Google Inc.

+ 1 - 0
Packages/gRPC/Sources/Client.swift

@@ -96,6 +96,7 @@ public class Client {
                                            operation_receiveStatusOnClient,
                                            operation_receiveMessage])
     { (event) in
+      print("client nonstreaming call complete")
       if (event.type == GRPC_OP_COMPLETE) {
         let response = CallResponse(status:operation_receiveStatusOnClient.status(),
                                     statusDetails:operation_receiveStatusOnClient.statusDetails(),

+ 91 - 1
Packages/gRPC/Sources/Handler.swift

@@ -150,7 +150,97 @@ public class Handler {
   }
 
 
-  func shutdown() {
+  public func shutdown() {
     cgrpc_completion_queue_shutdown(completionQueue.cq)
   }
+
+
+  public func sendMetadata(initialMetadata: Metadata,
+                           completion:(() -> Void)) {
+    let call = self.call()
+    let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:initialMetadata);
+    let operations = OperationGroup(call:call, operations:[operation_sendInitialMetadata])
+    {(event) in
+      if (event.type == GRPC_OP_COMPLETE) {
+        print("server sendMetadata complete")
+        completion()
+      } else {
+        completion()
+      }
+    }
+    self.completionQueue.operationGroups[operations.tag] = operations
+    _ = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
+  }
+
+  /// Receive the message sent with a call
+  ///
+  /// - Returns: a tuple containing status codes and a message (if available)
+  public func receiveMessage(completion:((ByteBuffer?) -> Void)) -> Void {
+    let call = self.call()
+    let operation_receiveMessage = Operation_ReceiveMessage()
+    let operations = OperationGroup(call:call, operations:[operation_receiveMessage])
+    {(event) in
+      if (event.type == GRPC_OP_COMPLETE) {
+        print("server receiveMessage complete")
+        if let message = operation_receiveMessage.message() {
+          completion(message)
+        } else {
+          completion(nil)
+        }
+      } else {
+        completion(nil)
+      }
+    }
+    self.completionQueue.operationGroups[operations.tag] = operations
+    let call_error = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
+    print("perform receiveMessage \(call_error)")
+  }
+
+  /// Sends the response to a request
+  ///
+  /// - Parameter message: the message to send
+  /// - Returns: a tuple containing status codes
+  public func sendResponse(message: ByteBuffer,
+                           completion: @escaping () -> Void) -> Void {
+    let call = self.call()
+    let operation_sendMessage = Operation_SendMessage(message:message)
+    let operations = OperationGroup(call:call, operations:[operation_sendMessage])
+    {(event) in
+      print("server sendResponse complete")
+      completion()
+    }
+    self.completionQueue.operationGroups[operations.tag] = operations
+    _ = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
+  }
+
+  public func receiveClose(completion: @escaping () -> Void) -> Void {
+    let call = self.call()
+    let operation_receiveClose = Operation_ReceiveCloseOnServer()
+    let operations = OperationGroup(call:call, operations:[operation_receiveClose])
+    {(event) in
+      print("server receiveClose complete")
+      completion()
+    }
+    self.completionQueue.operationGroups[operations.tag] = operations
+    let call_error = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
+    print("perform receiveClose \(call_error)")
+  }
+
+  public func sendStatus(trailingMetadata: Metadata,
+                         completion:(() -> Void)) -> Void {
+    let call = self.call()
+    let operation_sendStatusFromServer = Operation_SendStatusFromServer(status:0,
+                                                                        statusDetails:"OK",
+                                                                        metadata:trailingMetadata)
+    let operations = OperationGroup(call:call, operations:[operation_sendStatusFromServer])
+    {(event) in
+      print("server sendStatus complete")
+      completion()
+    }
+    self.completionQueue.operationGroups[operations.tag] = operations
+    _ = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
+  }
+
+  
+
 }

+ 19 - 2
Packages/gRPC/Sources/Server.swift

@@ -65,6 +65,25 @@ public class Server {
     cgrpc_server_start(s);
   }
 
+  public func run(handler: @escaping (Handler) -> Void) {
+    self.start()
+    DispatchQueue.global().async {
+      while(true) {
+        let (callError, completionType, requestHandler) =
+          self.getNextRequest(timeout:600.0)
+        if (callError != GRPC_CALL_OK) {
+          print("Call error \(callError)")
+        } else if (completionType == GRPC_OP_COMPLETE) {
+          handler(requestHandler!)
+        } else if (completionType == GRPC_QUEUE_TIMEOUT) {
+          // everything is fine
+        } else if (completionType == GRPC_QUEUE_SHUTDOWN) {
+          break
+        }
+      }
+    }
+  }
+
   /// Gets the next request sent to the server
   ///
   /// - Returns: a tuple containing the results of waiting and a possible Handler for the request
@@ -76,12 +95,10 @@ public class Server {
     } else {
       let event = self.completionQueue.waitForCompletion(timeout:timeout)
       if (event.type == GRPC_OP_COMPLETE) {
-
         handler.completionQueue.run() {
           self.handlers.remove(handler)
         }
         self.handlers.add(handler)
-
         return (GRPC_CALL_OK, GRPC_OP_COMPLETE, handler)
       } else {
         return (GRPC_CALL_OK, event.type, nil)