|
|
@@ -44,6 +44,61 @@ private class CallLock {
|
|
|
static let sharedInstance = CallLock()
|
|
|
}
|
|
|
|
|
|
+public enum CallError : Error {
|
|
|
+ case ok
|
|
|
+ case unknown
|
|
|
+ case notOnServer
|
|
|
+ case notOnClient
|
|
|
+ case alreadyAccepted
|
|
|
+ case alreadyInvoked
|
|
|
+ case notInvoked
|
|
|
+ case alreadyFinished
|
|
|
+ case tooManyOperations
|
|
|
+ case invalidFlags
|
|
|
+ case invalidMetadata
|
|
|
+ case invalidMessage
|
|
|
+ case notServerCompletionQueue
|
|
|
+ case batchTooBig
|
|
|
+ case payloadTypeMismatch
|
|
|
+
|
|
|
+ static func callError(grpcCallError error: grpc_call_error) -> CallError {
|
|
|
+ switch(error) {
|
|
|
+ case GRPC_CALL_OK:
|
|
|
+ return .ok
|
|
|
+ case GRPC_CALL_ERROR:
|
|
|
+ return .unknown
|
|
|
+ case GRPC_CALL_ERROR_NOT_ON_SERVER:
|
|
|
+ return .notOnServer
|
|
|
+ case GRPC_CALL_ERROR_NOT_ON_CLIENT:
|
|
|
+ return .notOnClient
|
|
|
+ case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
|
|
|
+ return .alreadyAccepted
|
|
|
+ case GRPC_CALL_ERROR_ALREADY_INVOKED:
|
|
|
+ return .alreadyInvoked
|
|
|
+ case GRPC_CALL_ERROR_NOT_INVOKED:
|
|
|
+ return .notInvoked
|
|
|
+ case GRPC_CALL_ERROR_ALREADY_FINISHED:
|
|
|
+ return .alreadyFinished
|
|
|
+ case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
|
|
|
+ return .tooManyOperations
|
|
|
+ case GRPC_CALL_ERROR_INVALID_FLAGS:
|
|
|
+ return .invalidFlags
|
|
|
+ case GRPC_CALL_ERROR_INVALID_METADATA:
|
|
|
+ return .invalidMetadata
|
|
|
+ case GRPC_CALL_ERROR_INVALID_MESSAGE:
|
|
|
+ return .invalidMessage
|
|
|
+ case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
|
|
|
+ return .notServerCompletionQueue
|
|
|
+ case GRPC_CALL_ERROR_BATCH_TOO_BIG:
|
|
|
+ return .batchTooBig
|
|
|
+ case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
|
|
|
+ return .payloadTypeMismatch
|
|
|
+ default:
|
|
|
+ return .unknown
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
public struct CallResult {
|
|
|
public var statusCode : Int
|
|
|
public var statusMessage : String?
|
|
|
@@ -53,7 +108,7 @@ public struct CallResult {
|
|
|
}
|
|
|
|
|
|
public typealias CallCompletion = (CallResult) -> Void
|
|
|
-public typealias SendMessageCompletion = (grpc_call_error) -> Void
|
|
|
+public typealias SendMessageCompletion = (CallError) -> Void
|
|
|
|
|
|
/// A gRPC API call
|
|
|
public class Call {
|
|
|
@@ -99,12 +154,12 @@ public class Call {
|
|
|
func performOperations(operations: OperationGroup,
|
|
|
tag: Int64,
|
|
|
completionQueue: CompletionQueue)
|
|
|
- -> grpc_call_error {
|
|
|
+ -> CallError {
|
|
|
let mutex = CallLock.sharedInstance.mutex
|
|
|
mutex.lock()
|
|
|
let error = cgrpc_call_perform(underlyingCall, operations.operations, tag)
|
|
|
mutex.unlock()
|
|
|
- return error
|
|
|
+ return CallError.callError(grpcCallError:error)
|
|
|
}
|
|
|
|
|
|
/// Performs a nonstreaming gRPC API call
|
|
|
@@ -114,7 +169,7 @@ public class Call {
|
|
|
/// - Returns: a CallResponse object containing results of the call
|
|
|
public func performNonStreamingCall(messageData: Data,
|
|
|
metadata: Metadata,
|
|
|
- completion: @escaping CallCompletion) -> grpc_call_error {
|
|
|
+ completion: @escaping CallCompletion) -> CallError {
|
|
|
|
|
|
let messageBuffer = ByteBuffer(data:messageData)
|
|
|
|
|
|
@@ -153,7 +208,7 @@ public class Call {
|
|
|
}
|
|
|
|
|
|
// perform a group of operations (used internally)
|
|
|
- private func perform(operations: OperationGroup) -> grpc_call_error {
|
|
|
+ private func perform(operations: OperationGroup) -> CallError {
|
|
|
self.completionQueue.operationGroups[operations.tag] = operations
|
|
|
return performOperations(operations:operations,
|
|
|
tag:operations.tag,
|
|
|
@@ -161,14 +216,14 @@ public class Call {
|
|
|
}
|
|
|
|
|
|
// start a streaming connection
|
|
|
- public func start(metadata: Metadata) -> grpc_call_error {
|
|
|
- var error : grpc_call_error
|
|
|
+ public func start(metadata: Metadata) -> CallError {
|
|
|
+ var error : CallError
|
|
|
error = self.sendInitialMetadata(metadata: metadata)
|
|
|
- if error != GRPC_CALL_OK {
|
|
|
+ if error != .ok {
|
|
|
return error
|
|
|
}
|
|
|
error = self.receiveInitialMetadata()
|
|
|
- if error != GRPC_CALL_OK {
|
|
|
+ if error != .ok {
|
|
|
return error
|
|
|
}
|
|
|
return self.receiveStatus()
|
|
|
@@ -178,19 +233,19 @@ public class Call {
|
|
|
public func sendMessage(data: Data,
|
|
|
callback:@escaping SendMessageCompletion = {(error) in })
|
|
|
-> Void {
|
|
|
- DispatchQueue.main.async {
|
|
|
- if self.writing {
|
|
|
- self.pendingMessages.append(data) // TODO: return something if we can't accept another message
|
|
|
- callback(GRPC_CALL_OK)
|
|
|
- } else {
|
|
|
- self.writing = true
|
|
|
- let error = self.sendWithoutBlocking(data: data)
|
|
|
- callback(error)
|
|
|
+ DispatchQueue.main.async {
|
|
|
+ if self.writing {
|
|
|
+ self.pendingMessages.append(data) // TODO: return something if we can't accept another message
|
|
|
+ callback(.ok)
|
|
|
+ } else {
|
|
|
+ self.writing = true
|
|
|
+ let error = self.sendWithoutBlocking(data: data)
|
|
|
+ callback(error)
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- private func sendWithoutBlocking(data: Data) -> grpc_call_error {
|
|
|
+ private func sendWithoutBlocking(data: Data) -> CallError {
|
|
|
let messageBuffer = ByteBuffer(data:data)
|
|
|
let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendMessage])
|
|
|
@@ -213,7 +268,7 @@ public class Call {
|
|
|
|
|
|
|
|
|
// receive a message over a streaming connection
|
|
|
- public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> grpc_call_error {
|
|
|
+ public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> CallError {
|
|
|
let operation_receiveMessage = Operation_ReceiveMessage()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_receiveMessage])
|
|
|
{ (event) in
|
|
|
@@ -225,7 +280,7 @@ public class Call {
|
|
|
}
|
|
|
|
|
|
// send initial metadata over a streaming connection
|
|
|
- private func sendInitialMetadata(metadata: Metadata) -> grpc_call_error {
|
|
|
+ private func sendInitialMetadata(metadata: Metadata) -> CallError {
|
|
|
let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendInitialMetadata])
|
|
|
{ (success) in
|
|
|
@@ -239,7 +294,7 @@ public class Call {
|
|
|
}
|
|
|
|
|
|
// receive initial metadata from a streaming connection
|
|
|
- private func receiveInitialMetadata() -> grpc_call_error {
|
|
|
+ private func receiveInitialMetadata() -> CallError {
|
|
|
let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_receiveInitialMetadata])
|
|
|
{ (event) in
|
|
|
@@ -252,7 +307,7 @@ public class Call {
|
|
|
}
|
|
|
|
|
|
// receive status from a streaming connection
|
|
|
- private func receiveStatus() -> grpc_call_error {
|
|
|
+ private func receiveStatus() -> CallError {
|
|
|
let operation_receiveStatus = Operation_ReceiveStatusOnClient()
|
|
|
let operations = OperationGroup(call:self,
|
|
|
operations:[operation_receiveStatus])
|
|
|
@@ -263,7 +318,7 @@ public class Call {
|
|
|
}
|
|
|
|
|
|
// close a streaming connection
|
|
|
- public func close(completion:@escaping (() -> Void)) -> grpc_call_error {
|
|
|
+ public func close(completion:@escaping (() -> Void)) -> CallError {
|
|
|
let operation_sendCloseFromClient = Operation_SendCloseFromClient()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendCloseFromClient])
|
|
|
{ (event) in
|