Browse Source

Make most completion blocks optional, so that clients don't need to pass them.

I've also changed several completion blocks to accept a boolean argument whether the corresponding operation was successful, instead of just never calling the completion block in case of errors.
In general, I think we should soon take a hard look at how errors are handled currently and ensure that no errors can lead to unexpected behavior (e.g. some blocks never getting called etc.), both in the client and server use-cases.
Daniel Alm 8 years ago
parent
commit
d76b9e9f7e

+ 18 - 22
Examples/Echo/Generated/echo.grpc.swift

@@ -66,7 +66,7 @@ internal final class Echo_EchoGetCall {
   /// - Throws: `BinaryEncodingError` if encoding fails. `CallError` if fails to call.
   fileprivate func start(request: Echo_EchoRequest,
                          metadata: Metadata,
-                         completion: @escaping (Echo_EchoResponse?, CallResult)->())
+                         completion: @escaping ((Echo_EchoResponse?, CallResult)->()))
     throws -> Echo_EchoGetCall {
 
       let requestData = try request.serializedData()
@@ -102,7 +102,7 @@ internal final class Echo_EchoExpandCall {
   /// Call this once with the message to send. Nonblocking.
   fileprivate func start(request: Echo_EchoRequest,
                          metadata: Metadata,
-                         completion: @escaping (CallResult) -> ())
+                         completion: ((CallResult) -> ())?)
     throws -> Echo_EchoExpandCall {
       let requestData = try request.serializedData()
       try call.start(.serverStreaming,
@@ -164,7 +164,7 @@ internal final class Echo_EchoCollectCall {
   }
 
   /// Call this to start a call. Nonblocking.
-  fileprivate func start(metadata:Metadata, completion:@escaping (CallResult)->())
+  fileprivate func start(metadata:Metadata, completion: ((CallResult)->())?)
     throws -> Echo_EchoCollectCall {
       try self.call.start(.clientStreaming, metadata:metadata, completion:completion)
       return self
@@ -231,7 +231,7 @@ internal final class Echo_EchoUpdateCall {
   }
 
   /// Call this to start a call. Nonblocking.
-  fileprivate func start(metadata:Metadata, completion:@escaping (CallResult)->())
+  fileprivate func start(metadata:Metadata, completion: ((CallResult)->())?)
     throws -> Echo_EchoUpdateCall {
       try self.call.start(.bidiStreaming, metadata:metadata, completion:completion)
       return self
@@ -289,10 +289,8 @@ internal final class Echo_EchoUpdateCall {
   }
 
   /// Call this to close the sending connection. Nonblocking.
-  internal func closeSend(completion:@escaping ()->()) throws {
-    try call.close() {
-      completion()
-    }
+  internal func closeSend(completion: (()->())?) throws {
+	try call.close(completion: completion)
   }
 
   /// Cancel the call.
@@ -352,7 +350,7 @@ internal final class Echo_EchoService {
   }
   /// Asynchronous. Unary.
   internal func get(_ request: Echo_EchoRequest,
-                  completion: @escaping (Echo_EchoResponse?, CallResult)->())
+                  completion: @escaping ((Echo_EchoResponse?, CallResult)->()))
     throws
     -> Echo_EchoGetCall {
       return try Echo_EchoGetCall(channel).start(request:request,
@@ -362,7 +360,7 @@ internal final class Echo_EchoService {
   /// Asynchronous. Server-streaming.
   /// Send the initial message.
   /// Use methods on the returned object to get streamed responses.
-  internal func expand(_ request: Echo_EchoRequest, completion: @escaping (CallResult)->())
+  internal func expand(_ request: Echo_EchoRequest, completion: ((CallResult)->())?)
     throws
     -> Echo_EchoExpandCall {
       return try Echo_EchoExpandCall(channel).start(request:request, metadata:metadata, completion:completion)
@@ -370,7 +368,7 @@ internal final class Echo_EchoService {
   /// Asynchronous. Client-streaming.
   /// Use methods on the returned object to stream messages and
   /// to close the connection and wait for a final response.
-  internal func collect(completion: @escaping (CallResult)->())
+  internal func collect(completion: ((CallResult)->())?)
     throws
     -> Echo_EchoCollectCall {
       return try Echo_EchoCollectCall(channel).start(metadata:metadata, completion:completion)
@@ -378,7 +376,7 @@ internal final class Echo_EchoService {
   /// Asynchronous. Bidirectional-streaming.
   /// Use methods on the returned object to stream messages,
   /// to wait for replies, and to close the connection.
-  internal func update(completion: @escaping (CallResult)->())
+  internal func update(completion: ((CallResult)->())?)
     throws
     -> Echo_EchoUpdateCall {
       return try Echo_EchoUpdateCall(channel).start(metadata:metadata, completion:completion)
@@ -450,8 +448,8 @@ internal final class Echo_EchoExpandSession : Echo_EchoSession {
   }
 
   /// Send a message. Nonblocking.
-  internal func send(_ response: Echo_EchoResponse, completion: @escaping ()->()) throws {
-    try handler.sendResponse(message:response.serializedData()) {completion()}
+  internal func send(_ response: Echo_EchoResponse, completion: ((Bool)->())?) throws {
+	try handler.sendResponse(message:response.serializedData(), completion: completion)
   }
 
   /// Run the session. Internal.
@@ -468,7 +466,7 @@ internal final class Echo_EchoExpandSession : Echo_EchoSession {
               try self.handler.sendStatus(statusCode:self.statusCode,
                                           statusMessage:self.statusMessage,
                                           trailingMetadata:self.trailingMetadata,
-                                          completion:{})
+                                          completion:nil)
             } catch (let error) {
               print("error: \(error)")
             }
@@ -518,7 +516,7 @@ internal final class Echo_EchoCollectSession : Echo_EchoSession {
 
   /// Run the session. Internal.
   fileprivate func run(queue:DispatchQueue) throws {
-    try self.handler.sendMetadata(initialMetadata:initialMetadata) {
+    try self.handler.sendMetadata(initialMetadata:initialMetadata) { _ in
       queue.async {
         do {
           try self.provider.collect(session:self)
@@ -563,8 +561,8 @@ internal final class Echo_EchoUpdateSession : Echo_EchoSession {
   }
 
   /// Send a message. Nonblocking.
-  internal func send(_ response: Echo_EchoResponse, completion: @escaping ()->()) throws {
-    try handler.sendResponse(message:response.serializedData()) {completion()}
+  internal func send(_ response: Echo_EchoResponse, completion: ((Bool)->())?) throws {
+	try handler.sendResponse(message:response.serializedData(), completion: completion)
   }
 
   /// Close a connection. Blocks until the connection is closed.
@@ -572,15 +570,13 @@ internal final class Echo_EchoUpdateSession : Echo_EchoSession {
     let sem = DispatchSemaphore(value: 0)
     try self.handler.sendStatus(statusCode:self.statusCode,
                                 statusMessage:self.statusMessage,
-                                trailingMetadata:self.trailingMetadata) {
-                                  sem.signal()
-    }
+                                trailingMetadata:self.trailingMetadata) { _ in sem.signal() }
     _ = sem.wait(timeout: DispatchTime.distantFuture)
   }
 
   /// Run the session. Internal.
   fileprivate func run(queue:DispatchQueue) throws {
-    try self.handler.sendMetadata(initialMetadata:initialMetadata) {
+    try self.handler.sendMetadata(initialMetadata:initialMetadata) { _ in
       queue.async {
         do {
           try self.provider.update(session:self)

+ 3 - 5
Plugin/Templates/client-call-bidistreaming.swift

@@ -8,7 +8,7 @@
   }
 
   /// Call this to start a call. Nonblocking.
-  fileprivate func start(metadata:Metadata, completion:@escaping (CallResult)->())
+  fileprivate func start(metadata:Metadata, completion: ((CallResult)->())?)
     throws -> {{ .|call:file,service,method }} {
       try self.call.start(.bidiStreaming, metadata:metadata, completion:completion)
       return self
@@ -66,10 +66,8 @@
   }
 
   /// Call this to close the sending connection. Nonblocking.
-  {{ access }} func closeSend(completion:@escaping ()->()) throws {
-    try call.close() {
-      completion()
-    }
+  {{ access }} func closeSend(completion: (()->())?) throws {
+	try call.close(completion: completion)
   }
 
   /// Cancel the call.

+ 1 - 1
Plugin/Templates/client-call-clientstreaming.swift

@@ -8,7 +8,7 @@
   }
 
   /// Call this to start a call. Nonblocking.
-  fileprivate func start(metadata:Metadata, completion:@escaping (CallResult)->())
+  fileprivate func start(metadata:Metadata, completion: ((CallResult)->())?)
     throws -> {{ .|call:file,service,method }} {
       try self.call.start(.clientStreaming, metadata:metadata, completion:completion)
       return self

+ 1 - 1
Plugin/Templates/client-call-serverstreaming.swift

@@ -10,7 +10,7 @@
   /// Call this once with the message to send. Nonblocking.
   fileprivate func start(request: {{ method|input }},
                          metadata: Metadata,
-                         completion: @escaping (CallResult) -> ())
+                         completion: ((CallResult) -> ())?)
     throws -> {{ .|call:file,service,method }} {
       let requestData = try request.serializedData()
       try call.start(.serverStreaming,

+ 1 - 1
Plugin/Templates/client-call-unary.swift

@@ -31,7 +31,7 @@
   /// - Throws: `BinaryEncodingError` if encoding fails. `CallError` if fails to call.
   fileprivate func start(request: {{ method|input }},
                          metadata: Metadata,
-                         completion: @escaping ({{ method|output }}?, CallResult)->())
+                         completion: @escaping (({{ method|output }}?, CallResult)->()))
     throws -> {{ .|call:file,service,method }} {
 
       let requestData = try request.serializedData()

+ 3 - 3
Plugin/Templates/client.swift

@@ -86,7 +86,7 @@
   /// Asynchronous. Server-streaming.
   /// Send the initial message.
   /// Use methods on the returned object to get streamed responses.
-  {{ access }} func {{ method|methodDescriptorName|lowercase }}(_ request: {{ method|input }}, completion: @escaping (CallResult)->())
+  {{ access }} func {{ method|methodDescriptorName|lowercase }}(_ request: {{ method|input }}, completion: ((CallResult)->())?)
     throws
     -> {{ .|call:file,service,method }} {
       return try {{ .|call:file,service,method }}(channel).start(request:request, metadata:metadata, completion:completion)
@@ -96,7 +96,7 @@
   /// Asynchronous. Client-streaming.
   /// Use methods on the returned object to stream messages and
   /// to close the connection and wait for a final response.
-  {{ access }} func {{ method|methodDescriptorName|lowercase }}(completion: @escaping (CallResult)->())
+  {{ access }} func {{ method|methodDescriptorName|lowercase }}(completion: ((CallResult)->())?)
     throws
     -> {{ .|call:file,service,method }} {
       return try {{ .|call:file,service,method }}(channel).start(metadata:metadata, completion:completion)
@@ -106,7 +106,7 @@
   /// Asynchronous. Bidirectional-streaming.
   /// Use methods on the returned object to stream messages,
   /// to wait for replies, and to close the connection.
-  {{ access }} func {{ method|methodDescriptorName|lowercase }}(completion: @escaping (CallResult)->())
+  {{ access }} func {{ method|methodDescriptorName|lowercase }}(completion: ((CallResult)->())?)
     throws
     -> {{ .|call:file,service,method }} {
       return try {{ .|call:file,service,method }}(channel).start(metadata:metadata, completion:completion)

+ 4 - 6
Plugin/Templates/server-session-bidistreaming.swift

@@ -31,8 +31,8 @@
   }
 
   /// Send a message. Nonblocking.
-  {{ access }} func send(_ response: {{ method|output }}, completion: @escaping ()->()) throws {
-    try handler.sendResponse(message:response.serializedData()) {completion()}
+  {{ access }} func send(_ response: {{ method|output }}, completion: ((Bool)->())?) throws {
+	try handler.sendResponse(message:response.serializedData(), completion: completion)
   }
 
   /// Close a connection. Blocks until the connection is closed.
@@ -40,15 +40,13 @@
     let sem = DispatchSemaphore(value: 0)
     try self.handler.sendStatus(statusCode:self.statusCode,
                                 statusMessage:self.statusMessage,
-                                trailingMetadata:self.trailingMetadata) {
-                                  sem.signal()
-    }
+                                trailingMetadata:self.trailingMetadata) { _ in sem.signal() }
     _ = sem.wait(timeout: DispatchTime.distantFuture)
   }
 
   /// Run the session. Internal.
   fileprivate func run(queue:DispatchQueue) throws {
-    try self.handler.sendMetadata(initialMetadata:initialMetadata) {
+    try self.handler.sendMetadata(initialMetadata:initialMetadata) { _ in
       queue.async {
         do {
           try self.provider.{{ method|methodDescriptorName|lowercase }}(session:self)

+ 1 - 1
Plugin/Templates/server-session-clientstreaming.swift

@@ -35,7 +35,7 @@
 
   /// Run the session. Internal.
   fileprivate func run(queue:DispatchQueue) throws {
-    try self.handler.sendMetadata(initialMetadata:initialMetadata) {
+    try self.handler.sendMetadata(initialMetadata:initialMetadata) { _ in
       queue.async {
         do {
           try self.provider.{{ method|methodDescriptorName|lowercase }}(session:self)

+ 3 - 3
Plugin/Templates/server-session-serverstreaming.swift

@@ -9,8 +9,8 @@
   }
 
   /// Send a message. Nonblocking.
-  {{ access }} func send(_ response: {{ method|output }}, completion: @escaping ()->()) throws {
-    try handler.sendResponse(message:response.serializedData()) {completion()}
+  {{ access }} func send(_ response: {{ method|output }}, completion: ((Bool)->())?) throws {
+	try handler.sendResponse(message:response.serializedData(), completion: completion)
   }
 
   /// Run the session. Internal.
@@ -27,7 +27,7 @@
               try self.handler.sendStatus(statusCode:self.statusCode,
                                           statusMessage:self.statusMessage,
                                           trailingMetadata:self.trailingMetadata,
-                                          completion:{})
+                                          completion:nil)
             } catch (let error) {
               print("error: \(error)")
             }

+ 10 - 6
Sources/gRPC/Call.swift

@@ -210,7 +210,7 @@ public class Call {
   public func start(_ style: CallStyle,
                     metadata: Metadata,
                     message: Data? = nil,
-                    completion: @escaping (CallResult) -> Void) throws {
+                    completion: ((CallResult) -> Void)? = nil) throws {
     var operations: [Operation] = []
     switch style {
     case .unary:
@@ -243,7 +243,9 @@ public class Call {
     }
     try perform(OperationGroup(call: self,
                                operations: operations,
-                               completion: { op in completion(CallResult(op)) }))
+                               completion: completion != nil
+                                ? { op in completion?(CallResult(op)) }
+                                : nil))
   }
 
   /// Sends a message over a streaming connection.
@@ -296,7 +298,7 @@ public class Call {
 
   // Receive a message over a streaming connection.
   /// - Throws: `CallError` if fails to call.
-  public func receiveMessage(callback: @escaping ((Data!) throws -> Void)) throws {
+  public func receiveMessage(callback: @escaping ((Data?) throws -> Void)) throws {
     try perform(OperationGroup(call: self, operations: [.receiveMessage]) { operationGroup in
       if operationGroup.success {
         if let messageBuffer = operationGroup.receivedMessage() {
@@ -310,9 +312,11 @@ public class Call {
 
   // Closes a streaming connection.
   /// - Throws: `CallError` if fails to call.
-  public func close(completion: @escaping (() -> Void)) throws {
-    try perform(OperationGroup(call: self, operations: [.sendCloseFromClient]) { _ in completion()
-    })
+  public func close(completion: (() -> Void)? = nil) throws {
+    try perform(OperationGroup(call: self, operations: [.sendCloseFromClient],
+                               completion: completion != nil
+                                ? { op in completion?() }
+                                : nil))
   }
 
   // Get the current message queue length

+ 6 - 6
Sources/gRPC/CompletionQueue.swift

@@ -97,8 +97,8 @@ class CompletionQueue {
   ///
   /// - Parameter callbackQueue: a DispatchQueue to use to call the completion handler
   /// - Parameter completion: a completion handler that is called when the queue stops running
-  func runToCompletion(callbackQueue: DispatchQueue? = DispatchQueue.main,
-                                _ completion: @escaping () -> Void) {
+  func runToCompletion(callbackQueue: DispatchQueue = DispatchQueue.main,
+                       completion: (() -> Void)?) {
     // run the completion queue on a new background thread
     DispatchQueue.global().async {
       var running = true
@@ -114,7 +114,7 @@ class CompletionQueue {
             // call the operation group completion handler
             do {
               operationGroup.success = (event.success == 1)
-              try operationGroup.completion(operationGroup)
+              try operationGroup.completion?(operationGroup)
             } catch (let callError) {
               print("CompletionQueue runToCompletion: grpc error \(callError)")
             }
@@ -128,7 +128,7 @@ class CompletionQueue {
           do {
             for operationGroup in self.operationGroups.values {
               operationGroup.success = false
-              try operationGroup.completion(operationGroup)
+              try operationGroup.completion?(operationGroup)
             }
           } catch (let callError) {
             print("CompletionQueue runToCompletion: grpc error \(callError)")
@@ -143,7 +143,7 @@ class CompletionQueue {
           break
         }
       }
-      if let callbackQueue = callbackQueue {
+      if let completion = completion {
         callbackQueue.async {
           // when the queue stops running, call the queue completion handler
           completion()
@@ -154,7 +154,7 @@ class CompletionQueue {
 
   /// Runs a completion queue
   func run() {
-    runToCompletion(callbackQueue: nil) {}
+    runToCompletion(completion: nil)
   }
 
   /// Shuts down a completion queue

+ 24 - 34
Sources/gRPC/Handler.swift

@@ -94,7 +94,7 @@ public class Handler {
   /// Receive the message sent with a call
   ///
   public func receiveMessage(initialMetadata: Metadata,
-                             completion: @escaping ((Data?) throws -> Void)) throws {
+                             completion: @escaping (Data?) throws -> Void) throws {
     let operations = OperationGroup(call: call,
                                     operations: [
                                       .sendInitialMetadata(initialMetadata),
@@ -126,6 +126,7 @@ public class Handler {
                                       .sendStatusFromServer(statusCode, statusMessage, trailingMetadata),
                                       .sendMessage(messageBuffer)
     ]) { operationGroup in
+      // TODO(timburks): Should we also shut down do in case of error?
       if operationGroup.success {
         self.shutdown()
       }
@@ -146,6 +147,7 @@ public class Handler {
                                       .receiveCloseOnServer,
                                       .sendStatusFromServer(statusCode, statusMessage, trailingMetadata)
     ]) { operationGroup in
+      // TODO(timburks): Should we also shut down do in case of error?
       if operationGroup.success {
         self.shutdown()
       }
@@ -163,61 +165,51 @@ public class Handler {
   /// - Parameter initialMetadata: initial metadata to send
   /// - Parameter completion: a completion handler to call after the metadata has been sent
   public func sendMetadata(initialMetadata: Metadata,
-                           completion: @escaping (() throws -> Void)) throws {
+                           completion: ((Bool) throws -> Void)? = nil) throws {
     let operations = OperationGroup(call: call,
-                                    operations: [.sendInitialMetadata(initialMetadata)]) { operationGroup in
-      if operationGroup.success {
-        try completion()
-      } else {
-        try completion()
-      }
-    }
+                                    operations: [.sendInitialMetadata(initialMetadata)],
+                                    completion: completion != nil
+                                      ? { operationGroup in try completion?(operationGroup.success) }
+                                      : nil)
     try call.perform(operations)
   }
-
+  
   /// Receive the message sent with a call
   ///
   /// - Parameter completion: a completion handler to call after the message has been received
   /// - Returns: a tuple containing status codes and a message (if available)
-  public func receiveMessage(completion: (@escaping (Data?) throws -> Void)) throws {
+  public func receiveMessage(completion: @escaping (Data?) throws -> Void) throws {
     let operations = OperationGroup(call: call, operations: [.receiveMessage]) { operationGroup in
       if operationGroup.success {
-        if let message = operationGroup.receivedMessage() {
-          try completion(message.data())
-        } else {
-          try completion(nil)
-        }
+        try completion(operationGroup.receivedMessage()?.data())
       } else {
         try completion(nil)
       }
     }
     try call.perform(operations)
   }
-
+  
   /// Sends the response to a request
   ///
   /// - Parameter message: the message to send
   /// - Parameter completion: a completion handler to call after the response has been sent
   public func sendResponse(message: Data,
-                           completion: @escaping () throws -> Void) throws {
+                           completion: ((Bool) throws -> Void)? = nil) throws {
     let operations = OperationGroup(call: call,
-                                    operations: [.sendMessage(ByteBuffer(data: message))]) { operationGroup in
-      if operationGroup.success {
-        try completion()
-      }
-    }
+                                    operations: [.sendMessage(ByteBuffer(data: message))],
+                                    completion: completion != nil
+                                      ? { operationGroup in try completion?(operationGroup.success) }
+                                      : nil)
     try call.perform(operations)
   }
-
+  
   /// Recognize when the client has closed a request
   ///
   /// - Parameter completion: a completion handler to call after request has been closed
-  public func receiveClose(completion: @escaping () throws -> Void) throws {
+  public func receiveClose(completion: @escaping (Bool) throws -> Void) throws {
     let operations = OperationGroup(call: call,
                                     operations: [.receiveCloseOnServer]) { operationGroup in
-      if operationGroup.success {
-        try completion()
-      }
+                                      try completion(operationGroup.success)
     }
     try call.perform(operations)
   }
@@ -231,17 +223,15 @@ public class Handler {
   public func sendStatus(statusCode: StatusCode,
                          statusMessage: String,
                          trailingMetadata: Metadata,
-                         completion: @escaping (() -> Void)) throws {
+                         completion: ((Bool) -> Void)? = nil) throws {
     let operations = OperationGroup(call: call,
                                     operations: [
                                       .sendStatusFromServer(statusCode,
                                                             statusMessage,
                                                             trailingMetadata)
-    ]) { operationGroup in
-      if operationGroup.success {
-        completion()
-      }
-    }
+    ], completion: completion != nil
+      ? { operationGroup in completion?(operationGroup.success) }
+      : nil)
     try call.perform(operations)
   }
 }

+ 2 - 2
Sources/gRPC/OperationGroup.swift

@@ -41,7 +41,7 @@ class OperationGroup {
   var underlyingOperations: UnsafeMutableRawPointer?
 
   /// Completion handler that is called when the group completes
-  var completion: ((OperationGroup) throws -> Void)
+  let completion: ((OperationGroup) throws -> Void)?
 
   /// Indicates that the OperationGroup completed successfully
   var success: Bool = false
@@ -81,7 +81,7 @@ class OperationGroup {
   /// - Parameter operations: an array of operations
   init(call: Call,
        operations: [Operation],
-       completion: @escaping ((OperationGroup) throws -> Void)) {
+       completion: ((OperationGroup) throws -> Void)? = nil) {
     self.call = call
     self.operations = operations
     self.completion = completion

+ 2 - 8
Sources/gRPC/Server.swift

@@ -34,7 +34,7 @@ public class Server {
   private var handlersMutex: Mutex = Mutex()
 
   /// Optional callback when server stops serving
-  private var onCompletion: (() -> Void)?
+  public var onCompletion: (() -> Void)?
 
   /// Initializes a Server
   ///
@@ -108,17 +108,11 @@ public class Server {
           running = false
         }
       }
-      if let onCompletion = self.onCompletion {
-        onCompletion()
-      }
+      self.onCompletion?()
     }
   }
 
   public func stop() {
     cgrpc_server_stop(underlyingServer)
   }
-
-  public func onCompletion(completion: @escaping (() -> Void)) {
-    onCompletion = completion
-  }
 }