/* * Copyright 2016, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #if SWIFT_PACKAGE import CgRPC import Dispatch #endif import Foundation /// gRPC Server public class Server { static let handlerCallTag = 101 // These are sent by the CgRPC shim. static let stopTag = 0 static let destroyTag = 1000 /// Pointer to underlying C representation private let underlyingServer: UnsafeMutableRawPointer /// Completion queue used for server operations let completionQueue: CompletionQueue /// Optional callback when server stops serving public var onCompletion: (() -> Void)? /// Initializes a Server /// /// - Parameter address: the address where the server will listen public init(address: String) { underlyingServer = cgrpc_server_create(address) completionQueue = CompletionQueue( underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address) } /// Initializes a secure Server /// /// - Parameter address: the address where the server will listen /// - Parameter key: the private key for the server's certificates /// - Parameter certs: the server's certificates public init(address: String, key: String, certs: String, rootCerts: String? = nil) { underlyingServer = cgrpc_server_create_secure(address, key, certs, rootCerts, rootCerts == nil ? 0 : 1) completionQueue = CompletionQueue( underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address) } deinit { cgrpc_server_destroy(underlyingServer) completionQueue.shutdown() } /// Run the server. /// /// - Parameter handlerFunction: will be called to handle an incoming request. Dispatched on a new thread, so can be blocking. public func run(handlerFunction: @escaping (Handler) -> Void) { cgrpc_server_start(underlyingServer) // run the server on a new background thread let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread") spinloopThreadQueue.async { spinloop: while true { do { let handler = Handler(underlyingServer: self.underlyingServer) try handler.requestCall(tag: Server.handlerCallTag) // block while waiting for an incoming request let event = self.completionQueue.wait(timeout: 600) if event.type == .complete { if event.tag == Server.handlerCallTag { // run the handler and remove it when it finishes if event.success != 0 { // hold onto the handler while it runs var strongHandlerReference: Handler? strongHandlerReference = handler // To prevent the "Variable 'strongHandlerReference' was written to, but never read" warning. _ = strongHandlerReference // this will start the completion queue on a new thread handler.completionQueue.runToCompletion { // release the handler when it finishes strongHandlerReference = nil } // Dispatch the handler function on a separate thread. let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread") handlerDispatchThreadQueue.async { handlerFunction(handler) } } } else if event.tag == Server.stopTag || event.tag == Server.destroyTag { break spinloop } } else if event.type == .queueTimeout { // everything is fine continue } else if event.type == .queueShutdown { break spinloop } } catch { print("server call error: \(error)") break spinloop } } self.onCompletion?() } } public func stop() { cgrpc_server_stop(underlyingServer) } }