|
|
@@ -13,8 +13,6 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
-import Commander
|
|
|
-import Dispatch
|
|
|
import Foundation
|
|
|
import NIO
|
|
|
import NIOSSL
|
|
|
@@ -22,210 +20,282 @@ import GRPC
|
|
|
import GRPCSampleData
|
|
|
import EchoImplementation
|
|
|
import EchoModel
|
|
|
+import Logging
|
|
|
|
|
|
-// Common flags and options
|
|
|
-let sslFlag = Flag("ssl", description: "if true, use SSL for connections")
|
|
|
-func addressOption(_ address: String) -> Option<String> {
|
|
|
- return Option("address", default: address, description: "address of server")
|
|
|
-}
|
|
|
-
|
|
|
-let portOption = Option("port", default: 8080)
|
|
|
-let messageOption = Option("message",
|
|
|
- default: "Testing 1 2 3",
|
|
|
- description: "message to send")
|
|
|
-
|
|
|
-func makeClientTLSConfiguration() -> ClientConnection.Configuration.TLS {
|
|
|
- let caCert = SampleCertificate.ca
|
|
|
- let clientCert = SampleCertificate.client
|
|
|
- precondition(!caCert.isExpired && !clientCert.isExpired,
|
|
|
- "SSL certificates are expired. Please submit an issue at https://github.com/grpc/grpc-swift.")
|
|
|
-
|
|
|
- return .init(certificateChain: [.certificate(clientCert.certificate)],
|
|
|
- privateKey: .privateKey(SamplePrivateKey.client),
|
|
|
- trustRoots: .certificates([caCert.certificate]),
|
|
|
- certificateVerification: .noHostnameVerification)
|
|
|
-}
|
|
|
-
|
|
|
-func makeServerTLSConfiguration() -> Server.Configuration.TLS {
|
|
|
- let caCert = SampleCertificate.ca
|
|
|
- let serverCert = SampleCertificate.server
|
|
|
- precondition(!caCert.isExpired && !serverCert.isExpired,
|
|
|
- "SSL certificates are expired. Please submit an issue at https://github.com/grpc/grpc-swift.")
|
|
|
+// MARK: - Argument parsing
|
|
|
|
|
|
- return .init(certificateChain: [.certificate(serverCert.certificate)],
|
|
|
- privateKey: .privateKey(SamplePrivateKey.server),
|
|
|
- trustRoots: .certificates([caCert.certificate]))
|
|
|
+enum RPC: String {
|
|
|
+ case get
|
|
|
+ case collect
|
|
|
+ case expand
|
|
|
+ case update
|
|
|
}
|
|
|
|
|
|
-/// Create en `EchoClient` and wait for it to initialize. Returns nil if initialisation fails.
|
|
|
-func makeEchoClient(address: String, port: Int, ssl: Bool) -> Echo_EchoServiceClient? {
|
|
|
- let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
|
+enum Command {
|
|
|
+ case server(port: Int, useTLS: Bool)
|
|
|
+ case client(host: String, port: Int, useTLS: Bool, rpc: RPC, message: String)
|
|
|
|
|
|
- let configuration = ClientConnection.Configuration(
|
|
|
- target: .hostAndPort(address, port),
|
|
|
- eventLoopGroup: eventLoopGroup,
|
|
|
- tls: ssl ? makeClientTLSConfiguration() : nil)
|
|
|
+ init?(from args: [String]) {
|
|
|
+ guard !args.isEmpty else {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
|
|
|
- return Echo_EchoServiceClient(connection: ClientConnection(configuration: configuration))
|
|
|
-}
|
|
|
+ var args = args
|
|
|
+ switch args.removeFirst() {
|
|
|
+ case "server":
|
|
|
+ guard (args.count == 1 || args.count == 2),
|
|
|
+ let port = args.popLast().flatMap(Int.init),
|
|
|
+ let useTLS = Command.parseTLSArg(args.popLast())
|
|
|
+ else {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ self = .server(port: port, useTLS: useTLS)
|
|
|
+
|
|
|
+ case "client":
|
|
|
+ guard (args.count == 4 || args.count == 5),
|
|
|
+ let message = args.popLast(),
|
|
|
+ let rpc = args.popLast().flatMap(RPC.init),
|
|
|
+ let port = args.popLast().flatMap(Int.init),
|
|
|
+ let host = args.popLast(),
|
|
|
+ let useTLS = Command.parseTLSArg(args.popLast())
|
|
|
+ else {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ self = .client(host: host, port: port, useTLS: useTLS, rpc: rpc, message: message)
|
|
|
+
|
|
|
+ default:
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
-Group {
|
|
|
- $0.command("serve",
|
|
|
- sslFlag,
|
|
|
- addressOption("localhost"),
|
|
|
- portOption,
|
|
|
- description: "Run an echo server.") { ssl, address, port in
|
|
|
- let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
|
-
|
|
|
- var configuration = Server.Configuration(
|
|
|
- target: .hostAndPort(address, port),
|
|
|
- eventLoopGroup: eventLoopGroup,
|
|
|
- serviceProviders: [EchoProvider()])
|
|
|
-
|
|
|
- if ssl {
|
|
|
- print("starting secure server")
|
|
|
- configuration.tls = makeServerTLSConfiguration()
|
|
|
- } else {
|
|
|
- print("starting insecure server")
|
|
|
+ private static func parseTLSArg(_ arg: String?) -> Bool? {
|
|
|
+ switch arg {
|
|
|
+ case .some("--tls"):
|
|
|
+ return true
|
|
|
+ case .none, .some("--notls"):
|
|
|
+ return false
|
|
|
+ default:
|
|
|
+ return nil
|
|
|
}
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- let server = try! Server.start(configuration: configuration)
|
|
|
- .wait()
|
|
|
+func printUsageAndExit(program: String) -> Never {
|
|
|
+ print("""
|
|
|
+ Usage: \(program) COMMAND [OPTIONS...]
|
|
|
+
|
|
|
+ Commands:
|
|
|
+ server [--tls|--notls] PORT Starts the echo server on the given port.
|
|
|
+
|
|
|
+ client [--tls|--notls] HOST PORT RPC MESSAGE Connects to the echo server on the given host
|
|
|
+ host and port and calls the RPC with the
|
|
|
+ provided message. See below for a list of
|
|
|
+ possible RPCs.
|
|
|
+
|
|
|
+ RPCs:
|
|
|
+ * get (unary)
|
|
|
+ * collect (client streaming)
|
|
|
+ * expand (server streaming)
|
|
|
+ * update (bidirectional streaming)
|
|
|
+ """)
|
|
|
+ exit(1)
|
|
|
+}
|
|
|
|
|
|
- // This blocks to keep the main thread from finishing while the server runs,
|
|
|
- // but the server never exits. Kill the process to stop it.
|
|
|
- try server.onClose.wait()
|
|
|
+func main(args: [String]) {
|
|
|
+ var args = args
|
|
|
+ let program = args.removeFirst()
|
|
|
+ guard let command = Command(from: args) else {
|
|
|
+ printUsageAndExit(program: program)
|
|
|
}
|
|
|
|
|
|
- $0.command(
|
|
|
- "get",
|
|
|
- sslFlag,
|
|
|
- addressOption("localhost"),
|
|
|
- portOption,
|
|
|
- messageOption,
|
|
|
- description: "Perform a unary get()."
|
|
|
- ) { ssl, address, port, message in
|
|
|
- print("calling get")
|
|
|
- guard let echo = makeEchoClient(address: address, port: port, ssl: ssl) else { return }
|
|
|
-
|
|
|
- var requestMessage = Echo_EchoRequest()
|
|
|
- requestMessage.text = message
|
|
|
-
|
|
|
- print("get sending: \(requestMessage.text)")
|
|
|
- let get = echo.get(requestMessage)
|
|
|
- get.response.whenSuccess { response in
|
|
|
- print("get received: \(response.text)")
|
|
|
- }
|
|
|
+ // Reduce the logging verbosity.
|
|
|
+ LoggingSystem.bootstrap {
|
|
|
+ var handler = StreamLogHandler.standardOutput(label: $0)
|
|
|
+ handler.logLevel = .warning
|
|
|
+ return handler
|
|
|
+ }
|
|
|
|
|
|
- get.response.whenFailure { error in
|
|
|
- print("get response failed with error: \(error)")
|
|
|
- }
|
|
|
+ // Okay, we're nearly ready to start, create an `EventLoopGroup` most suitable for our platform.
|
|
|
+ let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
|
|
|
+ defer {
|
|
|
+ try! group.syncShutdownGracefully()
|
|
|
+ }
|
|
|
|
|
|
- // wait() on the status to stop the program from exiting.
|
|
|
+ // Now run the server/client.
|
|
|
+ switch command {
|
|
|
+ case let .server(port: port, useTLS: useTLS):
|
|
|
do {
|
|
|
- let status = try get.status.wait()
|
|
|
- print("get completed with status: \(status)")
|
|
|
+ try startEchoServer(group: group, port: port, useTLS: useTLS)
|
|
|
} catch {
|
|
|
- print("get status failed with error: \(error)")
|
|
|
+ print("Error running server: \(error)")
|
|
|
}
|
|
|
+
|
|
|
+ case let .client(host: host, port: port, useTLS: useTLS, rpc: rpc, message: message):
|
|
|
+ let client = makeClient(group: group, host: host, port: port, useTLS: useTLS)
|
|
|
+ callRPC(rpc, using: client, message: message)
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- $0.command(
|
|
|
- "expand",
|
|
|
- sslFlag,
|
|
|
- addressOption("localhost"),
|
|
|
- portOption,
|
|
|
- messageOption,
|
|
|
- description: "Perform a server-streaming expand()."
|
|
|
- ) { ssl, address, port, message in
|
|
|
- print("calling expand")
|
|
|
- guard let echo = makeEchoClient(address: address, port: port, ssl: ssl) else { return }
|
|
|
-
|
|
|
- let requestMessage = Echo_EchoRequest.with { $0.text = message }
|
|
|
-
|
|
|
- print("expand sending: \(requestMessage.text)")
|
|
|
- let expand = echo.expand(requestMessage) { response in
|
|
|
- print("expand received: \(response.text)")
|
|
|
- }
|
|
|
+// MARK: - Server / Client
|
|
|
+
|
|
|
+func startEchoServer(group: EventLoopGroup, port: Int, useTLS: Bool) throws {
|
|
|
+ // Configure the server:
|
|
|
+ var configuration = Server.Configuration(
|
|
|
+ target: .hostAndPort("localhost", port),
|
|
|
+ eventLoopGroup: group,
|
|
|
+ serviceProviders: [EchoProvider()]
|
|
|
+ )
|
|
|
+
|
|
|
+ if useTLS {
|
|
|
+ // We're using some self-signed certs here: check they aren't expired.
|
|
|
+ let caCert = SampleCertificate.ca
|
|
|
+ let serverCert = SampleCertificate.server
|
|
|
+ precondition(
|
|
|
+ !caCert.isExpired && !serverCert.isExpired,
|
|
|
+ "SSL certificates are expired. Please submit an issue at https://github.com/grpc/grpc-swift."
|
|
|
+ )
|
|
|
+
|
|
|
+ configuration.tls = .init(
|
|
|
+ certificateChain: [.certificate(serverCert.certificate)],
|
|
|
+ privateKey: .privateKey(SamplePrivateKey.server),
|
|
|
+ trustRoots: .certificates([caCert.certificate])
|
|
|
+ )
|
|
|
+ print("starting secure server")
|
|
|
+ } else {
|
|
|
+ print("starting insecure server")
|
|
|
+ }
|
|
|
|
|
|
- // wait() on the status to stop the program from exiting.
|
|
|
- do {
|
|
|
- let status = try expand.status.wait()
|
|
|
- print("expand completed with status: \(status)")
|
|
|
- } catch {
|
|
|
- print("expand status failed with error: \(error)")
|
|
|
- }
|
|
|
+ let server = try Server.start(configuration: configuration).wait()
|
|
|
+ print("started server: \(server.channel.localAddress!)")
|
|
|
+
|
|
|
+ // This blocks to keep the main thread from finishing while the server runs,
|
|
|
+ // but the server never exits. Kill the process to stop it.
|
|
|
+ try server.onClose.wait()
|
|
|
+}
|
|
|
+
|
|
|
+func makeClient(group: EventLoopGroup, host: String, port: Int, useTLS: Bool) -> Echo_EchoServiceClient {
|
|
|
+ // Configure the connection:
|
|
|
+ var configuration = ClientConnection.Configuration(
|
|
|
+ target: .hostAndPort(host, port),
|
|
|
+ eventLoopGroup: group
|
|
|
+ )
|
|
|
+
|
|
|
+ if useTLS {
|
|
|
+ // We're using some self-signed certs here: check they aren't expired.
|
|
|
+ let caCert = SampleCertificate.ca
|
|
|
+ let clientCert = SampleCertificate.client
|
|
|
+ precondition(
|
|
|
+ !caCert.isExpired && !clientCert.isExpired,
|
|
|
+ "SSL certificates are expired. Please submit an issue at https://github.com/grpc/grpc-swift."
|
|
|
+ )
|
|
|
+
|
|
|
+ configuration.tls = .init(
|
|
|
+ certificateChain: [.certificate(clientCert.certificate)],
|
|
|
+ privateKey: .privateKey(SamplePrivateKey.client),
|
|
|
+ trustRoots: .certificates([caCert.certificate])
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
- $0.command(
|
|
|
- "collect",
|
|
|
- sslFlag,
|
|
|
- addressOption("localhost"),
|
|
|
- portOption,
|
|
|
- messageOption,
|
|
|
- description: "Perform a client-streaming collect()."
|
|
|
- ) { ssl, address, port, message in
|
|
|
- print("calling collect")
|
|
|
- guard let echo = makeEchoClient(address: address, port: port, ssl: ssl) else { return }
|
|
|
-
|
|
|
- let collect = echo.collect()
|
|
|
-
|
|
|
- var queue = collect.newMessageQueue()
|
|
|
- for part in message.components(separatedBy: " ") {
|
|
|
- var requestMessage = Echo_EchoRequest()
|
|
|
- requestMessage.text = part
|
|
|
- print("collect sending: \(requestMessage.text)")
|
|
|
- queue = queue.flatMap { collect.sendMessage(requestMessage) }
|
|
|
- }
|
|
|
- queue.whenSuccess { collect.sendEnd(promise: nil) }
|
|
|
+ // Start the connection and create the client:
|
|
|
+ let connection = ClientConnection(configuration: configuration)
|
|
|
+ return Echo_EchoServiceClient(connection: connection)
|
|
|
+}
|
|
|
|
|
|
- collect.response.whenSuccess { respone in
|
|
|
- print("collect received: \(respone.text)")
|
|
|
+func callRPC(_ rpc: RPC, using client: Echo_EchoServiceClient, message: String) {
|
|
|
+ do {
|
|
|
+ switch rpc {
|
|
|
+ case .get:
|
|
|
+ try echoGet(client: client, message: message)
|
|
|
+ case .collect:
|
|
|
+ try echoCollect(client: client, message: message)
|
|
|
+ case .expand:
|
|
|
+ try echoExpand(client: client, message: message)
|
|
|
+ case .update:
|
|
|
+ try echoUpdate(client: client, message: message)
|
|
|
}
|
|
|
+ } catch {
|
|
|
+ print("\(rpc) RPC failed: \(error)")
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- collect.response.whenFailure { error in
|
|
|
- print("collect response failed with error: \(error)")
|
|
|
+func echoGet(client: Echo_EchoServiceClient, message: String) throws {
|
|
|
+ // Get is a unary call.
|
|
|
+ let get = client.get(.with { $0.text = message })
|
|
|
+
|
|
|
+ // Register a callback for the response:
|
|
|
+ get.response.whenComplete { result in
|
|
|
+ switch result {
|
|
|
+ case .success(let response):
|
|
|
+ print("get receieved: \(response.text)")
|
|
|
+ case .failure(let error):
|
|
|
+ print("get failed with error: \(error)")
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // wait() on the status to stop the program from exiting.
|
|
|
- do {
|
|
|
- let status = try collect.status.wait()
|
|
|
- print("collect completed with status: \(status)")
|
|
|
- } catch {
|
|
|
- print("collect status failed with error: \(error)")
|
|
|
- }
|
|
|
+ // wait() for the call to terminate
|
|
|
+ let status = try get.status.wait()
|
|
|
+ print("get completed with status: \(status.code)")
|
|
|
+}
|
|
|
+
|
|
|
+func echoCollect(client: Echo_EchoServiceClient, message: String) throws {
|
|
|
+ // Collect is a client streaming call
|
|
|
+ let collect = client.collect()
|
|
|
+
|
|
|
+ // Split the messages and map them into requests
|
|
|
+ let messages = message.components(separatedBy: " ").map { part in
|
|
|
+ Echo_EchoRequest.with { $0.text = part }
|
|
|
}
|
|
|
|
|
|
- $0.command(
|
|
|
- "update",
|
|
|
- sslFlag,
|
|
|
- addressOption("localhost"),
|
|
|
- portOption,
|
|
|
- messageOption,
|
|
|
- description: "Perform a bidirectional-streaming update()."
|
|
|
- ) { ssl, address, port, message in
|
|
|
- print("calling update")
|
|
|
- guard let echo = makeEchoClient(address: address, port: port, ssl: ssl) else { return }
|
|
|
-
|
|
|
- let update = echo.update { response in
|
|
|
- print("update received: \(response.text)")
|
|
|
+ // Stream the to the service (this can also be done on individual requests using `sendMessage`).
|
|
|
+ collect.sendMessages(messages, promise: nil)
|
|
|
+ // Close the request stream.
|
|
|
+ collect.sendEnd(promise: nil)
|
|
|
+
|
|
|
+ // Register a callback for the response:
|
|
|
+ collect.response.whenComplete { result in
|
|
|
+ switch result {
|
|
|
+ case .success(let response):
|
|
|
+ print("collect receieved: \(response.text)")
|
|
|
+ case .failure(let error):
|
|
|
+ print("collect failed with error: \(error)")
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- var queue = update.newMessageQueue()
|
|
|
- for part in message.components(separatedBy: " ") {
|
|
|
- var requestMessage = Echo_EchoRequest()
|
|
|
- requestMessage.text = part
|
|
|
- print("update sending: \(requestMessage.text)")
|
|
|
- queue = queue.flatMap { update.sendMessage(requestMessage) }
|
|
|
- }
|
|
|
- queue.whenSuccess { update.sendEnd(promise: nil) }
|
|
|
+ // wait() for the call to terminate
|
|
|
+ let status = try collect.status.wait()
|
|
|
+ print("collect completed with status: \(status.code)")
|
|
|
+}
|
|
|
|
|
|
- // wait() on the status to stop the program from exiting.
|
|
|
- do {
|
|
|
- let status = try update.status.wait()
|
|
|
- print("update completed with status: \(status)")
|
|
|
- } catch {
|
|
|
- print("update status failed with error: \(error)")
|
|
|
- }
|
|
|
+func echoExpand(client: Echo_EchoServiceClient, message: String) throws {
|
|
|
+ // Expand is a server streaming call; provide a response handler.
|
|
|
+ let expand = client.expand(.with { $0.text = message}) { response in
|
|
|
+ print("expand received: \(response.text)")
|
|
|
+ }
|
|
|
+
|
|
|
+ // wait() for the call to terminate
|
|
|
+ let status = try expand.status.wait()
|
|
|
+ print("expand completed with status: \(status.code)")
|
|
|
+}
|
|
|
+
|
|
|
+func echoUpdate(client: Echo_EchoServiceClient, message: String) throws {
|
|
|
+ // Update is a bidirectional streaming call; provide a response handler.
|
|
|
+ let update = client.update { response in
|
|
|
+ print("update received: \(response.text)")
|
|
|
}
|
|
|
-}.run()
|
|
|
+
|
|
|
+ // Split the messages and map them into requests
|
|
|
+ let messages = message.components(separatedBy: " ").map { part in
|
|
|
+ Echo_EchoRequest.with { $0.text = part }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Stream the to the service (this can also be done on individual requests using `sendMessage`).
|
|
|
+ update.sendMessages(messages, promise: nil)
|
|
|
+ // Close the request stream.
|
|
|
+ update.sendEnd(promise: nil)
|
|
|
+
|
|
|
+ // wait() for the call to terminate
|
|
|
+ let status = try update.status.wait()
|
|
|
+ print("update completed with status: \(status.code)")
|
|
|
+}
|
|
|
+
|
|
|
+main(args: CommandLine.arguments)
|