Browse Source

Merge pull request #227 from MrMage/fresh-async-queues

Spin up dedicated queues (and thus threads) for most operations
Tim Burks 7 years ago
parent
commit
87a7c1baa8

+ 5 - 5
Sources/Examples/Echo/Generated/echo.grpc.swift

@@ -319,32 +319,32 @@ internal final class Echo_EchoServer: ServiceServer {
   }
 
   /// Start the server.
-  internal override func handleMethod(_ method: String, handler: Handler, queue: DispatchQueue) throws -> Bool {
+  internal override func handleMethod(_ method: String, handler: Handler) throws -> Bool {
     let provider = self.provider
     switch method {
     case "/echo.Echo/Get":
       try Echo_EchoGetSessionBase(
         handler: handler,
         providerBlock: { try provider.get(request: $0, session: $1 as! Echo_EchoGetSessionBase) })
-          .run(queue: queue)
+          .run()
       return true
     case "/echo.Echo/Expand":
       try Echo_EchoExpandSessionBase(
         handler: handler,
         providerBlock: { try provider.expand(request: $0, session: $1 as! Echo_EchoExpandSessionBase) })
-          .run(queue: queue)
+          .run()
       return true
     case "/echo.Echo/Collect":
       try Echo_EchoCollectSessionBase(
         handler: handler,
         providerBlock: { try provider.collect(session: $0 as! Echo_EchoCollectSessionBase) })
-          .run(queue: queue)
+          .run()
       return true
     case "/echo.Echo/Update":
       try Echo_EchoUpdateSessionBase(
         handler: handler,
         providerBlock: { try provider.update(session: $0 as! Echo_EchoUpdateSessionBase) })
-          .run(queue: queue)
+          .run()
       return true
     default:
       return false

+ 6 - 1
Sources/SwiftGRPC/Core/Call.swift

@@ -104,6 +104,7 @@ public class Call {
   /// - Parameter completion: a block to call with call results
   ///     The argument to `completion` will always have `.success = true`
   ///     because operations containing `.receiveCloseOnClient` always succeed.
+  ///   Runs synchronously on the completion queue's thread. Should not be blocking.
   /// - Throws: `CallError` if fails to call.
   public func start(_ style: CallStyle,
                     metadata: Metadata,
@@ -157,7 +158,8 @@ public class Call {
 
   /// Sends a message over a streaming connection.
   ///
-  /// Parameter data: the message data to send
+  /// - Parameter data: the message data to send
+  /// - Parameter completion: Runs synchronously on the completion queue's thread once the message has been sent. Should not be blocking.
   /// - Throws: `CallError` if fails to call. `CallWarning` if blocked.
   public func sendMessage(data: Data, completion: ((Error?) -> Void)? = nil) throws {
     try sendMutex.synchronize {
@@ -202,6 +204,7 @@ public class Call {
   }
 
   // Receive a message over a streaming connection.
+  /// - Parameter completion: Runs synchronously on the completion queue's thread once the message has been received. Should not be blocking.
   /// - Throws: `CallError` if fails to call.
   public func closeAndReceiveMessage(completion: @escaping (CallResult) -> Void) throws {
     try perform(OperationGroup(call: self, operations: [.sendCloseFromClient, .receiveMessage]) { operationGroup in
@@ -210,6 +213,7 @@ public class Call {
   }
 
   // Receive a message over a streaming connection.
+  /// - Parameter completion: Runs synchronously on the completion queue's thread once the message has been received. Should not be blocking.
   /// - Throws: `CallError` if fails to call.
   public func receiveMessage(completion: @escaping (CallResult) -> Void) throws {
     try perform(OperationGroup(call: self, operations: [.receiveMessage]) { operationGroup in
@@ -218,6 +222,7 @@ public class Call {
   }
 
   // Closes a streaming connection.
+  /// - Parameter completion: Runs synchronously on the completion queue's thread once the connection has been closed. Should not be blocking.
   /// - Throws: `CallError` if fails to call.
   public func close(completion: (() -> Void)? = nil) throws {
     try perform(OperationGroup(call: self, operations: [.sendCloseFromClient],

+ 3 - 2
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -98,7 +98,7 @@ class CompletionQueue {
 
   /// Register an operation group for handling upon completion. Will throw if the queue has been shutdown already.
   ///
-  /// - Parameter operationGroup: the operation group to handle
+  /// - Parameter operationGroup: the operation group to handle.
   func register(_ operationGroup: OperationGroup, onSuccess: () throws -> Void) throws {
     try operationGroupsMutex.synchronize {
       guard !hasBeenShutdown
@@ -113,7 +113,8 @@ class CompletionQueue {
   /// - Parameter completion: a completion handler that is called when the queue stops running
   func runToCompletion(completion: (() -> Void)?) {
     // run the completion queue on a new background thread
-    DispatchQueue.global().async {
+    let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
+    spinloopThreadQueue.async {
       spinloop: while true {
         let event = cgrpc_completion_queue_get_next_event(self.underlyingCompletionQueue, 600)
         switch event.type {

+ 12 - 10
Sources/SwiftGRPC/Core/Server.swift

@@ -61,12 +61,14 @@ public class Server {
     completionQueue.shutdown()
   }
 
-  /// Run the server
-  public func run(dispatchQueue: DispatchQueue = DispatchQueue.global(),
-                  handlerFunction: @escaping (Handler) -> Void) {
+  /// Run the server.
+  ///
+  /// - Parameter handlerFunction: will be called to handle an incoming request. Dispatched on a new thread, so can be blocking.
+  public func run(handlerFunction: @escaping (Handler) -> Void) {
     cgrpc_server_start(underlyingServer)
     // run the server on a new background thread
-    dispatchQueue.async {
+    let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
+    spinloopThreadQueue.async {
       spinloop: while true {
         do {
           let handler = Handler(underlyingServer: self.underlyingServer)
@@ -86,13 +88,13 @@ public class Server {
                 _ = strongHandlerReference
                 // this will start the completion queue on a new thread
                 handler.completionQueue.runToCompletion {
-                  dispatchQueue.async {
-                    // release the handler when it finishes
-                    strongHandlerReference = nil
-                  }
+                  // release the handler when it finishes
+                  strongHandlerReference = nil
                 }
-                dispatchQueue.async {
-                  // dispatch the handler function on a separate thread
+                
+                // Dispatch the handler function on a separate thread.
+                let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
+                handlerDispatchThreadQueue.async {
                   handlerFunction(handler)
                 }
               }

+ 27 - 23
Sources/SwiftGRPC/Runtime/ServerSessionBidirectionalStreaming.swift

@@ -34,29 +34,33 @@ open class ServerSessionBidirectionalStreamingBase<InputType: Message, OutputTyp
     super.init(handler: handler)
   }
   
-  public func run(queue: DispatchQueue) throws {
-    try handler.sendMetadata(initialMetadata: initialMetadata) { success in
-      queue.async {
-        var responseStatus: ServerStatus?
-        if success {
-          do {
-            try self.providerBlock(self)
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionBidirectionalStreamingBase.run sending initial metadata failed")
-          responseStatus = .sendingInitialMetadataFailed
-        }
-        
-        if let responseStatus = responseStatus {
-          // Error encountered, notify the client.
-          do {
-            try self.handler.sendStatus(responseStatus)
-          } catch {
-            print("ServerSessionBidirectionalStreamingBase.run error sending status: \(error)")
-          }
-        }
+  public func run() throws {
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var success = false
+    try handler.sendMetadata(initialMetadata: initialMetadata) {
+      success = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    var responseStatus: ServerStatus?
+    if success {
+      do {
+        try self.providerBlock(self)
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
+      }
+    } else {
+      print("ServerSessionBidirectionalStreamingBase.run sending initial metadata failed")
+      responseStatus = .sendingInitialMetadataFailed
+    }
+    
+    if let responseStatus = responseStatus {
+      // Error encountered, notify the client.
+      do {
+        try self.handler.sendStatus(responseStatus)
+      } catch {
+        print("ServerSessionBidirectionalStreamingBase.run error sending status: \(error)")
       }
     }
   }

+ 27 - 23
Sources/SwiftGRPC/Runtime/ServerSessionClientStreaming.swift

@@ -40,29 +40,33 @@ open class ServerSessionClientStreamingBase<InputType: Message, OutputType: Mess
     try handler.sendStatus(status, completion: completion)
   }
   
-  public func run(queue: DispatchQueue) throws {
-    try handler.sendMetadata(initialMetadata: initialMetadata) { success in
-      queue.async {
-        var responseStatus: ServerStatus?
-        if success {
-          do {
-            try self.providerBlock(self)
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionClientStreamingBase.run sending initial metadata failed")
-          responseStatus = .sendingInitialMetadataFailed
-        }
-        
-        if let responseStatus = responseStatus {
-          // Error encountered, notify the client.
-          do {
-            try self.handler.sendStatus(responseStatus)
-          } catch {
-            print("ServerSessionClientStreamingBase.run error sending status: \(error)")
-          }
-        }
+  public func run() throws {
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var success = false
+    try handler.sendMetadata(initialMetadata: initialMetadata) {
+      success = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    var responseStatus: ServerStatus?
+    if success {
+      do {
+        try self.providerBlock(self)
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
+      }
+    } else {
+      print("ServerSessionClientStreamingBase.run sending initial metadata failed")
+      responseStatus = .sendingInitialMetadataFailed
+    }
+    
+    if let responseStatus = responseStatus {
+      // Error encountered, notify the client.
+      do {
+        try self.handler.sendStatus(responseStatus)
+      } catch {
+        print("ServerSessionClientStreamingBase.run error sending status: \(error)")
       }
     }
   }

+ 28 - 24
Sources/SwiftGRPC/Runtime/ServerSessionServerStreaming.swift

@@ -33,30 +33,34 @@ open class ServerSessionServerStreamingBase<InputType: Message, OutputType: Mess
     super.init(handler: handler)
   }
   
-  public func run(queue: DispatchQueue) throws {
-    try handler.receiveMessage(initialMetadata: initialMetadata) { requestData in
-      queue.async {
-        var responseStatus: ServerStatus?
-        if let requestData = requestData {
-          do {
-            let requestMessage = try InputType(serializedData: requestData)
-            try self.providerBlock(requestMessage, self)
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionServerStreamingBase.run empty request data")
-          responseStatus = .noRequestData
-        }
-        
-        if let responseStatus = responseStatus {
-          // Error encountered, notify the client.
-          do {
-            try self.handler.sendStatus(responseStatus)
-          } catch {
-            print("ServerSessionServerStreamingBase.run error sending status: \(error)")
-          }
-        }
+  public func run() throws {
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var requestData: Data?
+    try handler.receiveMessage(initialMetadata: initialMetadata) {
+      requestData = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    var responseStatus: ServerStatus?
+    if let requestData = requestData {
+      do {
+        let requestMessage = try InputType(serializedData: requestData)
+        try self.providerBlock(requestMessage, self)
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
+      }
+    } else {
+      print("ServerSessionServerStreamingBase.run no request data")
+      responseStatus = .noRequestData
+    }
+    
+    if let responseStatus = responseStatus {
+      // Error encountered, notify the client.
+      do {
+        try self.handler.sendStatus(responseStatus)
+      } catch {
+        print("ServerSessionServerStreamingBase.run error sending status: \(error)")
       }
     }
   }

+ 37 - 25
Sources/SwiftGRPC/Runtime/ServerSessionUnary.swift

@@ -31,34 +31,46 @@ open class ServerSessionUnaryBase<InputType: Message, OutputType: Message>: Serv
     super.init(handler: handler)
   }
   
-  public func run(queue: DispatchQueue) throws {
-    try handler.receiveMessage(initialMetadata: initialMetadata) { requestData in
-      queue.async {
-        let responseStatus: ServerStatus
-        if let requestData = requestData {
-          do {
-            let requestMessage = try InputType(serializedData: requestData)
-            let responseMessage = try self.providerBlock(requestMessage, self)
-            try self.handler.call.sendMessage(data: responseMessage.serializedData()) {
-              guard let error = $0
-                else { return }
-              print("ServerSessionUnaryBase.run error sending response: \(error)")
-            }
-            responseStatus = .ok
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionUnaryBase.run empty request data")
-          responseStatus = .noRequestData
-        }
+  public func run() throws {
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var requestData: Data?
+    try handler.receiveMessage(initialMetadata: initialMetadata) {
+      requestData = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    let responseStatus: ServerStatus
+    if let requestData = requestData {
+      do {
+        let requestMessage = try InputType(serializedData: requestData)
+        let responseMessage = try self.providerBlock(requestMessage, self)
         
-        do {
-          try self.handler.sendStatus(responseStatus)
-        } catch {
-          print("ServerSessionUnaryBase.run error sending status: \(error)")
+        let sendResponseSignal = DispatchSemaphore(value: 0)
+        var sendResponseError: Error?
+        try self.handler.call.sendMessage(data: responseMessage.serializedData()) {
+          sendResponseError = $0
+          sendResponseSignal.signal()
+        }
+        sendResponseSignal.wait()
+        if let sendResponseError = sendResponseError {
+          print("ServerSessionUnaryBase.run error sending response: \(sendResponseError)")
+          throw sendResponseError
         }
+        
+        responseStatus = .ok
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
       }
+    } else {
+      print("ServerSessionUnaryBase.run no request data")
+      responseStatus = .noRequestData
+    }
+    
+    do {
+      try self.handler.sendStatus(responseStatus)
+    } catch {
+      print("ServerSessionUnaryBase.run error sending status: \(error)")
     }
   }
 }

+ 3 - 3
Sources/SwiftGRPC/Runtime/ServiceServer.swift

@@ -50,10 +50,10 @@ open class ServiceServer {
 
   /// Handle the given method. Needs to be overridden by actual implementations.
   /// Returns whether the method was actually handled.
-  open func handleMethod(_ method: String, handler: Handler, queue: DispatchQueue) throws -> Bool { fatalError("needs to be overridden") }
+  open func handleMethod(_ method: String, handler: Handler) throws -> Bool { fatalError("needs to be overridden") }
 
   /// Start the server.
-  public func start(queue: DispatchQueue = DispatchQueue.global()) {
+  public func start() {
     server.run { [weak self] handler in
       guard let strongSelf = self else {
         print("ERROR: ServiceServer has been asked to handle a request even though it has already been deallocated")
@@ -71,7 +71,7 @@ open class ServiceServer {
       }
       
       do {
-        if !(try strongSelf.handleMethod(unwrappedMethod, handler: handler, queue: queue)) {
+        if !(try strongSelf.handleMethod(unwrappedMethod, handler: handler)) {
           do {
             try handler.call.perform(OperationGroup(
               call: handler.call,

+ 3 - 3
Sources/protoc-gen-swiftgrpc/Generator-Server.swift

@@ -89,7 +89,7 @@ extension Generator {
     println("}")
     println()
     println("/// Start the server.")
-    println("\(access) override func handleMethod(_ method: String, handler: Handler, queue: DispatchQueue) throws -> Bool {")
+    println("\(access) override func handleMethod(_ method: String, handler: Handler) throws -> Bool {")
     indent()
     println("let provider = self.provider")
     println("switch method {")
@@ -104,7 +104,7 @@ extension Generator {
         println("handler: handler,")
         println("providerBlock: { try provider.\(methodFunctionName)(request: $0, session: $1 as! \(methodSessionName)Base) })")
         indent()
-        println(".run(queue: queue)")
+        println(".run()")
         outdent()
         outdent()
       default:
@@ -113,7 +113,7 @@ extension Generator {
         println("handler: handler,")
         println("providerBlock: { try provider.\(methodFunctionName)(session: $0 as! \(methodSessionName)Base) })")
         indent()
-        println(".run(queue: queue)")
+        println(".run()")
         outdent()
         outdent()
       }

+ 2 - 2
Tests/SwiftGRPCTests/BasicEchoTestCase.swift

@@ -52,12 +52,12 @@ class BasicEchoTestCase: XCTestCase {
                                certificateString: certificateString,
                                keyString: String(data: keyForTests, encoding: .utf8)!,
                                provider: provider)
-      server.start(queue: DispatchQueue.global())
+      server.start()
       client = Echo_EchoServiceClient(address: address, certificates: certificateString, arguments: [.sslTargetNameOverride("example.com")])
       client.host = "example.com"
     } else {
       server = Echo_EchoServer(address: address, provider: provider)
-      server.start(queue: DispatchQueue.global())
+      server.start()
       client = Echo_EchoServiceClient(address: address, secure: false)
     }