|
@@ -33,16 +33,21 @@ public class Server {
|
|
|
/// Completion queue used for server operations
|
|
/// Completion queue used for server operations
|
|
|
let completionQueue: CompletionQueue
|
|
let completionQueue: CompletionQueue
|
|
|
|
|
|
|
|
|
|
+ /// Delay for which the spin loop should wait before starting over.
|
|
|
|
|
+ let loopTimeout: TimeInterval
|
|
|
|
|
+
|
|
|
/// Optional callback when server stops serving
|
|
/// Optional callback when server stops serving
|
|
|
public var onCompletion: (() -> Void)?
|
|
public var onCompletion: (() -> Void)?
|
|
|
|
|
|
|
|
/// Initializes a Server
|
|
/// Initializes a Server
|
|
|
///
|
|
///
|
|
|
/// - Parameter address: the address where the server will listen
|
|
/// - Parameter address: the address where the server will listen
|
|
|
- public init(address: String) {
|
|
|
|
|
|
|
+ /// - Parameter loopTimeout: delay for which the spin loop should wait before starting over.
|
|
|
|
|
+ public init(address: String, loopTimeout: TimeInterval = 600) {
|
|
|
underlyingServer = cgrpc_server_create(address)
|
|
underlyingServer = cgrpc_server_create(address)
|
|
|
completionQueue = CompletionQueue(
|
|
completionQueue = CompletionQueue(
|
|
|
underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address)
|
|
underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address)
|
|
|
|
|
+ self.loopTimeout = loopTimeout
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Initializes a secure Server
|
|
/// Initializes a secure Server
|
|
@@ -51,10 +56,12 @@ public class Server {
|
|
|
/// - Parameter key: the private key for the server's certificates
|
|
/// - Parameter key: the private key for the server's certificates
|
|
|
/// - Parameter certs: the server's certificates
|
|
/// - Parameter certs: the server's certificates
|
|
|
/// - Parameter rootCerts: used to validate client certificates; will enable enforcing valid client certificates when provided
|
|
/// - Parameter rootCerts: used to validate client certificates; will enable enforcing valid client certificates when provided
|
|
|
- public init(address: String, key: String, certs: String, rootCerts: String? = nil) {
|
|
|
|
|
|
|
+ /// - Parameter loopTimeout: delay for which the spin loop should wait before starting over.
|
|
|
|
|
+ public init(address: String, key: String, certs: String, rootCerts: String? = nil, loopTimeout: TimeInterval = 600) {
|
|
|
underlyingServer = cgrpc_server_create_secure(address, key, certs, rootCerts, rootCerts == nil ? 0 : 1)
|
|
underlyingServer = cgrpc_server_create_secure(address, key, certs, rootCerts, rootCerts == nil ? 0 : 1)
|
|
|
completionQueue = CompletionQueue(
|
|
completionQueue = CompletionQueue(
|
|
|
underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address)
|
|
underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address)
|
|
|
|
|
+ self.loopTimeout = loopTimeout
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
deinit {
|
|
deinit {
|
|
@@ -70,13 +77,17 @@ public class Server {
|
|
|
// run the server on a new background thread
|
|
// run the server on a new background thread
|
|
|
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
|
|
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
|
|
|
spinloopThreadQueue.async {
|
|
spinloopThreadQueue.async {
|
|
|
- spinloop: while true {
|
|
|
|
|
- do {
|
|
|
|
|
- let handler = Handler(underlyingServer: self.underlyingServer)
|
|
|
|
|
- try handler.requestCall(tag: Server.handlerCallTag)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ do {
|
|
|
|
|
+ // Allocate a handler _outside_ the spin loop, as we must use _this particular_ handler to serve the next call
|
|
|
|
|
+ // once we have called `handler.requestCall`. In particular, we need to keep the current handler for the next
|
|
|
|
|
+ // spin loop interation when we hit the `.queueTimeout` case. The handler should only be replaced once it is
|
|
|
|
|
+ // "used up" for serving an incoming call.
|
|
|
|
|
+ var handler = Handler(underlyingServer: self.underlyingServer)
|
|
|
|
|
+ // Tell gRPC to store the next call's information in this handler object.
|
|
|
|
|
+ try handler.requestCall(tag: Server.handlerCallTag)
|
|
|
|
|
+ spinloop: while true {
|
|
|
// block while waiting for an incoming request
|
|
// block while waiting for an incoming request
|
|
|
- let event = self.completionQueue.wait(timeout: 600)
|
|
|
|
|
|
|
+ let event = self.completionQueue.wait(timeout: self.loopTimeout)
|
|
|
|
|
|
|
|
if event.type == .complete {
|
|
if event.type == .complete {
|
|
|
if event.tag == Server.handlerCallTag {
|
|
if event.tag == Server.handlerCallTag {
|
|
@@ -95,23 +106,29 @@ public class Server {
|
|
|
|
|
|
|
|
// Dispatch the handler function on a separate thread.
|
|
// Dispatch the handler function on a separate thread.
|
|
|
let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
|
|
let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
|
|
|
|
|
+ // Needs to be copied, because we will change the value of `handler` right after this.
|
|
|
|
|
+ let handlerCopy = handler
|
|
|
handlerDispatchThreadQueue.async {
|
|
handlerDispatchThreadQueue.async {
|
|
|
- handlerFunction(handler)
|
|
|
|
|
|
|
+ handlerFunction(handlerCopy)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // This handler has now been "used up" for the current call; replace it with a fresh one for the next
|
|
|
|
|
+ // loop iteration.
|
|
|
|
|
+ handler = Handler(underlyingServer: self.underlyingServer)
|
|
|
|
|
+ try handler.requestCall(tag: Server.handlerCallTag)
|
|
|
} else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
|
|
} else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
|
|
|
break spinloop
|
|
break spinloop
|
|
|
}
|
|
}
|
|
|
} else if event.type == .queueTimeout {
|
|
} else if event.type == .queueTimeout {
|
|
|
- // everything is fine
|
|
|
|
|
|
|
+ // Everything is fine, just start over *while continuing to use the existing handler*.
|
|
|
continue
|
|
continue
|
|
|
} else if event.type == .queueShutdown {
|
|
} else if event.type == .queueShutdown {
|
|
|
break spinloop
|
|
break spinloop
|
|
|
}
|
|
}
|
|
|
- } catch {
|
|
|
|
|
- print("server call error: \(error)")
|
|
|
|
|
- break spinloop
|
|
|
|
|
}
|
|
}
|
|
|
|
|
+ } catch {
|
|
|
|
|
+ print("server call error: \(error)")
|
|
|
}
|
|
}
|
|
|
self.onCompletion?()
|
|
self.onCompletion?()
|
|
|
}
|
|
}
|