|
|
@@ -104,6 +104,9 @@ public class Call {
|
|
|
/// Shared mutex for synchronizing calls to cgrpc_call_perform()
|
|
|
static let callMutex = Mutex()
|
|
|
|
|
|
+ /// Maximum number of messages that can be queued
|
|
|
+ static var maximumQueuedMessages = 10
|
|
|
+
|
|
|
/// Pointer to underlying C representation
|
|
|
private var underlyingCall : UnsafeMutableRawPointer
|
|
|
|
|
|
@@ -148,7 +151,7 @@ public class Call {
|
|
|
///
|
|
|
/// - Parameter operations: group of operations to be performed
|
|
|
/// - Returns: the result of initiating the call
|
|
|
- func perform(_ operations: OperationGroup) throws -> Void {
|
|
|
+ internal func perform(_ operations: OperationGroup) throws -> Void {
|
|
|
completionQueue.register(operations)
|
|
|
Call.callMutex.lock()
|
|
|
let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
|
|
|
@@ -163,9 +166,9 @@ public class Call {
|
|
|
/// - Parameter message: data containing the message to send
|
|
|
/// - Parameter metadata: metadata to send with the call
|
|
|
/// - Parameter callback: a blocko to call with a CallResponse object containing call results
|
|
|
- public func performNonStreamingCall(message: Data,
|
|
|
- metadata: Metadata,
|
|
|
- completion: @escaping (CallResult) throws -> Void)
|
|
|
+ public func perform(message: Data,
|
|
|
+ metadata: Metadata,
|
|
|
+ completion: @escaping (CallResult) throws -> Void)
|
|
|
throws -> Void {
|
|
|
let messageBuffer = ByteBuffer(data:message)
|
|
|
let operations = OperationGroup(call:self,
|
|
|
@@ -195,32 +198,41 @@ public class Call {
|
|
|
try self.perform(operations)
|
|
|
}
|
|
|
|
|
|
- // start a streaming connection
|
|
|
+ /// start a streaming connection
|
|
|
+ ///
|
|
|
+ /// Parameter metadata: initial metadata to send
|
|
|
public func start(metadata: Metadata) throws -> Void {
|
|
|
try self.sendInitialMetadata(metadata: metadata)
|
|
|
try self.receiveInitialMetadata()
|
|
|
try self.receiveStatus()
|
|
|
}
|
|
|
|
|
|
- // send a message over a streaming connection
|
|
|
- public func sendMessage(data: Data) -> Void {
|
|
|
- self.sendMutex.synchronize {
|
|
|
- if self.writing {
|
|
|
- self.pendingMessages.append(data) // TODO: return something if we can't accept another message
|
|
|
- } else {
|
|
|
- self.writing = true
|
|
|
- do {
|
|
|
- try self.sendWithoutBlocking(data: data)
|
|
|
- } catch (let callError) {
|
|
|
- print("grpc error: \(callError)")
|
|
|
- }
|
|
|
+ /// send a message over a streaming connection
|
|
|
+ ///
|
|
|
+ /// Parameter data: the message data to send
|
|
|
+ /// Returns: true if the message could be queued or sent, false if the queue is full
|
|
|
+ public func sendMessage(data: Data) -> Bool {
|
|
|
+ self.sendMutex.lock()
|
|
|
+ defer {self.sendMutex.unlock()}
|
|
|
+ if self.writing {
|
|
|
+ if self.pendingMessages.count == Call.maximumQueuedMessages {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ self.pendingMessages.append(data) // TODO: return something if we can't accept another message
|
|
|
+ } else {
|
|
|
+ self.writing = true
|
|
|
+ do {
|
|
|
+ try self.sendWithoutBlocking(data: data)
|
|
|
+ } catch (let callError) {
|
|
|
+ print("grpc error: \(callError)")
|
|
|
}
|
|
|
}
|
|
|
+ return true
|
|
|
}
|
|
|
|
|
|
+ /// helper for sending queued messages
|
|
|
private func sendWithoutBlocking(data: Data) throws -> Void {
|
|
|
- let messageBuffer = ByteBuffer(data:data)
|
|
|
- let operations = OperationGroup(call:self, operations:[.sendMessage(messageBuffer)])
|
|
|
+ let operations = OperationGroup(call:self, operations:[.sendMessage(ByteBuffer(data:data))])
|
|
|
{(operationGroup) in
|
|
|
if operationGroup.success {
|
|
|
self.messageDispatchQueue.async {
|