|
|
@@ -102,8 +102,7 @@ class CompletionQueue {
|
|
|
completion: (() -> Void)?) {
|
|
|
// run the completion queue on a new background thread
|
|
|
DispatchQueue.global().async {
|
|
|
- var running = true
|
|
|
- while running {
|
|
|
+ spinloop: while true {
|
|
|
let event = cgrpc_completion_queue_get_next_event(self.underlyingCompletionQueue, -1.0)
|
|
|
switch event.type {
|
|
|
case GRPC_OP_COMPLETE:
|
|
|
@@ -118,22 +117,25 @@ class CompletionQueue {
|
|
|
self.operationGroupsMutex.synchronize {
|
|
|
self.operationGroups[tag] = nil
|
|
|
}
|
|
|
+ } else {
|
|
|
+ print("CompletionQueue.runToCompletion error: operation group with tag \(tag) not found")
|
|
|
}
|
|
|
- break
|
|
|
case GRPC_QUEUE_SHUTDOWN:
|
|
|
- running = false
|
|
|
- for operationGroup in self.operationGroups.values {
|
|
|
+ self.operationGroupsMutex.lock()
|
|
|
+ let currentOperationGroups = self.operationGroups
|
|
|
+ self.operationGroups = [:]
|
|
|
+ self.operationGroupsMutex.unlock()
|
|
|
+
|
|
|
+ for operationGroup in currentOperationGroups.values {
|
|
|
operationGroup.success = false
|
|
|
operationGroup.completion?(operationGroup)
|
|
|
}
|
|
|
- self.operationGroupsMutex.synchronize {
|
|
|
- self.operationGroups = [:]
|
|
|
- }
|
|
|
- break
|
|
|
+ break spinloop
|
|
|
case GRPC_QUEUE_TIMEOUT:
|
|
|
- break
|
|
|
+ continue spinloop
|
|
|
default:
|
|
|
- break
|
|
|
+ print("CompletionQueue.runToCompletion error: unknown event type \(event.type)")
|
|
|
+ break spinloop
|
|
|
}
|
|
|
}
|
|
|
if let completion = completion {
|