Browse Source

clean up error handling in Call and related classes

Tim Burks 9 years ago
parent
commit
7e77bdb12d

+ 10 - 10
Examples/Echo/Swift/Echo/EchoServer.swift

@@ -57,14 +57,14 @@ class EchoServer {
     print("Server Starting")
     print("GRPC version " + gRPC.version())
 
-     server.run {(requestHandler) in
+    server.run {(requestHandler) in
       print("Received request to " + requestHandler.host
         + " calling " + requestHandler.method
         + " from " + requestHandler.caller)
 
       // NONSTREAMING
       if (requestHandler.method == "/echo.Echo/Get") {
-        requestHandler.receiveMessage(initialMetadata:Metadata())
+        _ = requestHandler.receiveMessage(initialMetadata:Metadata())
         {(requestData) in
           if let requestData = requestData,
             let requestMessage =
@@ -72,8 +72,8 @@ class EchoServer {
             requestMessage.forOneField("text") {(field) in
               let replyMessage = fileDescriptorSet.makeMessage("EchoResponse")!
               replyMessage.addField("text", value:"Swift nonstreaming echo " + field.string())
-              requestHandler.sendResponse(message:replyMessage.data(),
-                                          trailingMetadata:Metadata())
+              _ = requestHandler.sendResponse(message:replyMessage.data(),
+                                              trailingMetadata:Metadata())
             }
           }
         }
@@ -81,7 +81,7 @@ class EchoServer {
 
       // STREAMING
       if (requestHandler.method == "/echo.Echo/Update") {
-        requestHandler.sendMetadata(
+        _ = requestHandler.sendMetadata(
           initialMetadata: Metadata(),
           completion: {
 
@@ -90,8 +90,8 @@ class EchoServer {
               requestHandler: requestHandler)
 
             // we seem to never get this, but I'm told it's what we're supposed to do
-            requestHandler.receiveClose() {
-              requestHandler.sendStatus(trailingMetadata: Metadata(), completion: {
+            _ = requestHandler.receiveClose() {
+              _ = requestHandler.sendStatus(trailingMetadata: Metadata(), completion: {
                 print("status sent")
                 requestHandler.shutdown()
               })
@@ -104,21 +104,21 @@ class EchoServer {
 
   func handleMessage(fileDescriptorSet: FileDescriptorSet,
                      requestHandler: Handler) {
-    requestHandler.receiveMessage()
+    _ = requestHandler.receiveMessage()
       {(requestData) in
         if let requestData = requestData,
           let requestMessage = fileDescriptorSet.readMessage("EchoRequest", data:requestData) {
           requestMessage.forOneField("text") {(field) in
             let replyMessage = fileDescriptorSet.makeMessage("EchoResponse")!
             replyMessage.addField("text", value:"Swift streaming echo " + field.string())
-            requestHandler.sendResponse(message:replyMessage.data()) {
+            _ = requestHandler.sendResponse(message:replyMessage.data()) {
               // after we've sent our response, prepare to handle another message
               self.handleMessage(fileDescriptorSet:fileDescriptorSet, requestHandler:requestHandler)
             }
           }
         } else {
           // if we get an empty message (nil buffer), we close the connection
-          requestHandler.sendStatus(trailingMetadata: Metadata(), completion: {
+          _ = requestHandler.sendStatus(trailingMetadata: Metadata(), completion: {
             print("status sent")
             requestHandler.shutdown()
           })

+ 33 - 24
Packages/gRPC/Sources/Call.swift

@@ -239,18 +239,19 @@ public class Call {
     let messageBuffer = ByteBuffer(data:data)
     let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
     let operations = OperationGroup(call:self, operations:[operation_sendMessage])
-    { (event) in
-
-      // TODO: if the event failed, shut down
-
-      DispatchQueue.main.async {
-        if self.pendingMessages.count > 0 {
-          let nextMessage = self.pendingMessages.first!
-          self.pendingMessages.removeFirst()
-          _ = self.sendWithoutBlocking(data: nextMessage)
-        } else {
-          self.writing = false
+    {(success) in
+      if success {
+        DispatchQueue.main.async {
+          if self.pendingMessages.count > 0 {
+            let nextMessage = self.pendingMessages.first!
+            self.pendingMessages.removeFirst()
+            _ = self.sendWithoutBlocking(data: nextMessage)
+          } else {
+            self.writing = false
+          }
         }
+      } else {
+        // TODO: if the event failed, shut down
       }
     }
     return self.perform(operations)
@@ -261,9 +262,11 @@ public class Call {
   public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> CallError {
     let operation_receiveMessage = Operation_ReceiveMessage()
     let operations = OperationGroup(call:self, operations:[operation_receiveMessage])
-    { (event) in
-      if let messageBuffer = operation_receiveMessage.message() {
-        callback(messageBuffer.data())
+    {(success) in
+      if success {
+        if let messageBuffer = operation_receiveMessage.message() {
+          callback(messageBuffer.data())
+        }
       }
     }
     return self.perform(operations)
@@ -273,8 +276,8 @@ public class Call {
   private func sendInitialMetadata(metadata: Metadata) -> CallError {
     let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
     let operations = OperationGroup(call:self, operations:[operation_sendInitialMetadata])
-    { (success) in
-      if (success) {
+    {(success) in
+      if success {
         print("call successful")
       } else {
         return
@@ -287,10 +290,12 @@ public class Call {
   private func receiveInitialMetadata() -> CallError {
     let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
     let operations = OperationGroup(call:self, operations:[operation_receiveInitialMetadata])
-    { (event) in
-      let initialMetadata = operation_receiveInitialMetadata.metadata()
-      for j in 0..<initialMetadata.count() {
-        print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
+    {(success) in
+      if success {
+        let initialMetadata = operation_receiveInitialMetadata.metadata()
+        for j in 0..<initialMetadata.count() {
+          print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
+        }
       }
     }
     return self.perform(operations)
@@ -301,8 +306,10 @@ public class Call {
     let operation_receiveStatus = Operation_ReceiveStatusOnClient()
     let operations = OperationGroup(call:self,
                                     operations:[operation_receiveStatus])
-    { (event) in
-      print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
+    {(success) in
+      if success {
+        print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
+      }
     }
     return self.perform(operations)
   }
@@ -311,8 +318,10 @@ public class Call {
   public func close(completion:@escaping (() -> Void)) -> CallError {
     let operation_sendCloseFromClient = Operation_SendCloseFromClient()
     let operations = OperationGroup(call:self, operations:[operation_sendCloseFromClient])
-    { (event) in
-      completion()
+    {(success) in
+      if success {
+        completion()
+      }
     }
     return self.perform(operations)
   }

+ 2 - 2
Packages/gRPC/Sources/Client.swift

@@ -51,7 +51,7 @@ public class Client {
     underlyingClient = cgrpc_client_create(address)
     completionQueue = CompletionQueue(underlyingCompletionQueue:cgrpc_client_completion_queue(underlyingClient))
     completionQueue.name = "Client" // only for debugging
-    self.completionQueue.run() {} // start a loop that watches the client's completion queue
+    self.completionQueue.run() // start a loop that watches the client's completion queue
   }
 
   /// Initializes a gRPC client
@@ -69,7 +69,7 @@ public class Client {
     }
     completionQueue = CompletionQueue(underlyingCompletionQueue:cgrpc_client_completion_queue(underlyingClient))
     completionQueue.name = "Client" // only for debugging
-    self.completionQueue.run() {} // start a loop that watches the client's completion queue
+    self.completionQueue.run() // start a loop that watches the client's completion queue
   }
 
   deinit {

+ 17 - 16
Packages/gRPC/Sources/CompletionQueue.swift

@@ -71,7 +71,7 @@ internal struct CompletionQueueEvent {
 internal class CompletionQueue {
 
   /// Optional user-provided name for the queue
-  internal var name : String!
+  internal var name : String?
 
   /// Pointer to underlying C representation
   private var underlyingCompletionQueue : UnsafeMutableRawPointer
@@ -83,11 +83,11 @@ internal class CompletionQueue {
   ///
   /// - Parameter cq: the underlying C representation
   init(underlyingCompletionQueue: UnsafeMutableRawPointer) {
-    // NOT OWNED, so we don't dealloc it in a deinit
+    // The underlying completion queue NOT OWNED by this class, so we don't dealloc it in a deinit
     self.underlyingCompletionQueue = underlyingCompletionQueue
   }
 
-  /// Waits for an event to complete
+  /// Waits for an operation group to complete
   ///
   /// - Parameter timeout: a timeout value in seconds
   /// - Returns: a grpc_completion_type code indicating the result of waiting
@@ -96,8 +96,10 @@ internal class CompletionQueue {
     return CompletionQueueEvent(event)
   }
 
-  /// Run a completion queue
-  internal func run(completion:@escaping () -> Void) {
+  /// Runs a completion queue and call a completion handler when finished
+  ///
+  /// - Parameter: a completion handler that is called when the queue stops running
+  internal func runToCompletion(_ completion:@escaping () -> Void) {
     DispatchQueue.global().async {
       var running = true
       while (running) {
@@ -105,15 +107,9 @@ internal class CompletionQueue {
         switch (event.type) {
         case GRPC_OP_COMPLETE:
           let tag = cgrpc_event_tag(event)
-          if let operations = self.operationGroups[tag] {
-
-            print("[\(self.name)] event success=\(event.success)")
-            if event.success == 0 {
-              print("something bad happened")
-            } else {
-              // call the operation group completion handler
-              operations.completion(event.success == 1)
-            }
+          if let operationGroup = self.operationGroups[tag] {
+            // call the operation group completion handler
+            operationGroup.completion(event.success == 1)
             self.operationGroups[tag] = nil
           }
           break
@@ -127,13 +123,18 @@ internal class CompletionQueue {
         }
       }
       DispatchQueue.main.async {
-        // call the queue completion handler
+        // when the queue stops running, call the queue completion handler
         completion()
       }
     }
   }
 
-  /// Shutdown a completion queue
+  /// Runs a completion queue
+  internal func run() -> Void {
+    self.runToCompletion() {}
+  }
+
+  /// Shuts down a completion queue
   internal func shutdown() -> Void {
     cgrpc_completion_queue_shutdown(underlyingCompletionQueue)
   }

+ 32 - 30
Packages/gRPC/Sources/Handler.swift

@@ -100,7 +100,7 @@ public class Handler {
   ///
   /// - Returns: a tuple containing status codes and a message (if available)
   public func receiveMessage(initialMetadata: Metadata,
-                             completion:@escaping ((Data?) -> Void)) -> Void {
+                             completion:@escaping ((Data?) -> Void)) -> CallError {
     let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:initialMetadata);
     let operation_receiveMessage = Operation_ReceiveMessage()
     let operations = OperationGroup(
@@ -115,7 +115,7 @@ public class Handler {
         completion(nil)
       }
     }
-    _ = call.perform(operations)
+    return call.perform(operations)
   }
 
   /// Sends the response to a request
@@ -123,7 +123,7 @@ public class Handler {
   /// - Parameter message: the message to send
   /// - Returns: a tuple containing status codes
   public func sendResponse(message: Data,
-                           trailingMetadata: Metadata) -> Void {
+                           trailingMetadata: Metadata) -> CallError {
     let operation_receiveCloseOnServer = Operation_ReceiveCloseOnServer();
     let operation_sendStatusFromServer = Operation_SendStatusFromServer(status:0,
                                                                         statusDetails:"OK",
@@ -136,10 +136,12 @@ public class Handler {
         operation_receiveCloseOnServer,
         operation_sendStatusFromServer,
         operation_sendMessage])
-    {(call_error) in
-      self.shutdown()
+    {(success) in
+      if success {
+        self.shutdown()
+      }
     }
-    _ = call.perform(operations)
+    return call.perform(operations)
   }
 
   /// shutdown the handler's completion queue
@@ -149,27 +151,27 @@ public class Handler {
 
   /// Send initial metadata in response to a connection
   public func sendMetadata(initialMetadata: Metadata,
-                           completion:@escaping (() -> Void)) {
+                           completion:@escaping (() -> Void)) -> CallError {
     let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:initialMetadata);
     let operations = OperationGroup(call:call, operations:[operation_sendInitialMetadata])
     {(success) in
-      if (success) {
+      if success {
         completion()
       } else {
         completion()
       }
     }
-    _ = call.perform(operations)
+    return call.perform(operations)
   }
 
   /// Receive the message sent with a call
   ///
   /// - Returns: a tuple containing status codes and a message (if available)
-  public func receiveMessage(completion:(@escaping (Data?) -> Void)) -> Void {
+  public func receiveMessage(completion:(@escaping (Data?) -> Void)) -> CallError {
     let operation_receiveMessage = Operation_ReceiveMessage()
     let operations = OperationGroup(call:call, operations:[operation_receiveMessage])
     {(success) in
-      if (success) {
+      if success {
         print("server receiveMessage complete")
         if let message = operation_receiveMessage.message() {
           completion(message.data())
@@ -180,9 +182,7 @@ public class Handler {
         completion(nil)
       }
     }
-
-    let call_error = call.perform(operations)
-    print("perform receiveMessage \(call_error)")
+    return call.perform(operations)
   }
 
   /// Sends the response to a request
@@ -190,39 +190,41 @@ public class Handler {
   /// - Parameter message: the message to send
   /// - Returns: a tuple containing status codes
   public func sendResponse(message: Data,
-                           completion: @escaping () -> Void) -> Void {
+                           completion: @escaping () -> Void) -> CallError {
     let operation_sendMessage = Operation_SendMessage(message:ByteBuffer(data:message))
     let operations = OperationGroup(call:call, operations:[operation_sendMessage])
-    {(event) in
-      print("server sendResponse complete")
-      completion()
+    {(success) in
+      if success {
+        completion()
+      }
     }
-    _ = call.perform(operations)
+    return call.perform(operations)
   }
 
   /// Recognize when the client has closed a request
-  public func receiveClose(completion: @escaping () -> Void) -> Void {
+  public func receiveClose(completion: @escaping () -> Void) -> CallError {
     let operation_receiveClose = Operation_ReceiveCloseOnServer()
     let operations = OperationGroup(call:call, operations:[operation_receiveClose])
-    {(event) in
-      print("server receiveClose complete")
-      completion()
+    {(success) in
+      if success {
+        completion()
+      }
     }
-    let call_error = call.perform(operations)
-    print("perform receiveClose \(call_error)")
+    return call.perform(operations)
   }
 
   /// Send final status to the client
   public func sendStatus(trailingMetadata: Metadata,
-                         completion:@escaping (() -> Void)) -> Void {
+                         completion:@escaping (() -> Void)) -> CallError {
     let operation_sendStatusFromServer = Operation_SendStatusFromServer(status:0,
                                                                         statusDetails:"OK",
                                                                         metadata:trailingMetadata)
     let operations = OperationGroup(call:call, operations:[operation_sendStatusFromServer])
-    {(event) in
-      print("server sendStatus complete")
-      completion()
+    {(success) in
+      if success {
+        completion()
+      }
     }
-    _ = call.perform(operations)
+    return call.perform(operations)
   }
 }

+ 3 - 4
Packages/gRPC/Sources/Server.swift

@@ -83,15 +83,14 @@ public class Server {
           // not good, let's break
           break
         }
-        // blocks
+        // block while waiting for an incoming request
         let event = self.completionQueue.wait(timeout:600)
         if (event.type == .complete) {
           if event.tag == 101 {
             // run the handler and remove it when it finishes
             if event.success != 0 {
               self.handlers.add(handler)
-              handler.completionQueue.run() {
-                // on completion
+              handler.completionQueue.runToCompletion() {
                 self.handlers.remove(handler)
               }
               handlerFunction(handler)
@@ -130,7 +129,7 @@ public class Server {
     } else {
       let event = self.completionQueue.wait(timeout:timeout)
       if (event.type == .complete) {
-        handler.completionQueue.run() {
+        handler.completionQueue.runToCompletion() {
           self.handlers.remove(handler)
         }
         self.handlers.add(handler)