|
|
@@ -107,8 +107,8 @@ public struct CallResult {
|
|
|
public var trailingMetadata : Metadata?
|
|
|
}
|
|
|
|
|
|
-public typealias CallCompletion = (CallResult) -> Void
|
|
|
-public typealias SendMessageCompletion = (CallError) -> Void
|
|
|
+public typealias CallCompletion = (CallResult) throws -> Void
|
|
|
+public typealias SendMessageCompletion = () -> Void
|
|
|
|
|
|
/// A gRPC API call
|
|
|
public class Call {
|
|
|
@@ -150,14 +150,15 @@ public class Call {
|
|
|
///
|
|
|
/// - Parameter operations: group of operations to be performed
|
|
|
/// - Returns: the result of initiating the call
|
|
|
- func perform(_ operations: OperationGroup)
|
|
|
- -> CallError {
|
|
|
- completionQueue.operationGroups[operations.tag] = operations
|
|
|
- let mutex = CallLock.sharedInstance.mutex
|
|
|
- mutex.lock()
|
|
|
- let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
|
|
|
- mutex.unlock()
|
|
|
- return CallError.callError(grpcCallError:error)
|
|
|
+ func perform(_ operations: OperationGroup) throws -> Void {
|
|
|
+ completionQueue.operationGroups[operations.tag] = operations
|
|
|
+ let mutex = CallLock.sharedInstance.mutex
|
|
|
+ mutex.lock()
|
|
|
+ let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
|
|
|
+ mutex.unlock()
|
|
|
+ if error != GRPC_CALL_OK {
|
|
|
+ throw CallError.callError(grpcCallError:error)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// Performs a nonstreaming gRPC API call
|
|
|
@@ -167,7 +168,7 @@ public class Call {
|
|
|
/// - Returns: a CallResponse object containing results of the call
|
|
|
public func performNonStreamingCall(messageData: Data,
|
|
|
metadata: Metadata,
|
|
|
- completion: @escaping CallCompletion) -> CallError {
|
|
|
+ completion: @escaping CallCompletion) throws -> Void {
|
|
|
|
|
|
let messageBuffer = ByteBuffer(data:messageData)
|
|
|
|
|
|
@@ -188,54 +189,51 @@ public class Call {
|
|
|
completion:
|
|
|
{(success) in
|
|
|
if success {
|
|
|
- completion(CallResult(statusCode:operation_receiveStatusOnClient.status(),
|
|
|
- statusMessage:operation_receiveStatusOnClient.statusDetails(),
|
|
|
- resultData:operation_receiveMessage.message()?.data(),
|
|
|
- initialMetadata:operation_receiveInitialMetadata.metadata(),
|
|
|
- trailingMetadata:operation_receiveStatusOnClient.metadata()))
|
|
|
+ try completion(CallResult(statusCode:operation_receiveStatusOnClient.status(),
|
|
|
+ statusMessage:operation_receiveStatusOnClient.statusDetails(),
|
|
|
+ resultData:operation_receiveMessage.message()?.data(),
|
|
|
+ initialMetadata:operation_receiveInitialMetadata.metadata(),
|
|
|
+ trailingMetadata:operation_receiveStatusOnClient.metadata()))
|
|
|
} else {
|
|
|
- completion(CallResult(statusCode:0,
|
|
|
- statusMessage:nil,
|
|
|
- resultData:nil,
|
|
|
- initialMetadata:nil,
|
|
|
- trailingMetadata:nil))
|
|
|
+ try completion(CallResult(statusCode:0,
|
|
|
+ statusMessage:nil,
|
|
|
+ resultData:nil,
|
|
|
+ initialMetadata:nil,
|
|
|
+ trailingMetadata:nil))
|
|
|
}
|
|
|
})
|
|
|
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
// start a streaming connection
|
|
|
- public func start(metadata: Metadata) -> CallError {
|
|
|
- var error : CallError
|
|
|
- error = self.sendInitialMetadata(metadata: metadata)
|
|
|
- if error != .ok {
|
|
|
- return error
|
|
|
- }
|
|
|
- error = self.receiveInitialMetadata()
|
|
|
- if error != .ok {
|
|
|
- return error
|
|
|
- }
|
|
|
- return self.receiveStatus()
|
|
|
+ public func start(metadata: Metadata) throws -> Void {
|
|
|
+ try self.sendInitialMetadata(metadata: metadata)
|
|
|
+ try self.receiveInitialMetadata()
|
|
|
+ try self.receiveStatus()
|
|
|
}
|
|
|
|
|
|
// send a message over a streaming connection
|
|
|
public func sendMessage(data: Data,
|
|
|
- callback:@escaping SendMessageCompletion = {(error) in })
|
|
|
+ callback:@escaping SendMessageCompletion = {() in })
|
|
|
-> Void {
|
|
|
DispatchQueue.main.async {
|
|
|
if self.writing {
|
|
|
self.pendingMessages.append(data) // TODO: return something if we can't accept another message
|
|
|
- callback(.ok)
|
|
|
+ callback()
|
|
|
} else {
|
|
|
self.writing = true
|
|
|
- let error = self.sendWithoutBlocking(data: data)
|
|
|
- callback(error)
|
|
|
+ do {
|
|
|
+ try self.sendWithoutBlocking(data: data)
|
|
|
+ callback()
|
|
|
+ } catch (let callError) {
|
|
|
+
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func sendWithoutBlocking(data: Data) -> CallError {
|
|
|
+ private func sendWithoutBlocking(data: Data) throws -> Void {
|
|
|
let messageBuffer = ByteBuffer(data:data)
|
|
|
let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendMessage])
|
|
|
@@ -245,7 +243,11 @@ public class Call {
|
|
|
if self.pendingMessages.count > 0 {
|
|
|
let nextMessage = self.pendingMessages.first!
|
|
|
self.pendingMessages.removeFirst()
|
|
|
- _ = self.sendWithoutBlocking(data: nextMessage)
|
|
|
+ do {
|
|
|
+ try self.sendWithoutBlocking(data: nextMessage)
|
|
|
+ } catch (let callError) {
|
|
|
+
|
|
|
+ }
|
|
|
} else {
|
|
|
self.writing = false
|
|
|
}
|
|
|
@@ -254,26 +256,26 @@ public class Call {
|
|
|
// TODO: if the event failed, shut down
|
|
|
}
|
|
|
}
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
|
|
|
// receive a message over a streaming connection
|
|
|
- public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> CallError {
|
|
|
+ public func receiveMessage(callback:@escaping ((Data!) throws -> Void)) throws -> Void {
|
|
|
let operation_receiveMessage = Operation_ReceiveMessage()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_receiveMessage])
|
|
|
{(success) in
|
|
|
if success {
|
|
|
if let messageBuffer = operation_receiveMessage.message() {
|
|
|
- callback(messageBuffer.data())
|
|
|
+ try callback(messageBuffer.data())
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
// send initial metadata over a streaming connection
|
|
|
- private func sendInitialMetadata(metadata: Metadata) -> CallError {
|
|
|
+ private func sendInitialMetadata(metadata: Metadata) throws -> Void {
|
|
|
let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendInitialMetadata])
|
|
|
{(success) in
|
|
|
@@ -283,11 +285,11 @@ public class Call {
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
// receive initial metadata from a streaming connection
|
|
|
- private func receiveInitialMetadata() -> CallError {
|
|
|
+ private func receiveInitialMetadata() throws -> Void {
|
|
|
let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_receiveInitialMetadata])
|
|
|
{(success) in
|
|
|
@@ -298,11 +300,11 @@ public class Call {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
// receive status from a streaming connection
|
|
|
- private func receiveStatus() -> CallError {
|
|
|
+ private func receiveStatus() throws -> Void {
|
|
|
let operation_receiveStatus = Operation_ReceiveStatusOnClient()
|
|
|
let operations = OperationGroup(call:self,
|
|
|
operations:[operation_receiveStatus])
|
|
|
@@ -311,11 +313,11 @@ public class Call {
|
|
|
print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
|
|
|
}
|
|
|
}
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
// close a streaming connection
|
|
|
- public func close(completion:@escaping (() -> Void)) -> CallError {
|
|
|
+ public func close(completion:@escaping (() -> Void)) throws -> Void {
|
|
|
let operation_sendCloseFromClient = Operation_SendCloseFromClient()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendCloseFromClient])
|
|
|
{(success) in
|
|
|
@@ -323,6 +325,6 @@ public class Call {
|
|
|
completion()
|
|
|
}
|
|
|
}
|
|
|
- return self.perform(operations)
|
|
|
+ try self.perform(operations)
|
|
|
}
|
|
|
}
|