浏览代码

synchronization improvements (jconverse@)

Tim Burks 9 年之前
父节点
当前提交
b3547f18c0

+ 37 - 34
Packages/gRPC/Sources/Call.swift

@@ -35,15 +35,6 @@
 #endif
 import Foundation
 
-/// Singleton class that provides a mutex for synchronizing calls to cgrpc_call_perform()
-private class CallLock {
-  var mutex : Mutex
-  private init() {
-    mutex = Mutex()
-  }
-  static let sharedInstance = CallLock()
-}
-
 public enum CallError : Error {
   case ok
   case unknown
@@ -110,6 +101,9 @@ public struct CallResult {
 /// A gRPC API call
 public class Call {
 
+  /// Shared mutex for synchronizing calls to cgrpc_call_perform()
+  static let callMutex = Mutex()
+
   /// Pointer to underlying C representation
   private var underlyingCall : UnsafeMutableRawPointer
 
@@ -125,6 +119,12 @@ public class Call {
   /// True if a message write operation is underway
   private var writing : Bool
 
+  /// Mutex for synchronizing message sending
+  private var sendMutex : Mutex
+
+  /// Dispatch queue used for sending messages asynchronously
+  private var messageDispatchQueue: DispatchQueue = DispatchQueue.global()
+
   /// Initializes a Call representation
   ///
   /// - Parameter call: the underlying C representation
@@ -135,6 +135,7 @@ public class Call {
     self.completionQueue = completionQueue
     self.pendingMessages = []
     self.writing = false
+    self.sendMutex = Mutex()
   }
 
   deinit {
@@ -149,10 +150,9 @@ public class Call {
   /// - Returns: the result of initiating the call
   func perform(_ operations: OperationGroup) throws -> Void {
     completionQueue.register(operations)
-    let mutex = CallLock.sharedInstance.mutex
-    mutex.lock()
+    Call.callMutex.lock()
     let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
-    mutex.unlock()
+    Call.callMutex.unlock()
     if error != GRPC_CALL_OK {
       throw CallError.callError(grpcCallError:error)
     }
@@ -203,20 +203,19 @@ public class Call {
   }
 
   // send a message over a streaming connection
-  public func sendMessage(data: Data)
-    -> Void {
-      DispatchQueue.main.async {
-        if self.writing {
-          self.pendingMessages.append(data) // TODO: return something if we can't accept another message
-        } else {
-          self.writing = true
-          do {
-            try self.sendWithoutBlocking(data: data)
-          } catch (let callError) {
-            print("grpc error: \(callError)")
-          }
+  public func sendMessage(data: Data) -> Void {
+    self.sendMutex.synchronize {
+      if self.writing {
+        self.pendingMessages.append(data) // TODO: return something if we can't accept another message
+      } else {
+        self.writing = true
+        do {
+          try self.sendWithoutBlocking(data: data)
+        } catch (let callError) {
+          print("grpc error: \(callError)")
         }
       }
+    }
   }
 
   private func sendWithoutBlocking(data: Data) throws -> Void {
@@ -224,17 +223,21 @@ public class Call {
     let operations = OperationGroup(call:self, operations:[.sendMessage(messageBuffer)])
     {(operationGroup) in
       if operationGroup.success {
-        DispatchQueue.main.async {
-          if self.pendingMessages.count > 0 {
-            let nextMessage = self.pendingMessages.first!
-            self.pendingMessages.removeFirst()
-            do {
-              try self.sendWithoutBlocking(data: nextMessage)
-            } catch (let callError) {
-              print("grpc error: \(callError)")
+        self.messageDispatchQueue.async {
+          self.sendMutex.synchronize {
+            // if there are messages pending, send the next one
+            if self.pendingMessages.count > 0 {
+              let nextMessage = self.pendingMessages.first!
+              self.pendingMessages.removeFirst()
+              do {
+                try self.sendWithoutBlocking(data: nextMessage)
+              } catch (let callError) {
+                print("grpc error: \(callError)")
+              }
+            } else {
+              // otherwise, we are finished writing
+              self.writing = false
             }
-          } else {
-            self.writing = false
           }
         }
       } else {

+ 22 - 7
Packages/gRPC/Sources/CompletionQueue.swift

@@ -81,6 +81,9 @@ internal class CompletionQueue {
   /// Operation groups that are awaiting completion, keyed by tag
   private var operationGroups : [Int64 : OperationGroup] = [:]
 
+  /// Mutex for synchronizing access to operationGroups
+  private var operationGroupsMutex : Mutex = Mutex()
+
   /// Initializes a CompletionQueue
   ///
   /// - Parameter cq: the underlying C representation
@@ -102,13 +105,18 @@ internal class CompletionQueue {
   ///
   /// - Parameter operationGroup: the operation group to handle
   internal func register(_ operationGroup:OperationGroup) -> Void {
+    operationGroupsMutex.lock()
     operationGroups[operationGroup.tag] = operationGroup
+    operationGroupsMutex.unlock()
   }
 
   /// Runs a completion queue and call a completion handler when finished
   ///
-  /// - Parameter: a completion handler that is called when the queue stops running
-  internal func runToCompletion(_ completion:@escaping () -> Void) {
+  /// - Parameter callbackQueue: a DispatchQueue to use to call the completion handler
+  /// - Parameter completion: a completion handler that is called when the queue stops running
+  internal func runToCompletion(callbackQueue:DispatchQueue? = DispatchQueue.main,
+                                _ completion:@escaping () -> Void) {
+    // run the completion queue on a new background thread
     DispatchQueue.global().async {
       var running = true
       while (running) {
@@ -116,7 +124,10 @@ internal class CompletionQueue {
         switch (event.type) {
         case GRPC_OP_COMPLETE:
           let tag = cgrpc_event_tag(event)
-          if let operationGroup = self.operationGroups[tag] {
+          self.operationGroupsMutex.lock()
+          let operationGroup = self.operationGroups[tag]
+          self.operationGroupsMutex.unlock()
+          if let operationGroup = operationGroup {
             // call the operation group completion handler
             do {
               operationGroup.success = (event.success == 1)
@@ -124,7 +135,9 @@ internal class CompletionQueue {
             } catch (let callError) {
               print("grpc error: \(callError)")
             }
+            self.operationGroupsMutex.lock()
             self.operationGroups[tag] = nil
+            self.operationGroupsMutex.unlock()
           }
           break
         case GRPC_QUEUE_SHUTDOWN:
@@ -136,16 +149,18 @@ internal class CompletionQueue {
           break
         }
       }
-      DispatchQueue.main.async {
-        // when the queue stops running, call the queue completion handler
-        completion()
+      if let callbackQueue = callbackQueue {
+        callbackQueue.async {
+          // when the queue stops running, call the queue completion handler
+          completion()
+        }
       }
     }
   }
 
   /// Runs a completion queue
   internal func run() -> Void {
-    self.runToCompletion() {}
+    self.runToCompletion(callbackQueue:nil) {}
   }
 
   /// Shuts down a completion queue

+ 1 - 1
Packages/gRPC/Sources/Handler.swift

@@ -139,7 +139,7 @@ public class Handler {
     try call.perform(operations)
   }
 
-  /// Shutdown the handler's completion queue
+  /// Shuts down the handler's completion queue
   public func shutdown() {
     completionQueue.shutdown()
   }

+ 9 - 0
Packages/gRPC/Sources/Mutex.swift

@@ -67,4 +67,13 @@ public class Mutex {
   public func unlock() {
     cgrpc_mutex_unlock(underlyingMutex);
   }
+
+  /// Runs a block within a locked mutex
+  ///
+  /// Parameter block: the code to run while the mutex is locked
+  public func synchronize(block:() -> Void) {
+    lock()
+    block()
+    unlock()
+  }
 }

+ 5 - 12
Packages/gRPC/Sources/OperationGroup.swift

@@ -34,18 +34,12 @@
   import CgRPC
 #endif
 
-/// Singleton class that provides a mutex for synchronizing tag generation
-private class OperationGroupTagLock {
-  var mutex : Mutex
-  private init() {
-    mutex = Mutex()
-  }
-  static let sharedInstance = OperationGroupTagLock()
-}
-
 /// A collection of gRPC operations
 internal class OperationGroup {
 
+  /// A mutex for synchronizing tag generation
+  static let tagMutex = Mutex()
+
   /// Used to generate unique tags for OperationGroups
   private static var nextTag : Int64 = 1
 
@@ -110,11 +104,10 @@ internal class OperationGroup {
     self.operations = operations
     self.completion = completion
     // set tag to a unique value (per execution)
-    let mutex = OperationGroupTagLock.sharedInstance.mutex
-    mutex.lock()
+    OperationGroup.tagMutex.lock()
     self.tag = OperationGroup.nextTag
     OperationGroup.nextTag += 1
-    mutex.unlock()
+    OperationGroup.tagMutex.unlock()
     // create underlying observers and operations
     self.underlyingOperations = cgrpc_operations_create()
     cgrpc_operations_reserve_space_for_operations(self.underlyingOperations, Int32(operations.count))

+ 20 - 6
Packages/gRPC/Sources/Server.swift

@@ -45,7 +45,10 @@ public class Server {
   var completionQueue: CompletionQueue
 
   /// Active handlers
-  var handlers : NSMutableSet!
+  private var handlers : NSMutableSet!
+
+  /// Mutex for synchronizing access to handlers
+  private var handlersMutex : Mutex = Mutex()
 
   /// Optional callback when server stops serving
   private var onCompletion: (() -> Void)!
@@ -77,9 +80,11 @@ public class Server {
   }
 
   /// Run the server
-  public func run(handlerFunction: @escaping (Handler) -> Void) {
+  public func run(dispatchQueue: DispatchQueue = DispatchQueue.global(),
+                  handlerFunction: @escaping (Handler) -> Void) {
     cgrpc_server_start(underlyingServer);
-    DispatchQueue.global().async {
+    // run the server on a new background thread
+    dispatchQueue.async {
       var running = true
       while(running) {
         do {
@@ -91,9 +96,18 @@ public class Server {
             if event.tag == 101 {
               // run the handler and remove it when it finishes
               if event.success != 0 {
-                self.handlers.add(handler)
-                handler.completionQueue.runToCompletion() {
-                  self.handlers.remove(handler)
+                // hold onto the handler while it runs
+                self.handlersMutex.synchronize {
+                  self.handlers.add(handler)
+                }
+                // this will start the completion queue on a new thread
+                handler.completionQueue.runToCompletion(callbackQueue:dispatchQueue) {
+                  dispatchQueue.async {
+                    self.handlersMutex.synchronize {
+                      // release the handler when it finishes
+                      self.handlers.remove(handler)
+                    }
+                  }
                 }
                 handlerFunction(handler)
               }