Browse Source

Ensure that errors in client calls are always provided to the user.

The expected behavior is now for the client call's completion handler to be called when the whole call is completed. You can check that completion handler's `statusCode` for the call's result. The intermediate send/receive call completion handlers will also get called with an error, but that error's status code is most likely to be `.unknown` (because send/receive calls don't receive the final status from the server).
In addition, `CallResult` now also includes the operation group's success flag, but that flag is much less important now.

This includes:
- Make most completion handlers non-throwing (their errors were for the most part discarded, anyway)
- Extract Receive-Streaming into a common protocol
- Changing many completion handler interfaces to be called with a `CallResult` instead of `Data?` and `ResultOrRPCError<ReceivedType?>` instead of `(ReceivedType, Error)`
- Not throwing on end of stream, but passing `nil` to the completion handler instead
- Making `ServerSessionUnary` always return an error when one of its calls fail
- Adding test to ensure the proper error behavior.
Daniel Alm 7 years ago
parent
commit
ac7979d5e0
28 changed files with 596 additions and 443 deletions
  1. 6 11
      .gitignore
  2. 4 6
      Sources/Examples/Echo/EchoProvider.swift
  3. 13 9
      Sources/Examples/Echo/Generated/echo.grpc.swift
  4. 9 18
      Sources/Examples/Echo/main.swift
  5. 16 126
      Sources/SwiftGRPC/Core/Call.swift
  6. 76 0
      Sources/SwiftGRPC/Core/CallError.swift
  7. 63 0
      Sources/SwiftGRPC/Core/CallResult.swift
  8. 5 13
      Sources/SwiftGRPC/Core/CompletionQueue.swift
  9. 18 10
      Sources/SwiftGRPC/Core/Handler.swift
  10. 3 3
      Sources/SwiftGRPC/Core/OperationGroup.swift
  11. 9 49
      Sources/SwiftGRPC/Runtime/ClientCallBidirectionalStreaming.swift
  12. 19 27
      Sources/SwiftGRPC/Runtime/ClientCallClientStreaming.swift
  13. 9 49
      Sources/SwiftGRPC/Runtime/ClientCallServerStreaming.swift
  14. 2 2
      Sources/SwiftGRPC/Runtime/ClientCallUnary.swift
  15. 55 0
      Sources/SwiftGRPC/Runtime/RPCError.swift
  16. 0 22
      Sources/SwiftGRPC/Runtime/ServerError.swift
  17. 14 0
      Sources/SwiftGRPC/Runtime/ServerSession.swift
  18. 11 29
      Sources/SwiftGRPC/Runtime/ServerSessionBidirectionalStreaming.swift
  19. 11 24
      Sources/SwiftGRPC/Runtime/ServerSessionClientStreaming.swift
  20. 21 2
      Sources/SwiftGRPC/Runtime/ServerSessionUnary.swift
  21. 3 5
      Sources/SwiftGRPC/Runtime/ServiceServer.swift
  22. 59 0
      Sources/SwiftGRPC/Runtime/StreamReceiving.swift
  23. 3 10
      Sources/protoc-gen-swiftgrpc/Generator-Client.swift
  24. 9 6
      Sources/protoc-gen-swiftgrpc/Generator-Methods.swift
  25. 2 4
      Sources/protoc-gen-swiftgrpc/Generator-Server.swift
  26. 20 12
      Tests/SwiftGRPCTests/EchoTests.swift
  27. 126 0
      Tests/SwiftGRPCTests/ErrorHandlingTests.swift
  28. 10 6
      Tests/SwiftGRPCTests/GRPCTests.swift

+ 6 - 11
.gitignore

@@ -2,16 +2,11 @@
 project.xcworkspace
 xcuserdata
 .build
-protoc-gen-swift
-protoc-gen-swiftgrpc
+/protoc-gen-swift
+/protoc-gen-swiftgrpc
 third_party/**
-Plugin/Packages/**
-Plugin/Sources/protoc-gen-swiftgrpc/templates.swift
-Plugin/protoc-*
-Plugin/swiftgrpc.log
-Plugin/echo.*.swift
-Echo
-test.out
-echo.pid
-SwiftGRPC.xcodeproj
+/Echo
+/test.out
+/echo.pid
+/SwiftGRPC.xcodeproj
 Package.resolved

+ 4 - 6
Sources/Examples/Echo/EchoProvider.swift

@@ -45,10 +45,9 @@ class EchoProvider: Echo_EchoProvider {
     var parts: [String] = []
     while true {
       do {
-        let request = try session.receive()
+        guard let request = try session.receive()
+          else { break }  // End of stream
         parts.append(request.text)
-      } catch ServerError.endOfStream {
-        break
       } catch (let error) {
         print("\(error)")
       }
@@ -63,7 +62,8 @@ class EchoProvider: Echo_EchoProvider {
     var count = 0
     while true {
       do {
-        let request = try session.receive()
+        guard let request = try session.receive()
+          else { break }  // End of stream
         var response = Echo_EchoResponse()
         response.text = "Swift echo update (\(count)): \(request.text)"
         count += 1
@@ -72,8 +72,6 @@ class EchoProvider: Echo_EchoProvider {
             print("update error: \(error)")
           }
         }
-      } catch ServerError.endOfStream {
-        break
       } catch (let error) {
         print("\(error)")
         break

+ 13 - 9
Sources/Examples/Echo/Generated/echo.grpc.swift

@@ -33,9 +33,9 @@ fileprivate final class Echo_EchoGetCallBase: ClientCallUnaryBase<Echo_EchoReque
 
 internal protocol Echo_EchoExpandCall: ClientCallServerStreaming {
   /// Call this to wait for a result. Blocking.
-  func receive() throws -> Echo_EchoResponse
+  func receive() throws -> Echo_EchoResponse?
   /// Call this to wait for a result. Nonblocking.
-  func receive(completion: @escaping (Echo_EchoResponse?, ClientError?) -> Void) throws
+  func receive(completion: @escaping (ResultOrRPCError<Echo_EchoResponse?>) -> Void) throws
 }
 
 fileprivate final class Echo_EchoExpandCallBase: ClientCallServerStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoExpandCall {
@@ -53,7 +53,7 @@ internal protocol Echo_EchoCollectCall: ClientCallClientStreaming {
   /// Call this to close the connection and wait for a response. Blocking.
   func closeAndReceive() throws -> Echo_EchoResponse
   /// Call this to close the connection and wait for a response. Nonblocking.
-  func closeAndReceive(completion: @escaping (Echo_EchoResponse?, ClientError?) -> Void) throws
+  func closeAndReceive(completion: @escaping (ResultOrRPCError<Echo_EchoResponse>) -> Void) throws
 }
 
 fileprivate final class Echo_EchoCollectCallBase: ClientCallClientStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoCollectCall {
@@ -68,9 +68,9 @@ class Echo_EchoCollectCallTestStub: ClientCallClientStreamingTestStub<Echo_EchoR
 
 internal protocol Echo_EchoUpdateCall: ClientCallBidirectionalStreaming {
   /// Call this to wait for a result. Blocking.
-  func receive() throws -> Echo_EchoResponse
+  func receive() throws -> Echo_EchoResponse?
   /// Call this to wait for a result. Nonblocking.
-  func receive(completion: @escaping (Echo_EchoResponse?, ClientError?) -> Void) throws
+  func receive(completion: @escaping (ResultOrRPCError<Echo_EchoResponse?>) -> Void) throws
 
   /// Call this to send each message in the request stream.
   func send(_ message: Echo_EchoRequest, completion: @escaping (Error?) -> Void) throws
@@ -210,8 +210,10 @@ fileprivate final class Echo_EchoExpandSessionBase: ServerSessionServerStreaming
 class Echo_EchoExpandSessionTestStub: ServerSessionServerStreamingTestStub<Echo_EchoResponse>, Echo_EchoExpandSession {}
 
 internal protocol Echo_EchoCollectSession: ServerSessionClientStreaming {
-  /// Receive a message. Blocks until a message is received or the client closes the connection.
-  func receive() throws -> Echo_EchoRequest
+  /// Call this to wait for a result. Blocking.
+  func receive() throws -> Echo_EchoRequest?
+  /// Call this to wait for a result. Nonblocking.
+  func receive(completion: @escaping (ResultOrRPCError<Echo_EchoRequest?>) -> Void) throws
 
   /// Send a response and close the connection.
   func sendAndClose(_ response: Echo_EchoResponse) throws
@@ -222,8 +224,10 @@ fileprivate final class Echo_EchoCollectSessionBase: ServerSessionClientStreamin
 class Echo_EchoCollectSessionTestStub: ServerSessionClientStreamingTestStub<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoCollectSession {}
 
 internal protocol Echo_EchoUpdateSession: ServerSessionBidirectionalStreaming {
-  /// Receive a message. Blocks until a message is received or the client closes the connection.
-  func receive() throws -> Echo_EchoRequest
+  /// Call this to wait for a result. Blocking.
+  func receive() throws -> Echo_EchoRequest?
+  /// Call this to wait for a result. Nonblocking.
+  func receive(completion: @escaping (ResultOrRPCError<Echo_EchoRequest?>) -> Void) throws
 
   /// Send a message. Nonblocking.
   func send(_ response: Echo_EchoResponse, completion: ((Error?) -> Void)?) throws

+ 9 - 18
Sources/Examples/Echo/main.swift

@@ -109,16 +109,13 @@ Group {
       callResult = result
       sem.signal()
     }
-    var running = true
-    while running {
-      do {
-        let responseMessage = try expandCall.receive()
-        print("expand received: \(responseMessage.text)")
-      } catch ClientError.endOfStream {
-        running = false
-      }
+    while true {
+      guard let responseMessage = try expandCall.receive()
+        else { break }  // End of stream
+      print("expand received: \(responseMessage.text)")
     }
     _ = sem.wait()
+
     if let statusCode = callResult?.statusCode {
       print("expand completed with code \(statusCode)")
     }
@@ -150,6 +147,7 @@ Group {
     let responseMessage = try collectCall.closeAndReceive()
     print("collect received: \(responseMessage.text)")
     _ = sem.wait()
+
     if let statusCode = callResult?.statusCode {
       print("collect completed with code \(statusCode)")
     }
@@ -182,17 +180,10 @@ Group {
     try updateCall.closeSend()
 
     while true {
-      do {
-        let responseMessage = try updateCall.receive()
-        print("update received: \(responseMessage.text)")
-      } catch ClientError.endOfStream {
-        break
-      } catch (let error) {
-        print("update receive error: \(error)")
-        break
-      }
+      guard let responseMessage = try updateCall.receive()
+        else { break }  // End of stream
+      print("update received: \(responseMessage.text)")
     }
-
     _ = sem.wait()
 
     if let statusCode = callResult?.statusCode {

+ 16 - 126
Sources/SwiftGRPC/Core/Call.swift

@@ -30,113 +30,6 @@ public enum CallWarning: Error {
   case blocked
 }
 
-public enum CallError: Error {
-  case ok
-  case unknown
-  case notOnServer
-  case notOnClient
-  case alreadyAccepted
-  case alreadyInvoked
-  case notInvoked
-  case alreadyFinished
-  case tooManyOperations
-  case invalidFlags
-  case invalidMetadata
-  case invalidMessage
-  case notServerCompletionQueue
-  case batchTooBig
-  case payloadTypeMismatch
-
-  static func callError(grpcCallError error: grpc_call_error) -> CallError {
-    switch error {
-    case GRPC_CALL_OK:
-      return .ok
-    case GRPC_CALL_ERROR:
-      return .unknown
-    case GRPC_CALL_ERROR_NOT_ON_SERVER:
-      return .notOnServer
-    case GRPC_CALL_ERROR_NOT_ON_CLIENT:
-      return .notOnClient
-    case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
-      return .alreadyAccepted
-    case GRPC_CALL_ERROR_ALREADY_INVOKED:
-      return .alreadyInvoked
-    case GRPC_CALL_ERROR_NOT_INVOKED:
-      return .notInvoked
-    case GRPC_CALL_ERROR_ALREADY_FINISHED:
-      return .alreadyFinished
-    case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
-      return .tooManyOperations
-    case GRPC_CALL_ERROR_INVALID_FLAGS:
-      return .invalidFlags
-    case GRPC_CALL_ERROR_INVALID_METADATA:
-      return .invalidMetadata
-    case GRPC_CALL_ERROR_INVALID_MESSAGE:
-      return .invalidMessage
-    case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
-      return .notServerCompletionQueue
-    case GRPC_CALL_ERROR_BATCH_TOO_BIG:
-      return .batchTooBig
-    case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
-      return .payloadTypeMismatch
-    default:
-      return .unknown
-    }
-  }
-}
-
-public struct CallResult: CustomStringConvertible {
-  public let statusCode: StatusCode
-  public let statusMessage: String?
-  public let resultData: Data?
-  public let initialMetadata: Metadata?
-  public let trailingMetadata: Metadata?
-
-  fileprivate init(_ op: OperationGroup) {
-    if op.success {
-      if let statusCodeRawValue = op.receivedStatusCode() {
-        if let statusCode = StatusCode(rawValue: statusCodeRawValue) {
-          self.statusCode = statusCode
-        } else {
-          statusCode = .unknown
-        }
-      } else {
-        statusCode = .ok
-      }
-      statusMessage = op.receivedStatusMessage()
-      resultData = op.receivedMessage()?.data()
-      initialMetadata = op.receivedInitialMetadata()
-      trailingMetadata = op.receivedTrailingMetadata()
-    } else {
-      statusCode = .unknown
-      statusMessage = nil
-      resultData = nil
-      initialMetadata = nil
-      trailingMetadata = nil
-    }
-  }
-
-  public var description: String {
-    var result = "status \(statusCode)"
-    if let statusMessage = self.statusMessage {
-      result += ": " + statusMessage
-    }
-    if let resultData = self.resultData {
-      result += "\n"
-      result += resultData.description
-    }
-    if let initialMetadata = self.initialMetadata {
-      result += "\n"
-      result += initialMetadata.description
-    }
-    if let trailingMetadata = self.trailingMetadata {
-      result += "\n"
-      result += trailingMetadata.description
-    }
-    return result
-  }
-}
-
 /// A gRPC API call
 public class Call {
   /// Shared mutex for synchronizing calls to cgrpc_call_perform()
@@ -240,11 +133,18 @@ public class Call {
         .receiveStatusOnClient,
       ]
     case .clientStreaming, .bidiStreaming:
-      operations = [
-        .sendInitialMetadata(metadata.copy()),
-        .receiveInitialMetadata,
-        .receiveStatusOnClient,
-      ]
+      try perform(OperationGroup(call: self,
+                                 operations: [
+                                  .sendInitialMetadata(metadata.copy()),
+                                  .receiveInitialMetadata
+                                  ],
+                                 completion: nil))
+      try perform(OperationGroup(call: self,
+                                 operations: [.receiveStatusOnClient],
+                                 completion: completion != nil
+                                  ? { op in completion?(CallResult(op)) }
+                                  : nil))
+      return
     }
     try perform(OperationGroup(call: self,
                                operations: operations,
@@ -304,27 +204,17 @@ public class Call {
 
   // Receive a message over a streaming connection.
   /// - Throws: `CallError` if fails to call.
-  public func closeAndReceiveMessage(completion: @escaping (Data?) throws -> Void) throws {
+  public func closeAndReceiveMessage(completion: @escaping (CallResult) -> Void) throws {
     try perform(OperationGroup(call: self, operations: [.sendCloseFromClient, .receiveMessage]) { operationGroup in
-      if operationGroup.success {
-        if let messageBuffer = operationGroup.receivedMessage() {
-          try completion(messageBuffer.data())
-        } else {
-          try completion(nil) // an empty response signals the end of a connection
-        }
-      }
+      completion(CallResult(operationGroup))
     })
   }
 
   // Receive a message over a streaming connection.
   /// - Throws: `CallError` if fails to call.
-  public func receiveMessage(completion: @escaping (Data?) throws -> Void) throws {
+  public func receiveMessage(completion: @escaping (CallResult) -> Void) throws {
     try perform(OperationGroup(call: self, operations: [.receiveMessage]) { operationGroup in
-      if operationGroup.success {
-        try completion(operationGroup.receivedMessage()?.data())
-      } else {
-        try completion(nil)
-      }
+      completion(CallResult(operationGroup))
     })
   }
 

+ 76 - 0
Sources/SwiftGRPC/Core/CallError.swift

@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if SWIFT_PACKAGE
+import CgRPC
+import Dispatch
+#endif
+import Foundation
+
+public enum CallError: Error {
+  case ok
+  case unknown
+  case notOnServer
+  case notOnClient
+  case alreadyAccepted
+  case alreadyInvoked
+  case notInvoked
+  case alreadyFinished
+  case tooManyOperations
+  case invalidFlags
+  case invalidMetadata
+  case invalidMessage
+  case notServerCompletionQueue
+  case batchTooBig
+  case payloadTypeMismatch
+  
+  static func callError(grpcCallError error: grpc_call_error) -> CallError {
+    switch error {
+    case GRPC_CALL_OK:
+      return .ok
+    case GRPC_CALL_ERROR:
+      return .unknown
+    case GRPC_CALL_ERROR_NOT_ON_SERVER:
+      return .notOnServer
+    case GRPC_CALL_ERROR_NOT_ON_CLIENT:
+      return .notOnClient
+    case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
+      return .alreadyAccepted
+    case GRPC_CALL_ERROR_ALREADY_INVOKED:
+      return .alreadyInvoked
+    case GRPC_CALL_ERROR_NOT_INVOKED:
+      return .notInvoked
+    case GRPC_CALL_ERROR_ALREADY_FINISHED:
+      return .alreadyFinished
+    case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
+      return .tooManyOperations
+    case GRPC_CALL_ERROR_INVALID_FLAGS:
+      return .invalidFlags
+    case GRPC_CALL_ERROR_INVALID_METADATA:
+      return .invalidMetadata
+    case GRPC_CALL_ERROR_INVALID_MESSAGE:
+      return .invalidMessage
+    case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
+      return .notServerCompletionQueue
+    case GRPC_CALL_ERROR_BATCH_TOO_BIG:
+      return .batchTooBig
+    case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
+      return .payloadTypeMismatch
+    default:
+      return .unknown
+    }
+  }
+}
+

+ 63 - 0
Sources/SwiftGRPC/Core/CallResult.swift

@@ -0,0 +1,63 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if SWIFT_PACKAGE
+import CgRPC
+import Dispatch
+#endif
+import Foundation
+
+public struct CallResult: CustomStringConvertible {
+  public let success: Bool
+  public let statusCode: StatusCode
+  public let statusMessage: String?
+  public let resultData: Data?
+  public let initialMetadata: Metadata?
+  public let trailingMetadata: Metadata?
+  
+  init(_ op: OperationGroup) {
+    success = op.success
+    if let statusCodeRawValue = op.receivedStatusCode(),
+      let statusCode = StatusCode(rawValue: statusCodeRawValue) {
+      self.statusCode = statusCode
+    } else {
+      statusCode = .unknown
+    }
+    statusMessage = op.receivedStatusMessage()
+    resultData = op.receivedMessage()?.data()
+    initialMetadata = op.receivedInitialMetadata()
+    trailingMetadata = op.receivedTrailingMetadata()
+  }
+  
+  public var description: String {
+    var result = "status \(statusCode)"
+    if let statusMessage = self.statusMessage {
+      result += ": " + statusMessage
+    }
+    if let resultData = self.resultData {
+      result += "\n"
+      result += resultData.description
+    }
+    if let initialMetadata = self.initialMetadata {
+      result += "\n"
+      result += initialMetadata.description
+    }
+    if let trailingMetadata = self.trailingMetadata {
+      result += "\n"
+      result += trailingMetadata.description
+    }
+    return result
+  }
+}

+ 5 - 13
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -113,12 +113,8 @@ class CompletionQueue {
           self.operationGroupsMutex.unlock()
           if let operationGroup = operationGroup {
             // call the operation group completion handler
-            do {
-              operationGroup.success = (event.success == 1)
-              try operationGroup.completion?(operationGroup)
-            } catch (let callError) {
-              print("CompletionQueue runToCompletion: grpc error \(callError)")
-            }
+            operationGroup.success = (event.success == 1)
+            operationGroup.completion?(operationGroup)
             self.operationGroupsMutex.synchronize {
               self.operationGroups[tag] = nil
             }
@@ -126,13 +122,9 @@ class CompletionQueue {
           break
         case GRPC_QUEUE_SHUTDOWN:
           running = false
-          do {
-            for operationGroup in self.operationGroups.values {
-              operationGroup.success = false
-              try operationGroup.completion?(operationGroup)
-            }
-          } catch (let callError) {
-            print("CompletionQueue runToCompletion: grpc error \(callError)")
+          for operationGroup in self.operationGroups.values {
+            operationGroup.success = false
+            operationGroup.completion?(operationGroup)
           }
           self.operationGroupsMutex.synchronize {
             self.operationGroups = [:]

+ 18 - 10
Sources/SwiftGRPC/Core/Handler.swift

@@ -30,7 +30,7 @@ public class Handler {
   public let requestMetadata: Metadata
 
   /// A Call object that can be used to respond to the request
-  private(set) lazy var call: Call = {
+  public lazy var call: Call = {
     Call(underlyingCall: cgrpc_handler_get_call(self.underlyingHandler),
          owned: false,
          completionQueue: self.completionQueue)
@@ -87,16 +87,16 @@ public class Handler {
   /// Receive the message sent with a call
   ///
   public func receiveMessage(initialMetadata: Metadata,
-                             completion: @escaping (Data?) throws -> Void) throws {
+                             completion: @escaping (Data?) -> Void) throws {
     let operations = OperationGroup(call: call,
                                     operations: [
                                       .sendInitialMetadata(initialMetadata),
                                       .receiveMessage
     ]) { operationGroup in
       if operationGroup.success {
-        try completion(operationGroup.receivedMessage()?.data())
+        completion(operationGroup.receivedMessage()?.data())
       } else {
-        try completion(nil)
+        completion(nil)
       }
     }
     try call.perform(operations)
@@ -152,11 +152,11 @@ public class Handler {
   /// - Parameter initialMetadata: initial metadata to send
   /// - Parameter completion: a completion handler to call after the metadata has been sent
   public func sendMetadata(initialMetadata: Metadata,
-                           completion: ((Bool) throws -> Void)? = nil) throws {
+                           completion: ((Bool) -> Void)? = nil) throws {
     let operations = OperationGroup(call: call,
                                     operations: [.sendInitialMetadata(initialMetadata)],
                                     completion: completion != nil
-                                      ? { operationGroup in try completion?(operationGroup.success) }
+                                      ? { operationGroup in completion?(operationGroup.success) }
                                       : nil)
     try call.perform(operations)
   }
@@ -165,7 +165,7 @@ public class Handler {
   ///
   /// - Parameter completion: a completion handler to call after the message has been received
   /// - Returns: a tuple containing status codes and a message (if available)
-  public func receiveMessage(completion: @escaping (Data?) throws -> Void) throws {
+  public func receiveMessage(completion: @escaping (CallResult) -> Void) throws {
     try call.receiveMessage(completion: completion)
   }
   
@@ -181,10 +181,10 @@ public class Handler {
   /// Recognize when the client has closed a request
   ///
   /// - Parameter completion: a completion handler to call after request has been closed
-  public func receiveClose(completion: @escaping (Bool) throws -> Void) throws {
+  public func receiveClose(completion: @escaping (Bool) -> Void) throws {
     let operations = OperationGroup(call: call,
                                     operations: [.receiveCloseOnServer]) { operationGroup in
-                                      try completion(operationGroup.success)
+                                      completion(operationGroup.success)
     }
     try call.perform(operations)
   }
@@ -197,7 +197,7 @@ public class Handler {
   /// - Parameter completion: a completion handler to call after the status has been sent
   public func sendStatus(statusCode: StatusCode,
                          statusMessage: String,
-                         trailingMetadata: Metadata,
+                         trailingMetadata: Metadata = Metadata(),
                          completion: ((Bool) -> Void)? = nil) throws {
     let operations = OperationGroup(call: call,
                                     operations: [
@@ -210,6 +210,14 @@ public class Handler {
     }
     try call.perform(operations)
   }
+  
+  public func sendError(_ error: ServerErrorStatus,
+                        completion: ((Bool) -> Void)? = nil) throws {
+    try sendStatus(statusCode: error.statusCode,
+                   statusMessage: error.statusMessage,
+                   trailingMetadata: error.trailingMetadata,
+                   completion: completion)
+  }
 }
 
 extension Handler: Hashable {

+ 3 - 3
Sources/SwiftGRPC/Core/OperationGroup.swift

@@ -32,7 +32,7 @@ class OperationGroup {
   private let call: Call
 
   /// An array of operation objects that are passed into the initializer.
-  private let operations: [Operation]
+  let operations: [Operation]
 
   /// An array of observers used to watch the operation
   private var underlyingObservers: [UnsafeMutableRawPointer] = []
@@ -41,7 +41,7 @@ class OperationGroup {
   let underlyingOperations: UnsafeMutableRawPointer?
 
   /// Completion handler that is called when the group completes
-  let completion: ((OperationGroup) throws -> Void)?
+  let completion: ((OperationGroup) -> Void)?
 
   /// Indicates that the OperationGroup completed successfully
   var success = false
@@ -81,7 +81,7 @@ class OperationGroup {
   /// - Parameter operations: an array of operations
   init(call: Call,
        operations: [Operation],
-       completion: ((OperationGroup) throws -> Void)? = nil) {
+       completion: ((OperationGroup) -> Void)? = nil) {
     self.call = call
     self.operations = operations
     self.completion = completion

+ 9 - 49
Sources/SwiftGRPC/Runtime/ClientCallBidirectionalStreaming.swift

@@ -25,7 +25,9 @@ public protocol ClientCallBidirectionalStreaming: ClientCall {
   // as the protocol would then have an associated type requirement (and become pretty much unusable in the process).
 }
 
-open class ClientCallBidirectionalStreamingBase<InputType: Message, OutputType: Message>: ClientCallBase, ClientCallBidirectionalStreaming {
+open class ClientCallBidirectionalStreamingBase<InputType: Message, OutputType: Message>: ClientCallBase, ClientCallBidirectionalStreaming, StreamReceiving {
+  public typealias ReceivedType = OutputType
+  
   /// Call this to start a call. Nonblocking.
   public func start(metadata: Metadata, completion: ((CallResult) -> Void)?)
     throws -> Self {
@@ -33,40 +35,6 @@ open class ClientCallBidirectionalStreamingBase<InputType: Message, OutputType:
     return self
   }
 
-  public func receive(completion: @escaping (OutputType?, ClientError?) -> Void) throws {
-    do {
-      try call.receiveMessage { data in
-        if let data = data {
-          if let returnMessage = try? OutputType(serializedData: data) {
-            completion(returnMessage, nil)
-          } else {
-            completion(nil, .invalidMessageReceived)
-          }
-        } else {
-          completion(nil, .endOfStream)
-        }
-      }
-    }
-  }
-
-  public func receive() throws -> OutputType {
-    var returnError: ClientError?
-    var returnMessage: OutputType!
-    let sem = DispatchSemaphore(value: 0)
-    do {
-      try receive { response, error in
-        returnMessage = response
-        returnError = error
-        sem.signal()
-      }
-      _ = sem.wait()
-    }
-    if let returnError = returnError {
-      throw returnError
-    }
-    return returnMessage
-  }
-
   public func send(_ message: InputType, completion: @escaping (Error?) -> Void) throws {
     let messageData = try message.serializedData()
     try call.sendMessage(data: messageData, completion: completion)
@@ -99,22 +67,14 @@ open class ClientCallBidirectionalStreamingTestStub<InputType: Message, OutputTy
   
   public init() {}
 
-  open func receive(completion: @escaping (OutputType?, ClientError?) -> Void) throws {
-    if let output = outputs.first {
-      outputs.removeFirst()
-      completion(output, nil)
-    } else {
-      completion(nil, .endOfStream)
-    }
+  open func receive(completion: @escaping (ResultOrRPCError<OutputType?>) -> Void) throws {
+    completion(.result(outputs.first))
+    outputs.removeFirst()
   }
 
-  open func receive() throws -> OutputType {
-    if let output = outputs.first {
-      outputs.removeFirst()
-      return output
-    } else {
-      throw ClientError.endOfStream
-    }
+  open func receive() throws -> OutputType? {
+    defer { outputs.removeFirst() }
+    return outputs.first
   }
 
   open func send(_ message: InputType, completion _: @escaping (Error?) -> Void) throws {

+ 19 - 27
Sources/SwiftGRPC/Runtime/ClientCallClientStreaming.swift

@@ -37,39 +37,31 @@ open class ClientCallClientStreamingBase<InputType: Message, OutputType: Message
     try call.sendMessage(data: messageData, completion: completion)
   }
 
-  public func closeAndReceive(completion: @escaping (OutputType?, ClientError?) -> Void) throws {
-    do {
-      try call.closeAndReceiveMessage { responseData in
-        if let responseData = responseData,
-          let response = try? OutputType(serializedData: responseData) {
-          completion(response, nil)
-        } else {
-          completion(nil, .invalidMessageReceived)
-        }
+  public func closeAndReceive(completion: @escaping (ResultOrRPCError<OutputType>) -> Void) throws {
+    try call.closeAndReceiveMessage { callResult in
+      guard let responseData = callResult.resultData else {
+        completion(.error(.callError(callResult))); return
+      }
+      if let response = try? OutputType(serializedData: responseData) {
+        completion(.result(response))
+      } else {
+        completion(.error(.invalidMessageReceived))
       }
-    } catch (let error) {
-      throw error
     }
   }
 
   public func closeAndReceive() throws -> OutputType {
-    var returnError: ClientError?
-    var returnResponse: OutputType!
+    var result: ResultOrRPCError<OutputType>?
     let sem = DispatchSemaphore(value: 0)
-    do {
-      try closeAndReceive { response, error in
-        returnResponse = response
-        returnError = error
-        sem.signal()
-      }
-      _ = sem.wait()
-    } catch (let error) {
-      throw error
+    try closeAndReceive {
+      result = $0
+      sem.signal()
     }
-    if let returnError = returnError {
-      throw returnError
+    _ = sem.wait()
+    switch result! {
+    case .result(let response): return response
+    case .error(let error): throw error
     }
-    return returnResponse
   }
 
   public func waitForSendOperationsToFinish() {
@@ -91,8 +83,8 @@ open class ClientCallClientStreamingTestStub<InputType: Message, OutputType: Mes
     inputs.append(message)
   }
 
-  open func closeAndReceive(completion: @escaping (OutputType?, ClientError?) -> Void) throws {
-    completion(output!, nil)
+  open func closeAndReceive(completion: @escaping (ResultOrRPCError<OutputType>) -> Void) throws {
+    completion(.result(output!))
   }
 
   open func closeAndReceive() throws -> OutputType {

+ 9 - 49
Sources/SwiftGRPC/Runtime/ClientCallServerStreaming.swift

@@ -23,7 +23,9 @@ public protocol ClientCallServerStreaming: ClientCall {
   // as the protocol would then have an associated type requirement (and become pretty much unusable in the process).
 }
 
-open class ClientCallServerStreamingBase<InputType: Message, OutputType: Message>: ClientCallBase, ClientCallServerStreaming {
+open class ClientCallServerStreamingBase<InputType: Message, OutputType: Message>: ClientCallBase, ClientCallServerStreaming, StreamReceiving {
+  public typealias ReceivedType = OutputType
+  
   /// Call this once with the message to send. Nonblocking.
   public func start(request: InputType, metadata: Metadata, completion: ((CallResult) -> Void)?) throws -> Self {
     let requestData = try request.serializedData()
@@ -33,40 +35,6 @@ open class ClientCallServerStreamingBase<InputType: Message, OutputType: Message
                    completion: completion)
     return self
   }
-
-  public func receive(completion: @escaping (OutputType?, ClientError?) -> Void) throws {
-    do {
-      try call.receiveMessage { responseData in
-        if let responseData = responseData {
-          if let response = try? OutputType(serializedData: responseData) {
-            completion(response, nil)
-          } else {
-            completion(nil, .invalidMessageReceived)
-          }
-        } else {
-          completion(nil, .endOfStream)
-        }
-      }
-    }
-  }
-
-  public func receive() throws -> OutputType {
-    var returnError: ClientError?
-    var returnResponse: OutputType!
-    let sem = DispatchSemaphore(value: 0)
-    do {
-      try receive { response, error in
-        returnResponse = response
-        returnError = error
-        sem.signal()
-      }
-      _ = sem.wait()
-    }
-    if let returnError = returnError {
-      throw returnError
-    }
-    return returnResponse
-  }
 }
 
 /// Simple fake implementation of ClientCallServerStreamingBase that returns a previously-defined set of results.
@@ -77,22 +45,14 @@ open class ClientCallServerStreamingTestStub<OutputType: Message>: ClientCallSer
   
   public init() {}
 
-  open func receive(completion: @escaping (OutputType?, ClientError?) -> Void) throws {
-    if let output = outputs.first {
-      outputs.removeFirst()
-      completion(output, nil)
-    } else {
-      completion(nil, .endOfStream)
-    }
+  open func receive(completion: @escaping (ResultOrRPCError<OutputType?>) -> Void) throws {
+    completion(.result(outputs.first))
+    outputs.removeFirst()
   }
 
-  open func receive() throws -> OutputType {
-    if let output = outputs.first {
-      outputs.removeFirst()
-      return output
-    } else {
-      throw ClientError.endOfStream
-    }
+  open func receive() throws -> OutputType? {
+    defer { outputs.removeFirst() }
+    return outputs.first
   }
 
   open func cancel() {}

+ 2 - 2
Sources/SwiftGRPC/Runtime/ClientCallUnary.swift

@@ -22,7 +22,7 @@ public protocol ClientCallUnary: ClientCall {}
 
 open class ClientCallUnaryBase<InputType: Message, OutputType: Message>: ClientCallBase, ClientCallUnary {
   /// Run the call. Blocks until the reply is received.
-  /// - Throws: `BinaryEncodingError` if encoding fails. `CallError` if fails to call. `ClientError` if receives no response.
+  /// - Throws: `BinaryEncodingError` if encoding fails. `CallError` if fails to call. `RPCError` if receives no response.
   public func run(request: InputType, metadata: Metadata) throws -> OutputType {
     let sem = DispatchSemaphore(value: 0)
     var returnCallResult: CallResult!
@@ -36,7 +36,7 @@ open class ClientCallUnaryBase<InputType: Message, OutputType: Message>: ClientC
     if let returnResponse = returnResponse {
       return returnResponse
     } else {
-      throw ClientError.error(c: returnCallResult)
+      throw RPCError.callError(returnCallResult)
     }
   }
 

+ 55 - 0
Sources/SwiftGRPC/Runtime/RPCError.swift

@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Foundation
+
+/// Type for errors thrown from generated client code.
+public enum RPCError: Error {
+  case invalidMessageReceived
+  case callError(CallResult)
+}
+
+public extension RPCError {
+  var callResult: CallResult? {
+    switch self {
+    case .invalidMessageReceived: return nil
+    case .callError(let callResult): return callResult
+    }
+  }
+}
+
+
+public enum ResultOrRPCError<ResultType> {
+  case result(ResultType)
+  case error(RPCError)
+}
+
+public extension ResultOrRPCError {
+  var result: ResultType? {
+    switch self {
+    case .result(let result): return result
+    case .error: return nil
+    }
+  }
+  
+  var error: RPCError? {
+    switch self {
+    case .result: return nil
+    case .error(let error): return error
+    }
+  }
+}
+

+ 0 - 22
Sources/SwiftGRPC/Runtime/ServerError.swift

@@ -1,22 +0,0 @@
-/*
- * Copyright 2018, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import Foundation
-
-/// Type for errors thrown from generated server code.
-public enum ServerError: Error {
-  case endOfStream
-}

+ 14 - 0
Sources/SwiftGRPC/Runtime/ServerSession.swift

@@ -18,6 +18,18 @@ import Dispatch
 import Foundation
 import SwiftProtobuf
 
+public struct ServerErrorStatus: Error {
+  public let statusCode: StatusCode
+  public let statusMessage: String
+  public let trailingMetadata: Metadata
+  
+  public init(statusCode: StatusCode, statusMessage: String, trailingMetadata: Metadata = Metadata()) {
+    self.statusCode = statusCode
+    self.statusMessage = statusMessage
+    self.trailingMetadata = trailingMetadata
+  }
+}
+
 public protocol ServerSession: class {
   var requestMetadata: Metadata { get }
 
@@ -35,6 +47,8 @@ open class ServerSessionBase: ServerSession {
   public var statusMessage: String = "OK"
   public var initialMetadata: Metadata = Metadata()
   public var trailingMetadata: Metadata = Metadata()
+  
+  public var call: Call { return handler.call }
 
   public init(handler: Handler) {
     self.handler = handler

+ 11 - 29
Sources/SwiftGRPC/Runtime/ServerSessionBidirectionalStreaming.swift

@@ -22,7 +22,9 @@ public protocol ServerSessionBidirectionalStreaming: ServerSession {
   func waitForSendOperationsToFinish()
 }
 
-open class ServerSessionBidirectionalStreamingBase<InputType: Message, OutputType: Message>: ServerSessionBase, ServerSessionBidirectionalStreaming {
+open class ServerSessionBidirectionalStreamingBase<InputType: Message, OutputType: Message>: ServerSessionBase, ServerSessionBidirectionalStreaming, StreamReceiving {
+  public typealias ReceivedType = InputType
+  
   public typealias ProviderBlock = (ServerSessionBidirectionalStreamingBase) throws -> Void
   private var providerBlock: ProviderBlock
 
@@ -31,27 +33,6 @@ open class ServerSessionBidirectionalStreamingBase<InputType: Message, OutputTyp
     super.init(handler: handler)
   }
 
-  public func receive() throws -> InputType {
-    let sem = DispatchSemaphore(value: 0)
-    var requestMessage: InputType?
-    try handler.receiveMessage { requestData in
-      if let requestData = requestData {
-        do {
-          requestMessage = try InputType(serializedData: requestData)
-        } catch (let error) {
-          print("error \(error)")
-        }
-      }
-      sem.signal()
-    }
-    _ = sem.wait()
-    if let requestMessage = requestMessage {
-      return requestMessage
-    } else {
-      throw ServerError.endOfStream
-    }
-  }
-
   public func send(_ response: OutputType, completion: ((Error?) -> Void)?) throws {
     try handler.sendResponse(message: response.serializedData(), completion: completion)
   }
@@ -87,13 +68,14 @@ open class ServerSessionBidirectionalStreamingTestStub<InputType: Message, Outpu
   open var inputs: [InputType] = []
   open var outputs: [OutputType] = []
 
-  open func receive() throws -> InputType {
-    if let input = inputs.first {
-      inputs.removeFirst()
-      return input
-    } else {
-      throw ServerError.endOfStream
-    }
+  open func receive() throws -> InputType? {
+    defer { inputs.removeFirst() }
+    return inputs.first
+  }
+  
+  open func receive(completion: @escaping (ResultOrRPCError<InputType?>) -> Void) throws {
+    completion(.result(inputs.first))
+    inputs.removeFirst()
   }
 
   open func send(_ response: OutputType, completion _: ((Error?) -> Void)?) throws {

+ 11 - 24
Sources/SwiftGRPC/Runtime/ServerSessionClientStreaming.swift

@@ -20,7 +20,9 @@ import SwiftProtobuf
 
 public protocol ServerSessionClientStreaming: ServerSession {}
 
-open class ServerSessionClientStreamingBase<InputType: Message, OutputType: Message>: ServerSessionBase, ServerSessionClientStreaming {
+open class ServerSessionClientStreamingBase<InputType: Message, OutputType: Message>: ServerSessionBase, ServerSessionClientStreaming, StreamReceiving {
+  public typealias ReceivedType = InputType
+  
   public typealias ProviderBlock = (ServerSessionClientStreamingBase) throws -> Void
   private var providerBlock: ProviderBlock
 
@@ -29,22 +31,6 @@ open class ServerSessionClientStreamingBase<InputType: Message, OutputType: Mess
     super.init(handler: handler)
   }
 
-  public func receive() throws -> InputType {
-    let sem = DispatchSemaphore(value: 0)
-    var requestMessage: InputType?
-    try handler.receiveMessage { requestData in
-      if let requestData = requestData {
-        requestMessage = try? InputType(serializedData: requestData)
-      }
-      sem.signal()
-    }
-    _ = sem.wait()
-    if requestMessage == nil {
-      throw ServerError.endOfStream
-    }
-    return requestMessage!
-  }
-
   public func sendAndClose(_ response: OutputType) throws {
     try handler.sendResponse(message: response.serializedData(),
                              statusCode: statusCode,
@@ -71,13 +57,14 @@ open class ServerSessionClientStreamingTestStub<InputType: Message, OutputType:
   open var inputs: [InputType] = []
   open var output: OutputType?
 
-  open func receive() throws -> InputType {
-    if let input = inputs.first {
-      inputs.removeFirst()
-      return input
-    } else {
-      throw ServerError.endOfStream
-    }
+  open func receive() throws -> InputType? {
+    defer { inputs.removeFirst() }
+    return inputs.first
+  }
+  
+  open func receive(completion: @escaping (ResultOrRPCError<InputType?>) -> Void) throws {
+    completion(.result(inputs.first))
+    inputs.removeFirst()
   }
 
   open func sendAndClose(_ response: OutputType) throws {

+ 21 - 2
Sources/SwiftGRPC/Runtime/ServerSessionUnary.swift

@@ -28,16 +28,35 @@ open class ServerSessionUnaryBase<InputType: Message, OutputType: Message>: Serv
     self.providerBlock = providerBlock
     super.init(handler: handler)
   }
-
+  
   public func run(queue _: DispatchQueue) throws {
     try handler.receiveMessage(initialMetadata: initialMetadata) { requestData in
-      if let requestData = requestData {
+      guard let requestData = requestData else {
+        print("ServerSessionUnaryBase.run empty request data")
+        do {
+          try self.handler.sendStatus(statusCode: .invalidArgument,
+                                      statusMessage: "no request data received")
+        } catch {
+          print("ServerSessionUnaryBase.run error sending status: \(error)")
+        }
+        return
+      }
+      do {
         let requestMessage = try InputType(serializedData: requestData)
         let replyMessage = try self.providerBlock(requestMessage, self)
         try self.handler.sendResponse(message: replyMessage.serializedData(),
                                       statusCode: self.statusCode,
                                       statusMessage: self.statusMessage,
                                       trailingMetadata: self.trailingMetadata)
+      } catch {
+        print("ServerSessionUnaryBase.run error processing request: \(error)")
+        
+        do {
+          try self.handler.sendError((error as? ServerErrorStatus)
+            ?? ServerErrorStatus(statusCode: .unknown, statusMessage: "unknown error processing request"))
+        } catch {
+          print("ServerSessionUnaryBase.run error sending status: \(error)")
+        }
       }
     }
   }

+ 3 - 5
Sources/SwiftGRPC/Runtime/ServiceServer.swift

@@ -69,11 +69,9 @@ open class ServiceServer {
       do {
         if try !strongSelf.handleMethod(unwrappedMethod, handler: handler, queue: queue) {
           // handle unknown requests
-          try handler.receiveMessage(initialMetadata: Metadata()) { _ in
-            try handler.sendResponse(statusCode: .unimplemented,
-                                     statusMessage: "unknown method " + unwrappedMethod,
-                                     trailingMetadata: Metadata())
-          }
+          try handler.sendResponse(statusCode: .unimplemented,
+                                   statusMessage: "unknown method " + unwrappedMethod,
+                                   trailingMetadata: Metadata())
         }
       } catch (let error) {
         print("Server error: \(error)")

+ 59 - 0
Sources/SwiftGRPC/Runtime/StreamReceiving.swift

@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Dispatch
+import Foundation
+import SwiftProtobuf
+
+public protocol StreamReceiving {
+  associatedtype ReceivedType: Message
+  
+  var call: Call { get }
+}
+
+extension StreamReceiving {
+  public func receive(completion: @escaping (ResultOrRPCError<ReceivedType?>) -> Void) throws {
+    try call.receiveMessage { callResult in
+      guard let responseData = callResult.resultData else {
+        if callResult.success {
+          completion(.result(nil))
+        } else {
+          completion(.error(.callError(callResult)))
+        }
+        return
+      }
+      if let response = try? ReceivedType(serializedData: responseData) {
+        completion(.result(response))
+      } else {
+        completion(.error(.invalidMessageReceived))
+      }
+    }
+  }
+  
+  public func receive() throws -> ReceivedType? {
+    var result: ResultOrRPCError<ReceivedType?>?
+    let sem = DispatchSemaphore(value: 0)
+    try receive {
+      result = $0
+      sem.signal()
+    }
+    _ = sem.wait()
+    switch result! {
+    case .result(let response): return response
+    case .error(let error): throw error
+    }
+  }
+}

+ 3 - 10
Sources/protoc-gen-swiftgrpc/Generator-Client.swift

@@ -18,7 +18,6 @@ import SwiftProtobuf
 import SwiftProtobufPluginLibrary
 
 extension Generator {
-
   internal func printClient() {
     for method in service.methods {
       self.method = method
@@ -57,10 +56,7 @@ extension Generator {
   private func printServiceClientMethodCallServerStreaming() {
     println("\(access) protocol \(callName): ClientCallServerStreaming {")
     indent()
-    println("/// Call this to wait for a result. Blocking.")
-    println("func receive() throws -> \(methodOutputName)")
-    println("/// Call this to wait for a result. Nonblocking.")
-    println("func receive(completion: @escaping (\(methodOutputName)?, ClientError?) -> Void) throws")
+    printStreamReceiveMethods(receivedType: methodOutputName)
     outdent()
     println("}")
     println()
@@ -89,7 +85,7 @@ extension Generator {
     println("/// Call this to close the connection and wait for a response. Blocking.")
     println("func closeAndReceive() throws -> \(methodOutputName)")
     println("/// Call this to close the connection and wait for a response. Nonblocking.")
-    println("func closeAndReceive(completion: @escaping (\(methodOutputName)?, ClientError?) -> Void) throws")
+    println("func closeAndReceive(completion: @escaping (ResultOrRPCError<\(methodOutputName)>) -> Void) throws")
     outdent()
     println("}")
     println()
@@ -114,10 +110,7 @@ extension Generator {
   private func printServiceClientMethodCallBidiStreaming() {
     println("\(access) protocol \(callName): ClientCallBidirectionalStreaming {")
     indent()
-    println("/// Call this to wait for a result. Blocking.")
-    println("func receive() throws -> \(methodOutputName)")
-    println("/// Call this to wait for a result. Nonblocking.")
-    println("func receive(completion: @escaping (\(methodOutputName)?, ClientError?) -> Void) throws")
+    printStreamReceiveMethods(receivedType: methodOutputName)
     println()
     println("/// Call this to send each message in the request stream.")
     println("func send(_ message: \(methodInputName), completion: @escaping (Error?) -> Void) throws")

+ 9 - 6
Sources/SwiftGRPC/Runtime/ClientError.swift → Sources/protoc-gen-swiftgrpc/Generator-Methods.swift

@@ -13,12 +13,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 import Foundation
+import SwiftProtobuf
+import SwiftProtobufPluginLibrary
 
-/// Type for errors thrown from generated client code.
-public enum ClientError: Error {
-  case endOfStream
-  case invalidMessageReceived
-  case error(c: CallResult)
+extension Generator {
+  func printStreamReceiveMethods(receivedType: String) {
+    println("/// Call this to wait for a result. Blocking.")
+    println("func receive() throws -> \(receivedType)?")
+    println("/// Call this to wait for a result. Nonblocking.")
+    println("func receive(completion: @escaping (ResultOrRPCError<\(receivedType)?>) -> Void) throws")
+  }
 }

+ 2 - 4
Sources/protoc-gen-swiftgrpc/Generator-Server.swift

@@ -145,8 +145,7 @@ extension Generator {
   private func printServerMethodClientStreaming() {
     println("\(access) protocol \(methodSessionName): ServerSessionClientStreaming {")
     indent()
-    println("/// Receive a message. Blocks until a message is received or the client closes the connection.")
-    println("func receive() throws -> \(methodInputName)")
+    printStreamReceiveMethods(receivedType: methodInputName)
     println()
     println("/// Send a response and close the connection.")
     println("func sendAndClose(_ response: \(methodOutputName)) throws")
@@ -178,8 +177,7 @@ extension Generator {
   private func printServerMethodBidirectional() {
     println("\(access) protocol \(methodSessionName): ServerSessionBidirectionalStreaming {")
     indent()
-    println("/// Receive a message. Blocks until a message is received or the client closes the connection.")
-    println("func receive() throws -> \(methodInputName)")
+    printStreamReceiveMethods(receivedType: methodInputName)
     println()
     println("/// Send a message. Nonblocking.")
     println("func send(_ response: \(methodOutputName), completion: ((Error?) -> Void)?) throws")

+ 20 - 12
Tests/SwiftGRPCTests/EchoTests.swift

@@ -146,9 +146,10 @@ extension EchoTests {
       completionHandlerExpectation.fulfill()
     }
 
-    XCTAssertEqual("Swift echo expand (0): foo", try! call.receive().text)
-    XCTAssertEqual("Swift echo expand (1): bar", try! call.receive().text)
-    XCTAssertEqual("Swift echo expand (2): baz", try! call.receive().text)
+    XCTAssertEqual("Swift echo expand (0): foo", try! call.receive()!.text)
+    XCTAssertEqual("Swift echo expand (1): bar", try! call.receive()!.text)
+    XCTAssertEqual("Swift echo expand (2): baz", try! call.receive()!.text)
+    XCTAssertNil(try! call.receive())
 
     waitForExpectations(timeout: defaultTimeout)
   }
@@ -161,8 +162,9 @@ extension EchoTests {
     }
 
     for string in EchoTests.lotsOfStrings {
-      XCTAssertEqual("Swift echo expand (\(string)): \(string)", try! call.receive().text)
+      XCTAssertEqual("Swift echo expand (\(string)): \(string)", try! call.receive()!.text)
     }
+    XCTAssertNil(try! call.receive())
 
     waitForExpectations(timeout: defaultTimeout)
   }
@@ -187,9 +189,10 @@ extension EchoTests {
     let closeCompletionHandlerExpectation = expectation(description: "close completion handler called")
     try! call.closeSend { closeCompletionHandlerExpectation.fulfill() }
 
-    XCTAssertEqual("Swift echo update (0): foo", try! call.receive().text)
-    XCTAssertEqual("Swift echo update (1): bar", try! call.receive().text)
-    XCTAssertEqual("Swift echo update (2): baz", try! call.receive().text)
+    XCTAssertEqual("Swift echo update (0): foo", try! call.receive()!.text)
+    XCTAssertEqual("Swift echo update (1): bar", try! call.receive()!.text)
+    XCTAssertEqual("Swift echo update (2): baz", try! call.receive()!.text)
+    XCTAssertNil(try! call.receive())
 
     waitForExpectations(timeout: defaultTimeout)
   }
@@ -203,19 +206,21 @@ extension EchoTests {
 
     var sendExpectation = expectation(description: "send completion handler 1 called")
     try! call.send(Echo_EchoRequest(text: "foo")) { [sendExpectation] in XCTAssertNil($0); sendExpectation.fulfill() }
-    XCTAssertEqual("Swift echo update (0): foo", try! call.receive().text)
+    XCTAssertEqual("Swift echo update (0): foo", try! call.receive()!.text)
 
     sendExpectation = expectation(description: "send completion handler 2 called")
     try! call.send(Echo_EchoRequest(text: "bar")) { [sendExpectation] in XCTAssertNil($0); sendExpectation.fulfill() }
-    XCTAssertEqual("Swift echo update (1): bar", try! call.receive().text)
+    XCTAssertEqual("Swift echo update (1): bar", try! call.receive()!.text)
 
     sendExpectation = expectation(description: "send completion handler 3 called")
     try! call.send(Echo_EchoRequest(text: "baz")) { [sendExpectation] in XCTAssertNil($0); sendExpectation.fulfill() }
-    XCTAssertEqual("Swift echo update (2): baz", try! call.receive().text)
+    XCTAssertEqual("Swift echo update (2): baz", try! call.receive()!.text)
 
     let closeCompletionHandlerExpectation = expectation(description: "close completion handler called")
     try! call.closeSend { closeCompletionHandlerExpectation.fulfill() }
 
+    XCTAssertNil(try! call.receive())
+    
     waitForExpectations(timeout: defaultTimeout)
   }
 
@@ -236,8 +241,9 @@ extension EchoTests {
     try! call.closeSend { closeCompletionHandlerExpectation.fulfill() }
 
     for string in EchoTests.lotsOfStrings {
-      XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive().text)
+      XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive()!.text)
     }
+    XCTAssertNil(try! call.receive())
 
     waitForExpectations(timeout: defaultTimeout)
   }
@@ -252,11 +258,13 @@ extension EchoTests {
     for string in EchoTests.lotsOfStrings {
       let sendExpectation = expectation(description: "send completion handler \(string) called")
       try! call.send(Echo_EchoRequest(text: string)) { [sendExpectation] in XCTAssertNil($0); sendExpectation.fulfill() }
-      XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive().text)
+      XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive()!.text)
     }
 
     let closeCompletionHandlerExpectation = expectation(description: "close completion handler called")
     try! call.closeSend { closeCompletionHandlerExpectation.fulfill() }
+    
+    XCTAssertNil(try! call.receive())
 
     waitForExpectations(timeout: defaultTimeout)
   }

+ 126 - 0
Tests/SwiftGRPCTests/ErrorHandlingTests.swift

@@ -0,0 +1,126 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Dispatch
+import Foundation
+@testable import SwiftGRPC
+import XCTest
+
+// TODO(danielalm): Also run a similar set of tests with SSL enabled.
+class ErrorHandlingTests: XCTestCase {
+  static var allTests: [(String, (ErrorHandlingTests) -> () throws -> Void)] {
+    return [
+      ("testConnectionFailureUnary", testConnectionFailureUnary),
+      ("testConnectionFailureClientStreaming", testConnectionFailureClientStreaming),
+      ("testConnectionFailureServerStreaming", testConnectionFailureServerStreaming),
+      ("testConnectionFailureBidirectionalStreaming", testConnectionFailureBidirectionalStreaming)
+    ]
+  }
+  
+  let address = "localhost:5050"
+  
+  let defaultTimeout: TimeInterval = 1.0
+}
+
+extension ErrorHandlingTests {
+  func testConnectionFailureUnary() {
+    let client = Echo_EchoServiceClient(address: "localhost:1234", secure: false)
+    client.timeout = defaultTimeout
+    
+    do {
+      _ = try client.get(Echo_EchoRequest(text: "foo")).text
+      XCTFail("should have thrown")
+    } catch {
+      guard case let .callError(callResult) = error as! RPCError
+        else { XCTFail("unexpected error \(error)"); return }
+      XCTAssertEqual(.unavailable, callResult.statusCode)
+      XCTAssertEqual("Connect Failed", callResult.statusMessage)
+    }
+  }
+  
+  func testConnectionFailureClientStreaming() {
+    let client = Echo_EchoServiceClient(address: "localhost:1234", secure: false)
+    client.timeout = defaultTimeout
+    
+    let completionHandlerExpectation = expectation(description: "final completion handler called")
+    let call = try! client.collect { callResult in
+      XCTAssertEqual(.unavailable, callResult.statusCode)
+      completionHandlerExpectation.fulfill()
+    }
+    
+    let sendExpectation = expectation(description: "send completion handler 1 called")
+    try! call.send(Echo_EchoRequest(text: "foo")) { [sendExpectation] in
+      XCTAssertEqual(.unknown, $0 as! CallError)
+      sendExpectation.fulfill()
+    }
+    call.waitForSendOperationsToFinish()
+    
+    do {
+      _ = try call.closeAndReceive()
+      XCTFail("should have thrown")
+    } catch let receiveError {
+      XCTAssertEqual(.unknown, (receiveError as! RPCError).callResult!.statusCode)
+    }
+    
+    waitForExpectations(timeout: defaultTimeout)
+  }
+  
+  func testConnectionFailureServerStreaming() {
+    let client = Echo_EchoServiceClient(address: "localhost:1234", secure: false)
+    client.timeout = defaultTimeout
+    
+    let completionHandlerExpectation = expectation(description: "completion handler called")
+    let call = try! client.expand(Echo_EchoRequest(text: "foo bar baz")) { callResult in
+      XCTAssertEqual(.unavailable, callResult.statusCode)
+      completionHandlerExpectation.fulfill()
+    }
+    
+    do {
+      _ = try call.receive()
+      XCTFail("should have thrown")
+    } catch let receiveError {
+      XCTAssertEqual(.unknown, (receiveError as! RPCError).callResult!.statusCode)
+    }
+    
+    waitForExpectations(timeout: defaultTimeout)
+  }
+  
+  func testConnectionFailureBidirectionalStreaming() {
+    let client = Echo_EchoServiceClient(address: "localhost:1234", secure: false)
+    client.timeout = defaultTimeout
+    
+    let completionHandlerExpectation = expectation(description: "completion handler called")
+    let call = try! client.update { callResult in
+      XCTAssertEqual(.unavailable, callResult.statusCode)
+      completionHandlerExpectation.fulfill()
+    }
+    
+    let sendExpectation = expectation(description: "send completion handler 1 called")
+    try! call.send(Echo_EchoRequest(text: "foo")) { [sendExpectation] in
+      XCTAssertEqual(.unknown, $0 as! CallError)
+      sendExpectation.fulfill()
+    }
+    call.waitForSendOperationsToFinish()
+    
+    do {
+      _ = try call.receive()
+      XCTFail("should have thrown")
+    } catch let receiveError {
+      XCTAssertEqual(.unknown, (receiveError as! RPCError).callResult!.statusCode)
+    }
+    
+    waitForExpectations(timeout: defaultTimeout)
+  }
+}

+ 10 - 6
Tests/SwiftGRPCTests/GRPCTests.swift

@@ -222,10 +222,12 @@ func callServerStream(channel: Channel) throws {
 
   for _ in 0..<steps {
     let messageSem = DispatchSemaphore(value: 0)
-    try call.receiveMessage(completion: { (data) in
-      if let data = data {
+    try call.receiveMessage(completion: { callResult in
+      if let data = callResult.resultData {
         let messageString = String(data: data, encoding: .utf8)
         XCTAssertEqual(messageString, serverText)
+      } else {
+        print("unexpected result: \(callResult)")
       }
       messageSem.signal()
     })
@@ -273,10 +275,12 @@ func callBiDiStream(channel: Channel) throws {
   // Receive pongs
   for _ in 0..<steps {
     let pongSem = DispatchSemaphore(value: 0)
-    try call.receiveMessage(completion: { (data) in
-      if let data = data {
+    try call.receiveMessage(completion: { callResult in
+      if let data = callResult.resultData {
         let messageString = String(data: data, encoding: .utf8)
         XCTAssertEqual(messageString, serverPong)
+      } else {
+        print("unexpected result: \(callResult)")
       }
       pongSem.signal()
     })
@@ -386,8 +390,8 @@ func handleBiDiStream(requestHandler: Handler) throws {
   // Receive remaining pings
   for _ in 0..<steps-1 {
     let receiveSem = DispatchSemaphore(value: 0)
-    try requestHandler.receiveMessage(completion: { (data) in
-      let messageString = String(data: data!, encoding: .utf8)
+    try requestHandler.receiveMessage(completion: { callStatus in
+      let messageString = String(data: callStatus.resultData!, encoding: .utf8)
       XCTAssertEqual(messageString, clientPing)
       receiveSem.signal()
     })