Browse Source

Call API improvements

Tim Burks 9 years ago
parent
commit
ba39d9ad3c

+ 41 - 123
Examples/Echo/Swift/Echo/EchoViewController.swift

@@ -41,8 +41,8 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
   @IBOutlet weak var streamingButton: NSButton!
 
   private var streaming = false
-  var client: Client?
-  var call: Call?
+  var client: Client!
+  var call: Call!
   var fileDescriptorSet : FileDescriptorSet
 
   required init?(coder:NSCoder) {
@@ -93,55 +93,48 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
 
     if (self.streamingButton.intValue == 0) {
       // NONSTREAMING
-      client = Client(address:address)
-
-      DispatchQueue.global().async {
-        // build the message
-        if let requestMessage = self.fileDescriptorSet.createMessage(name:"EchoRequest") {
-          requestMessage.addField(name:"text", value:self.messageField.stringValue)
-
-          let requestHost = "foo.test.google.fr"
-          let requestMethod = "/echo.Echo/Get"
-          let requestBuffer = ByteBuffer(data:requestMessage.serialize())
-          let requestMetadata = Metadata()
-
-          _ = self.client!.performRequest(host:requestHost,
-                                          method:requestMethod,
-                                          message:requestBuffer,
-                                          metadata:requestMetadata)
-          { (response) in
-            self.log("Received status: \(response.status) " + response.statusDetails)
-            if let responseBuffer = response.message,
-              let responseMessage = self.fileDescriptorSet.readMessage(
-                name:"EchoResponse",
-                proto:responseBuffer.data()) {
-              responseMessage.forOneField(name:"text") {(field) in
-                DispatchQueue.main.async {
-                  self.outputField.stringValue = field.string()
-                }
-              }
-            } else {
+
+      // build the message
+      if let requestMessage = self.fileDescriptorSet.createMessage(name:"EchoRequest") {
+        requestMessage.addField(name:"text", value:self.messageField.stringValue)
+        let requestHost = "foo.test.google.fr"
+        let requestMethod = "/echo.Echo/Get"
+        let requestMetadata = Metadata()
+
+        client = Client(address:address)
+        call = self.client.createCall(host: requestHost, method: requestMethod, timeout: 30.0)
+        call.performNonStreamingCall(messageData: requestMessage.serialize(),
+                                     metadata: requestMetadata)
+        { (response) in
+          self.log("Received status: \(response.status) " + response.statusDetails)
+          if let responseBuffer = response.message,
+            let responseMessage = self.fileDescriptorSet.readMessage(
+              name:"EchoResponse",
+              proto:responseBuffer.data()) {
+            responseMessage.forOneField(name:"text") {(field) in
               DispatchQueue.main.async {
-                self.outputField.stringValue = "No message received. gRPC Status \(response.status) " + response.statusDetails
+                self.outputField.stringValue = field.string()
               }
             }
+          } else {
+            DispatchQueue.main.async {
+              self.outputField.stringValue = "No message received. gRPC Status \(response.status) " + response.statusDetails
+            }
           }
         }
       }
     }
     else {
-      // NONSTREAMING
-
+      // STREAMING
       if (!streaming) {
         client = Client(address:address)
-
         call = client?.createCall(host: "foo.test.google.fr",
                                   method: "/echo.Echo/Update",
                                   timeout: 600.0)
-
-        self.sendInitialMetadata()
-        self.receiveInitialMetadata()
-        self.receiveStatus()
+        let metadata = Metadata(
+          pairs:[MetadataPair(key:"x-goog-api-key", value:"YOUR_API_KEY"),
+                 MetadataPair(key:"x-ios-bundle-identifier", value:Bundle.main.bundleIdentifier!)])
+        call.start(metadata:metadata)
         self.receiveMessage() // this should take a block in which we specify what to do
         streaming = true
       }
@@ -149,105 +142,30 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     }
   }
 
-  func sendInitialMetadata() {
-    let metadata = Metadata(
-      pairs:[MetadataPair(key:"x-goog-api-key", value:"YOUR_API_KEY"),
-             MetadataPair(key:"x-ios-bundle-identifier", value:Bundle.main.bundleIdentifier!)])
-    let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
-    let operations = OperationGroup(call:call!, operations:[operation_sendInitialMetadata])
-    { (event) in
-      print("client sendInitialMetadata complete with status \(event.type) \(event.tag)")
-      if (event.type == GRPC_OP_COMPLETE) {
-        print("call status \(event.type) \(event.tag)")
-      } else {
-        return
-      }
-    }
-    let call_error = client!.perform(call:call!, operations:operations)
-    if call_error != GRPC_CALL_OK {
-      print("call error: \(call_error)")
-    }
-  }
-
-  func receiveInitialMetadata() {
-    let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
-    let operations = OperationGroup(call:call!, operations:[operation_receiveInitialMetadata])
-    { (event) in
-      print("client receiveInitialMetadata complete with status \(event.type) \(event.tag)")
-      let initialMetadata = operation_receiveInitialMetadata.metadata()
-      for j in 0..<initialMetadata.count() {
-        print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
-      }
-    }
-    let call_error = client!.perform(call:call!, operations:operations)
-    if call_error != GRPC_CALL_OK {
-      print("call error \(call_error)")
-    }
-  }
-
   func sendMessage() {
     let requestMessage = self.fileDescriptorSet.createMessage(name:"EchoRequest")!
     requestMessage.addField(name:"text", value:self.messageField.stringValue)
     let messageData = requestMessage.serialize()
-    let messageBuffer = ByteBuffer(data:messageData)
-    let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
-    let operations = OperationGroup(call:call!, operations:[operation_sendMessage])
-    { (event) in
-      print("client sendMessage complete with status \(event.type) \(event.tag)")
-    }
-    let call_error = client!.perform(call:call!, operations:operations)
-    if call_error != GRPC_CALL_OK {
-      print("call error \(call_error)")
-    }
-  }
-
-  func receiveStatus() {
-    let operation_receiveStatus = Operation_ReceiveStatusOnClient()
-    let operations = OperationGroup(call:call!,
-                                    operations:[operation_receiveStatus])
-    { (event) in
-      print("client receiveStatus complete with status \(event.type) \(event.tag)")
-      print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
-    }
-    let call_error = client!.perform(call:call!, operations:operations)
-    if call_error != GRPC_CALL_OK {
-      print("call error \(call_error)")
-    }
+    call.sendMessage(data:messageData)
   }
 
   func receiveMessage() {
-    let operation_receiveMessage = Operation_ReceiveMessage()
-    let operations = OperationGroup(call:call!, operations:[operation_receiveMessage])
-    { (event) in
-      print("client receiveMessage complete with status \(event.type) \(event.tag)")
-      if let messageBuffer = operation_receiveMessage.message() {
-        let responseMessage = self.fileDescriptorSet.readMessage(
-          name:"EchoResponse",
-          proto:messageBuffer.data())!
-        responseMessage.forOneField(name:"text") {(field) in
-          DispatchQueue.main.async {
-            self.outputField.stringValue = field.string()
-          }
-          self.receiveMessage()
+    call.receiveMessage() {(data) in
+      let responseMessage = self.fileDescriptorSet.readMessage(
+        name:"EchoResponse",
+        proto:data)!
+      responseMessage.forOneField(name:"text") {(field) in
+        DispatchQueue.main.async {
+          self.outputField.stringValue = field.string()
         }
+        self.receiveMessage()
       }
     }
-    let call_error = client!.perform(call:call!, operations:operations)
-    if call_error != GRPC_CALL_OK {
-      print("call error \(call_error)")
-    }
   }
 
   func sendClose() {
-    let operation_sendCloseFromClient = Operation_SendCloseFromClient()
-    let operations = OperationGroup(call:call!, operations:[operation_sendCloseFromClient])
-    { (event) in
-      print("client sendClose complete with status \(event.type) \(event.tag)")
+    call.close() {
       self.streaming = false
     }
-    let call_error = client!.perform(call:call!, operations:operations)
-    if call_error != GRPC_CALL_OK {
-      print("call error \(call_error)")
-    }
   }
 }

+ 147 - 5
Packages/gRPC/Sources/Call.swift

@@ -71,11 +71,6 @@ public class Call {
     }
   }
 
-  // coming soon
-  func start() {
-
-  }
-
   /// Initiate performance of a call without waiting for completion
   ///
   /// - Parameter operations: array of operations to be performed
@@ -91,4 +86,151 @@ public class Call {
       mutex.unlock()
       return error
   }
+
+
+  /// Performs a nonstreaming gRPC API call
+  ///
+  /// - Parameter host: the gRPC host name for the call
+  /// - Parameter method: the gRPC method name for the call
+  /// - Parameter message: a ByteBuffer containing the message to send
+  /// - Parameter metadata: metadata to send with the call
+  /// - Returns: a CallResponse object containing results of the call
+  public func performNonStreamingCall(messageData: NSData,
+                                      metadata: Metadata,
+                                      completion: ((CallResponse) -> Void)) -> Void   {
+
+    let messageBuffer = ByteBuffer(data:messageData)
+
+    let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
+    let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
+    let operation_sendCloseFromClient = Operation_SendCloseFromClient()
+    let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
+    let operation_receiveStatusOnClient = Operation_ReceiveStatusOnClient()
+    let operation_receiveMessage = Operation_ReceiveMessage()
+
+    let group = OperationGroup(call:self,
+                               operations:[operation_sendInitialMetadata,
+                                           operation_sendMessage,
+                                           operation_sendCloseFromClient,
+                                           operation_receiveInitialMetadata,
+                                           operation_receiveStatusOnClient,
+                                           operation_receiveMessage])
+    { (event) in
+      print("client nonstreaming call complete")
+      if (event.type == GRPC_OP_COMPLETE) {
+        let response = CallResponse(status:operation_receiveStatusOnClient.status(),
+                                    statusDetails:operation_receiveStatusOnClient.statusDetails(),
+                                    message:operation_receiveMessage.message(),
+                                    initialMetadata:operation_receiveInitialMetadata.metadata(),
+                                    trailingMetadata:operation_receiveStatusOnClient.metadata())
+        completion(response)
+      } else {
+        completion(CallResponse(completion: event.type))
+      }
+    }
+    let call_error = self.perform(call: self, operations: group)
+    print ("call error = \(call_error)")
+  }
+
+  public func perform(call: Call, operations: OperationGroup) -> grpc_call_error {
+    self.completionQueue.operationGroups[operations.tag] = operations
+    return call.performOperations(operations:operations,
+                                  tag:operations.tag,
+                                  completionQueue: self.completionQueue)
+  }
+
+  public func sendMessage(data: NSData) {
+    let messageBuffer = ByteBuffer(data:data)
+    let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
+    let operations = OperationGroup(call:self, operations:[operation_sendMessage])
+    { (event) in
+      print("client sendMessage complete with status \(event.type) \(event.tag)")
+    }
+    let call_error = self.perform(call:self, operations:operations)
+    if call_error != GRPC_CALL_OK {
+      print("call error \(call_error)")
+    }
+  }
+
+  public func receiveMessage(callback:((NSData!) -> Void)) -> Void {
+    let operation_receiveMessage = Operation_ReceiveMessage()
+    let operations = OperationGroup(call:self, operations:[operation_receiveMessage])
+    { (event) in
+      print("client receiveMessage complete with status \(event.type) \(event.tag)")
+      if let messageBuffer = operation_receiveMessage.message() {
+        callback(messageBuffer.data())
+      }
+    }
+    let call_error = self.perform(call:self, operations:operations)
+    if call_error != GRPC_CALL_OK {
+      print("call error \(call_error)")
+    }
+  }
+
+  // start a streaming connection
+  public func start(metadata: Metadata) {
+    self.sendInitialMetadata(metadata: metadata)
+    self.receiveInitialMetadata()
+    self.receiveStatus()
+  }
+
+  private func sendInitialMetadata(metadata: Metadata) {
+    let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
+    let operations = OperationGroup(call:self, operations:[operation_sendInitialMetadata])
+    { (event) in
+      print("client sendInitialMetadata complete with status \(event.type) \(event.tag)")
+      if (event.type == GRPC_OP_COMPLETE) {
+        print("call status \(event.type) \(event.tag)")
+      } else {
+        return
+      }
+    }
+    let call_error = self.perform(call:self, operations:operations)
+    if call_error != GRPC_CALL_OK {
+      print("call error: \(call_error)")
+    }
+  }
+
+  private func receiveInitialMetadata() {
+    let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
+    let operations = OperationGroup(call:self, operations:[operation_receiveInitialMetadata])
+    { (event) in
+      print("client receiveInitialMetadata complete with status \(event.type) \(event.tag)")
+      let initialMetadata = operation_receiveInitialMetadata.metadata()
+      for j in 0..<initialMetadata.count() {
+        print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
+      }
+    }
+    let call_error = self.perform(call:self, operations:operations)
+    if call_error != GRPC_CALL_OK {
+      print("call error \(call_error)")
+    }
+  }
+
+  private func receiveStatus() {
+    let operation_receiveStatus = Operation_ReceiveStatusOnClient()
+    let operations = OperationGroup(call:self,
+                                    operations:[operation_receiveStatus])
+    { (event) in
+      print("client receiveStatus complete with status \(event.type) \(event.tag)")
+      print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
+    }
+    let call_error = self.perform(call:self, operations:operations)
+    if call_error != GRPC_CALL_OK {
+      print("call error \(call_error)")
+    }
+  }
+
+  public func close(completion:(() -> Void)) {
+    let operation_sendCloseFromClient = Operation_SendCloseFromClient()
+    let operations = OperationGroup(call:self, operations:[operation_sendCloseFromClient])
+    { (event) in
+      print("client sendClose complete with status \(event.type) \(event.tag)")
+      completion()
+    }
+    let call_error = self.perform(call:self, operations:operations)
+    if call_error != GRPC_CALL_OK {
+      print("call error \(call_error)")
+    }
+  }
 }

+ 1 - 54
Packages/gRPC/Sources/Client.swift

@@ -50,7 +50,7 @@ public class Client {
     c = cgrpc_client_create(address)
     completionQueue = CompletionQueue(cq:cgrpc_client_completion_queue(c))
     completionQueue.name = "Client" // only for debugging
-    self.completionQueue.run() {} // start a loop that watches the queue
+    self.completionQueue.run() {} // start a loop that watches the client's completion queue
   }
 
   deinit {
@@ -67,57 +67,4 @@ public class Client {
     let call = cgrpc_client_create_call(c, method, host, timeout)!
     return Call(call:call, owned:true, completionQueue:self.completionQueue)
   }
-
-  /// Performs a nonstreaming gRPC API call
-  ///
-  /// - Parameter host: the gRPC host name for the call
-  /// - Parameter method: the gRPC method name for the call
-  /// - Parameter message: a ByteBuffer containing the message to send
-  /// - Parameter metadata: metadata to send with the call
-  /// - Returns: a CallResponse object containing results of the call
-  public func performRequest(host: String,
-                             method: String,
-                             message: ByteBuffer,
-                             metadata: Metadata,
-                             completion: ((CallResponse) -> Void)) -> Call   {
-    let call = createCall(host:host, method:method, timeout:600.0)
-
-    let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
-    let operation_sendMessage = Operation_SendMessage(message:message)
-    let operation_sendCloseFromClient = Operation_SendCloseFromClient()
-    let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
-    let operation_receiveStatusOnClient = Operation_ReceiveStatusOnClient()
-    let operation_receiveMessage = Operation_ReceiveMessage()
-
-    let group = OperationGroup(call:call,
-                               operations:[operation_sendInitialMetadata,
-                                           operation_sendMessage,
-                                           operation_sendCloseFromClient,
-                                           operation_receiveInitialMetadata,
-                                           operation_receiveStatusOnClient,
-                                           operation_receiveMessage])
-    { (event) in
-      print("client nonstreaming call complete")
-      if (event.type == GRPC_OP_COMPLETE) {
-        let response = CallResponse(status:operation_receiveStatusOnClient.status(),
-                                    statusDetails:operation_receiveStatusOnClient.statusDetails(),
-                                    message:operation_receiveMessage.message(),
-                                    initialMetadata:operation_receiveInitialMetadata.metadata(),
-                                    trailingMetadata:operation_receiveStatusOnClient.metadata())
-        completion(response)
-      } else {
-        completion(CallResponse(completion: event.type))
-      }
-    }
-    let call_error = self.perform(call: call, operations: group)
-    print ("call error = \(call_error)")
-    return call
-  }
-
-  public func perform(call: Call, operations: OperationGroup) -> grpc_call_error {
-    self.completionQueue.operationGroups[operations.tag] = operations
-    return call.performOperations(operations:operations,
-                                  tag:operations.tag,
-                                  completionQueue: self.completionQueue)
-  }
 }

+ 0 - 1
Packages/gRPC/Sources/Handler.swift

@@ -243,6 +243,5 @@ public class Handler {
     _ = call.performOperations(operations:operations, tag:operations.tag, completionQueue: self.completionQueue)
   }
 
-  
 
 }