|
|
@@ -44,6 +44,9 @@ private class CallLock {
|
|
|
static let sharedInstance = CallLock()
|
|
|
}
|
|
|
|
|
|
+public typealias CallCompletion = (Int, String?, Data?, Metadata?, Metadata?) -> Void
|
|
|
+public typealias SendMessageCompletion = (grpc_call_error) -> Void
|
|
|
+
|
|
|
/// A gRPC API call
|
|
|
public class Call {
|
|
|
|
|
|
@@ -96,7 +99,6 @@ public class Call {
|
|
|
return error
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/// Performs a nonstreaming gRPC API call
|
|
|
///
|
|
|
/// - Parameter message: a ByteBuffer containing the message to send
|
|
|
@@ -104,7 +106,7 @@ public class Call {
|
|
|
/// - Returns: a CallResponse object containing results of the call
|
|
|
public func performNonStreamingCall(messageData: Data,
|
|
|
metadata: Metadata,
|
|
|
- completion: @escaping ((CallResponse) -> Void)) -> Void {
|
|
|
+ completion: @escaping CallCompletion) -> grpc_call_error {
|
|
|
|
|
|
let messageBuffer = ByteBuffer(data:messageData)
|
|
|
|
|
|
@@ -121,80 +123,85 @@ public class Call {
|
|
|
operation_sendCloseFromClient,
|
|
|
operation_receiveInitialMetadata,
|
|
|
operation_receiveStatusOnClient,
|
|
|
- operation_receiveMessage])
|
|
|
- { (success) in
|
|
|
- if success {
|
|
|
- let response = CallResponse(status:operation_receiveStatusOnClient.status(),
|
|
|
- statusDetails:operation_receiveStatusOnClient.statusDetails(),
|
|
|
- message:operation_receiveMessage.message(),
|
|
|
- initialMetadata:operation_receiveInitialMetadata.metadata(),
|
|
|
- trailingMetadata:operation_receiveStatusOnClient.metadata())
|
|
|
- completion(response)
|
|
|
- } else {
|
|
|
- completion(CallResponse())
|
|
|
- }
|
|
|
- }
|
|
|
- let call_error = self.perform(call: self, operations: group)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print ("call error = \(call_error)")
|
|
|
- }
|
|
|
+ operation_receiveMessage],
|
|
|
+ completion:
|
|
|
+ {(success) in
|
|
|
+ if success {
|
|
|
+ completion(operation_receiveStatusOnClient.status(),
|
|
|
+ operation_receiveStatusOnClient.statusDetails(),
|
|
|
+ operation_receiveMessage.message()?.data(),
|
|
|
+ operation_receiveInitialMetadata.metadata(),
|
|
|
+ operation_receiveStatusOnClient.metadata())
|
|
|
+ } else {
|
|
|
+ completion(0, nil, nil, nil, nil)
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ return self.perform(operations: group)
|
|
|
}
|
|
|
|
|
|
// perform a group of operations (used internally)
|
|
|
- private func perform(call: Call, operations: OperationGroup) -> grpc_call_error {
|
|
|
+ private func perform(operations: OperationGroup) -> grpc_call_error {
|
|
|
self.completionQueue.operationGroups[operations.tag] = operations
|
|
|
- return call.performOperations(operations:operations,
|
|
|
- tag:operations.tag,
|
|
|
- completionQueue: self.completionQueue)
|
|
|
+ return performOperations(operations:operations,
|
|
|
+ tag:operations.tag,
|
|
|
+ completionQueue: self.completionQueue)
|
|
|
}
|
|
|
|
|
|
// start a streaming connection
|
|
|
- public func start(metadata: Metadata) {
|
|
|
- self.sendInitialMetadata(metadata: metadata)
|
|
|
- self.receiveInitialMetadata()
|
|
|
- self.receiveStatus()
|
|
|
+ public func start(metadata: Metadata) -> grpc_call_error {
|
|
|
+ var error : grpc_call_error
|
|
|
+ error = self.sendInitialMetadata(metadata: metadata)
|
|
|
+ if error != GRPC_CALL_OK {
|
|
|
+ return error
|
|
|
+ }
|
|
|
+ error = self.receiveInitialMetadata()
|
|
|
+ if error != GRPC_CALL_OK {
|
|
|
+ return error
|
|
|
+ }
|
|
|
+ return self.receiveStatus()
|
|
|
}
|
|
|
|
|
|
// send a message over a streaming connection
|
|
|
- public func sendMessage(data: Data) {
|
|
|
+ public func sendMessage(data: Data,
|
|
|
+ callback:@escaping SendMessageCompletion = {(error) in })
|
|
|
+ -> Void {
|
|
|
DispatchQueue.main.async {
|
|
|
if self.writing {
|
|
|
- self.pendingMessages.append(data)
|
|
|
+ self.pendingMessages.append(data) // TODO: return something if we can't accept another message
|
|
|
+ callback(GRPC_CALL_OK)
|
|
|
} else {
|
|
|
self.writing = true
|
|
|
- self.sendWithoutBlocking(data: data)
|
|
|
+ let error = self.sendWithoutBlocking(data: data)
|
|
|
+ callback(error)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func sendWithoutBlocking(data: Data) {
|
|
|
+ private func sendWithoutBlocking(data: Data) -> grpc_call_error {
|
|
|
let messageBuffer = ByteBuffer(data:data)
|
|
|
let operation_sendMessage = Operation_SendMessage(message:messageBuffer)
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendMessage])
|
|
|
{ (event) in
|
|
|
|
|
|
- // if the event failed, shut down
|
|
|
-
|
|
|
+ // TODO: if the event failed, shut down
|
|
|
|
|
|
DispatchQueue.main.async {
|
|
|
- if self.pendingMessages.count > 0 {
|
|
|
- let nextMessage = self.pendingMessages.first!
|
|
|
- self.pendingMessages.removeFirst()
|
|
|
- self.sendWithoutBlocking(data: nextMessage)
|
|
|
- } else {
|
|
|
- self.writing = false
|
|
|
+ if self.pendingMessages.count > 0 {
|
|
|
+ let nextMessage = self.pendingMessages.first!
|
|
|
+ self.pendingMessages.removeFirst()
|
|
|
+ _ = self.sendWithoutBlocking(data: nextMessage)
|
|
|
+ } else {
|
|
|
+ self.writing = false
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
- let call_error = self.perform(call:self, operations:operations)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print("call error \(call_error)")
|
|
|
}
|
|
|
+ return self.perform(operations:operations)
|
|
|
}
|
|
|
|
|
|
|
|
|
// receive a message over a streaming connection
|
|
|
- public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> Void {
|
|
|
+ public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> grpc_call_error {
|
|
|
let operation_receiveMessage = Operation_ReceiveMessage()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_receiveMessage])
|
|
|
{ (event) in
|
|
|
@@ -202,14 +209,11 @@ public class Call {
|
|
|
callback(messageBuffer.data())
|
|
|
}
|
|
|
}
|
|
|
- let call_error = self.perform(call:self, operations:operations)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print("call error \(call_error)")
|
|
|
- }
|
|
|
+ return self.perform(operations:operations)
|
|
|
}
|
|
|
|
|
|
// send initial metadata over a streaming connection
|
|
|
- private func sendInitialMetadata(metadata: Metadata) {
|
|
|
+ private func sendInitialMetadata(metadata: Metadata) -> grpc_call_error {
|
|
|
let operation_sendInitialMetadata = Operation_SendInitialMetadata(metadata:metadata);
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendInitialMetadata])
|
|
|
{ (success) in
|
|
|
@@ -219,14 +223,11 @@ public class Call {
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
- let call_error = self.perform(call:self, operations:operations)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print("call error: \(call_error)")
|
|
|
- }
|
|
|
+ return self.perform(operations:operations)
|
|
|
}
|
|
|
|
|
|
// receive initial metadata from a streaming connection
|
|
|
- private func receiveInitialMetadata() {
|
|
|
+ private func receiveInitialMetadata() -> grpc_call_error {
|
|
|
let operation_receiveInitialMetadata = Operation_ReceiveInitialMetadata()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_receiveInitialMetadata])
|
|
|
{ (event) in
|
|
|
@@ -235,36 +236,27 @@ public class Call {
|
|
|
print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
|
|
|
}
|
|
|
}
|
|
|
- let call_error = self.perform(call:self, operations:operations)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print("call error \(call_error)")
|
|
|
- }
|
|
|
+ return self.perform(operations:operations)
|
|
|
}
|
|
|
|
|
|
// receive status from a streaming connection
|
|
|
- private func receiveStatus() {
|
|
|
+ private func receiveStatus() -> grpc_call_error {
|
|
|
let operation_receiveStatus = Operation_ReceiveStatusOnClient()
|
|
|
let operations = OperationGroup(call:self,
|
|
|
operations:[operation_receiveStatus])
|
|
|
{ (event) in
|
|
|
print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
|
|
|
}
|
|
|
- let call_error = self.perform(call:self, operations:operations)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print("call error \(call_error)")
|
|
|
- }
|
|
|
+ return self.perform(operations:operations)
|
|
|
}
|
|
|
|
|
|
// close a streaming connection
|
|
|
- public func close(completion:@escaping (() -> Void)) {
|
|
|
+ public func close(completion:@escaping (() -> Void)) -> grpc_call_error {
|
|
|
let operation_sendCloseFromClient = Operation_SendCloseFromClient()
|
|
|
let operations = OperationGroup(call:self, operations:[operation_sendCloseFromClient])
|
|
|
{ (event) in
|
|
|
completion()
|
|
|
}
|
|
|
- let call_error = self.perform(call:self, operations:operations)
|
|
|
- if call_error != GRPC_CALL_OK {
|
|
|
- print("call error \(call_error)")
|
|
|
- }
|
|
|
+ return self.perform(operations:operations)
|
|
|
}
|
|
|
}
|