Browse Source

simple message queueing

Tim Burks 9 years ago
parent
commit
6e4eec8436

+ 4 - 4
Examples/Speech/google/longrunning/operations.proto

@@ -41,14 +41,14 @@ service Operations {
   // method to poll the operation result at intervals as recommended by the API
   // service.
   rpc GetOperation(GetOperationRequest) returns (Operation) {
-    option (google.api.http) = { get: "/v1/{name=operations/**}" };
+    //option (google.api.http) = { get: "/v1/{name=operations/**}" };
   }
 
   // Lists operations that match the specified filter in the request. If the
   // server doesn't support this method, it returns
   // `google.rpc.Code.UNIMPLEMENTED`.
   rpc ListOperations(ListOperationsRequest) returns (ListOperationsResponse) {
-    option (google.api.http) = { get: "/v1/{name=operations}" };
+    //option (google.api.http) = { get: "/v1/{name=operations}" };
   }
 
   // Starts asynchronous cancellation on a long-running operation.  The server
@@ -58,13 +58,13 @@ service Operations {
   // [Operations.GetOperation] or other methods to check whether the
   // cancellation succeeded or the operation completed despite cancellation.
   rpc CancelOperation(CancelOperationRequest) returns (google.protobuf.Empty) {
-    option (google.api.http) = { post: "/v1/{name=operations/**}:cancel" body: "*" };
+    //option (google.api.http) = { post: "/v1/{name=operations/**}:cancel" body: "*" };
   }
 
   // Deletes a long-running operation.  It indicates the client is no longer
   // interested in the operation result. It does not cancel the operation.
   rpc DeleteOperation(DeleteOperationRequest) returns (google.protobuf.Empty) {
-    option (google.api.http) = { delete: "/v1/{name=operations/**}" };
+    //option (google.api.http) = { delete: "/v1/{name=operations/**}" };
   }
 }
 

+ 5 - 0
Packages/gRPC/Sources/ByteBuffer.swift

@@ -41,6 +41,9 @@ class ByteBuffer {
   /// Pointer to underlying C representation
   var b: UnsafeMutableRawPointer!
 
+  /// The provider of data in the buffer (if needed) to ensure that it is retained.
+  private var source : Any?
+
   /// Initializes a ByteBuffer
   ///
   /// - Parameter b: the underlying C representation
@@ -53,6 +56,7 @@ class ByteBuffer {
   /// - Parameter string: a string to store in the buffer
   init(string: String) {
     self.b = cgrpc_byte_buffer_create_with_string(string)
+    self.source = string
   }
 
   /// Initializes a ByteBuffer
@@ -62,6 +66,7 @@ class ByteBuffer {
     data.withUnsafeBytes { (bytes) in
       self.b = cgrpc_byte_buffer_create_with_data(bytes, data.count)
     }
+    self.source = data
   }
 
   deinit {

+ 29 - 3
Packages/gRPC/Sources/Call.swift

@@ -56,6 +56,9 @@ public class Call {
   /// True if this instance is responsible for deleting the underlying C representation
   private var owned : Bool
 
+  private var pendingMessages : Array<Data>
+  private var writing : Bool
+
   /// Initializes a Call representation
   ///
   /// - Parameter call: the underlying C representation
@@ -64,6 +67,8 @@ public class Call {
     self.call = call
     self.owned = owned
     self.completionQueue = completionQueue
+    self.pendingMessages = []
+    self.writing = false
   }
 
   deinit {
@@ -78,8 +83,8 @@ public class Call {
   /// - Parameter tag: integer tag that will be attached to these operations
   /// - Returns: the result of initiating the call
   func performOperations(operations: OperationGroup,
-                                tag: Int64,
-                                completionQueue: CompletionQueue)
+                         tag: Int64,
+                         completionQueue: CompletionQueue)
     -> grpc_call_error {
       let mutex = CallLock.sharedInstance.mutex
       mutex.lock()
@@ -149,11 +154,31 @@ public class Call {
 
   // send a message over a streaming connection
   public func sendMessage(data: Data) {
+    DispatchQueue.main.async {
+      if self.writing {
+        self.pendingMessages.append(data)
+      } else {
+        self.writing = true
+        self.sendWithoutBlocking(data: data)
+      }
+    }
+  }
+
+  private func sendWithoutBlocking(data: Data) {
     let messageBuffer = ByteBuffer(data:data)
     let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
     let operations = OperationGroup(call:self, operations:[operation_sendMessage])
     { (event) in
-      print("client sendMessage complete with status \(event.type) \(event.tag)")
+      DispatchQueue.main.async {
+        print("client sendMessage complete with status \(event.type) \(event.tag)")
+        if self.pendingMessages.count > 0 {
+          let nextMessage = self.pendingMessages.first!
+          self.pendingMessages.removeFirst()
+          self.sendWithoutBlocking(data: nextMessage)
+        } else {
+          self.writing = false
+        }
+      }
     }
     let call_error = self.perform(call:self, operations:operations)
     if call_error != GRPC_CALL_OK {
@@ -161,6 +186,7 @@ public class Call {
     }
   }
 
+
   // receive a message over a streaming connection
   public func receiveMessage(callback:((Data!) -> Void)) -> Void {
     let operation_receiveMessage = Operation_ReceiveMessage()