Browse Source

Make the server dispatch each request handler on its own thread and make the `ServerSession` request handler implementations run synchronously; this saves one potentially costly asynchronous dispatch.

Daniel Alm 7 years ago
parent
commit
dc35994f21

+ 1 - 1
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -98,7 +98,7 @@ class CompletionQueue {
 
   /// Register an operation group for handling upon completion. Will throw if the queue has been shutdown already.
   ///
-  /// - Parameter operationGroup: the operation group to handle
+  /// - Parameter operationGroup: the operation group to handle. Runs synchronously on the queue's thread, so should not be blocking.
   func register(_ operationGroup: OperationGroup, onSuccess: () throws -> Void) throws {
     try operationGroupsMutex.synchronize {
       guard !hasBeenShutdown

+ 9 - 8
Sources/SwiftGRPC/Core/Server.swift

@@ -61,12 +61,13 @@ public class Server {
     completionQueue.shutdown()
   }
 
-  /// Run the server
+  /// Run the server.
+  ///
+  /// - Parameter handlerFunction: will be called to handle an incoming request. Dispatched on a new thread, so can be blocking.
   public func run(handlerFunction: @escaping (Handler) -> Void) {
     cgrpc_server_start(underlyingServer)
     // run the server on a new background thread
     let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
-    let handlerDispatchQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandler", attributes: .concurrent)
     spinloopThreadQueue.async {
       spinloop: while true {
         do {
@@ -87,13 +88,13 @@ public class Server {
                 _ = strongHandlerReference
                 // this will start the completion queue on a new thread
                 handler.completionQueue.runToCompletion {
-                  handlerDispatchQueue.async {
-                    // release the handler when it finishes
-                    strongHandlerReference = nil
-                  }
+                  // release the handler when it finishes
+                  strongHandlerReference = nil
                 }
-                handlerDispatchQueue.async {
-                  // dispatch the handler function on a separate thread
+                
+                // Dispatch the handler function on a separate thread.
+                let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
+                handlerDispatchThreadQueue.async {
                   handlerFunction(handler)
                 }
               }

+ 26 - 23
Sources/SwiftGRPC/Runtime/ServerSessionBidirectionalStreaming.swift

@@ -35,29 +35,32 @@ open class ServerSessionBidirectionalStreamingBase<InputType: Message, OutputTyp
   }
   
   public func run() throws {
-    try handler.sendMetadata(initialMetadata: initialMetadata) { success in
-      let handlerThreadQueue = DispatchQueue(label: "SwiftGRPC.ServerSessionBidirectionalStreamingBase.run.handlerThread")
-      handlerThreadQueue.async {
-        var responseStatus: ServerStatus?
-        if success {
-          do {
-            try self.providerBlock(self)
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionBidirectionalStreamingBase.run sending initial metadata failed")
-          responseStatus = .sendingInitialMetadataFailed
-        }
-        
-        if let responseStatus = responseStatus {
-          // Error encountered, notify the client.
-          do {
-            try self.handler.sendStatus(responseStatus)
-          } catch {
-            print("ServerSessionBidirectionalStreamingBase.run error sending status: \(error)")
-          }
-        }
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var success = false
+    try handler.sendMetadata(initialMetadata: initialMetadata) {
+      success = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    var responseStatus: ServerStatus?
+    if success {
+      do {
+        try self.providerBlock(self)
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
+      }
+    } else {
+      print("ServerSessionBidirectionalStreamingBase.run sending initial metadata failed")
+      responseStatus = .sendingInitialMetadataFailed
+    }
+    
+    if let responseStatus = responseStatus {
+      // Error encountered, notify the client.
+      do {
+        try self.handler.sendStatus(responseStatus)
+      } catch {
+        print("ServerSessionBidirectionalStreamingBase.run error sending status: \(error)")
       }
     }
   }

+ 26 - 23
Sources/SwiftGRPC/Runtime/ServerSessionClientStreaming.swift

@@ -41,29 +41,32 @@ open class ServerSessionClientStreamingBase<InputType: Message, OutputType: Mess
   }
   
   public func run() throws {
-    try handler.sendMetadata(initialMetadata: initialMetadata) { success in
-      let handlerThreadQueue = DispatchQueue(label: "SwiftGRPC.ServerSessionClientStreamingBase.run.handlerThread")
-      handlerThreadQueue.async {
-        var responseStatus: ServerStatus?
-        if success {
-          do {
-            try self.providerBlock(self)
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionClientStreamingBase.run sending initial metadata failed")
-          responseStatus = .sendingInitialMetadataFailed
-        }
-        
-        if let responseStatus = responseStatus {
-          // Error encountered, notify the client.
-          do {
-            try self.handler.sendStatus(responseStatus)
-          } catch {
-            print("ServerSessionClientStreamingBase.run error sending status: \(error)")
-          }
-        }
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var success = false
+    try handler.sendMetadata(initialMetadata: initialMetadata) {
+      success = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    var responseStatus: ServerStatus?
+    if success {
+      do {
+        try self.providerBlock(self)
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
+      }
+    } else {
+      print("ServerSessionClientStreamingBase.run sending initial metadata failed")
+      responseStatus = .sendingInitialMetadataFailed
+    }
+    
+    if let responseStatus = responseStatus {
+      // Error encountered, notify the client.
+      do {
+        try self.handler.sendStatus(responseStatus)
+      } catch {
+        print("ServerSessionClientStreamingBase.run error sending status: \(error)")
       }
     }
   }

+ 27 - 24
Sources/SwiftGRPC/Runtime/ServerSessionServerStreaming.swift

@@ -34,30 +34,33 @@ open class ServerSessionServerStreamingBase<InputType: Message, OutputType: Mess
   }
   
   public func run() throws {
-    try handler.receiveMessage(initialMetadata: initialMetadata) { requestData in
-      let handlerThreadQueue = DispatchQueue(label: "SwiftGRPC.ServerSessionServerStreamingBase.run.handlerThread")
-      handlerThreadQueue.async {
-        var responseStatus: ServerStatus?
-        if let requestData = requestData {
-          do {
-            let requestMessage = try InputType(serializedData: requestData)
-            try self.providerBlock(requestMessage, self)
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionServerStreamingBase.run empty request data")
-          responseStatus = .noRequestData
-        }
-        
-        if let responseStatus = responseStatus {
-          // Error encountered, notify the client.
-          do {
-            try self.handler.sendStatus(responseStatus)
-          } catch {
-            print("ServerSessionServerStreamingBase.run error sending status: \(error)")
-          }
-        }
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var requestData: Data?
+    try handler.receiveMessage(initialMetadata: initialMetadata) {
+      requestData = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    var responseStatus: ServerStatus?
+    if let requestData = requestData {
+      do {
+        let requestMessage = try InputType(serializedData: requestData)
+        try self.providerBlock(requestMessage, self)
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
+      }
+    } else {
+      print("ServerSessionServerStreamingBase.run no request data")
+      responseStatus = .noRequestData
+    }
+    
+    if let responseStatus = responseStatus {
+      // Error encountered, notify the client.
+      do {
+        try self.handler.sendStatus(responseStatus)
+      } catch {
+        print("ServerSessionServerStreamingBase.run error sending status: \(error)")
       }
     }
   }

+ 36 - 25
Sources/SwiftGRPC/Runtime/ServerSessionUnary.swift

@@ -32,34 +32,45 @@ open class ServerSessionUnaryBase<InputType: Message, OutputType: Message>: Serv
   }
   
   public func run() throws {
-    try handler.receiveMessage(initialMetadata: initialMetadata) { requestData in
-      let handlerThreadQueue = DispatchQueue(label: "SwiftGRPC.ServerSessionUnaryBase.run.handlerThread")
-      handlerThreadQueue.async {
-        let responseStatus: ServerStatus
-        if let requestData = requestData {
-          do {
-            let requestMessage = try InputType(serializedData: requestData)
-            let responseMessage = try self.providerBlock(requestMessage, self)
-            try self.handler.call.sendMessage(data: responseMessage.serializedData()) {
-              guard let error = $0
-                else { return }
-              print("ServerSessionUnaryBase.run error sending response: \(error)")
-            }
-            responseStatus = .ok
-          } catch {
-            responseStatus = (error as? ServerStatus) ?? .processingError
-          }
-        } else {
-          print("ServerSessionUnaryBase.run empty request data")
-          responseStatus = .noRequestData
-        }
+    let sendMetadataSignal = DispatchSemaphore(value: 0)
+    var requestData: Data?
+    try handler.receiveMessage(initialMetadata: initialMetadata) {
+      requestData = $0
+      sendMetadataSignal.signal()
+    }
+    sendMetadataSignal.wait()
+    
+    let responseStatus: ServerStatus
+    if let requestData = requestData {
+      do {
+        let requestMessage = try InputType(serializedData: requestData)
+        let responseMessage = try self.providerBlock(requestMessage, self)
         
-        do {
-          try self.handler.sendStatus(responseStatus)
-        } catch {
-          print("ServerSessionUnaryBase.run error sending status: \(error)")
+        let sendResponseSignal = DispatchSemaphore(value: 0)
+        var sendResponseError: Error?
+        try self.handler.call.sendMessage(data: responseMessage.serializedData()) {
+          sendResponseError = $0
+          sendResponseSignal.signal()
+        }
+        sendResponseSignal.wait()
+        if let sendResponseError = sendResponseError {
+          print("ServerSessionUnaryBase.run error sending response: \(sendResponseError)")
+          throw sendResponseError
         }
+        
+        responseStatus = .ok
+      } catch {
+        responseStatus = (error as? ServerStatus) ?? .processingError
       }
+    } else {
+      print("ServerSessionUnaryBase.run no request data")
+      responseStatus = .noRequestData
+    }
+    
+    do {
+      try self.handler.sendStatus(responseStatus)
+    } catch {
+      print("ServerSessionUnaryBase.run error sending status: \(error)")
     }
   }
 }