| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- /*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- import Foundation
- import gRPC
- import Darwin // for sleep()
/// Errors used by the Echo server session helpers.
enum ServerError: Error {
  /// Thrown by the sessions' `Recv()` when a receive completes without a message.
  case endOfStream

  /// Generic failure case.
  case error
}
/// Application-level handlers for the Echo service.
///
/// The session classes in this file call these methods: one method per RPC.
protocol CustomEchoServer {
  /// Unary call: produces the single response for `request`.
  func Get(request: Echo_EchoRequest) throws -> Echo_EchoResponse

  /// Client-streaming call: reads requests from `stream` and replies through it.
  func Collect(stream: EchoCollectSession) throws

  /// Server-streaming call: writes the responses for `request` to `stream`.
  func Expand(request: Echo_EchoRequest, stream: EchoExpandSession) throws

  /// Bidirectional-streaming call: reads from and writes to `stream`.
  func Update(stream: EchoUpdateSession) throws
}
// The per-pattern protocols below seemed like a nice idea, but they do not
// work as a generic abstraction: specific message types appear in the
// protocol signatures, and the Session classes also contain functions that
// depend on specific message types.

/// Handler shape for a server-streaming call on an `EchoExpandSession`.
protocol ServerStreamingServer {
  /// Handles the request `message` received on `session`.
  func handle(session: EchoExpandSession, message: Echo_EchoRequest)
}
/// Handler shape for a client-streaming call on an `EchoCollectSession`.
protocol ClientStreamingServer {
  /// Handles one request `message` received on `session`.
  func handle(session: EchoCollectSession, message: Echo_EchoRequest)

  /// Called to close `session`.
  func close(session: EchoCollectSession)
}
/// Handler shape for a bidirectional-streaming call on an `EchoUpdateSession`.
protocol BidiStreamingServer {
  /// Handles one request `message` received on `session`.
  func handle(session: EchoUpdateSession, message: Echo_EchoRequest)
}
// unary
/// Session for a unary Echo call: receives one request, asks the
/// `CustomEchoServer` for the reply, and sends it back with an "OK" status.
class EchoGetSession : Session {
  /// The gRPC handler for the call being served.
  var handler : Handler
  /// The application object that implements `Get`.
  var server : CustomEchoServer

  init(handler:Handler, server: CustomEchoServer) {
    self.handler = handler
    self.server = server
  }

  /// Starts the call. Errors thrown while setting up the receive are logged;
  /// this method itself never throws.
  func run() {
    do {
      try handler.receiveMessage(initialMetadata:Metadata()) {(requestData) in
        // Nothing to do when the receive completes without a payload.
        guard let requestData = requestData else {
          return
        }
        // Use `try`, not `try!`: a malformed request or a failing handler
        // must not take the whole server process down. The error leaves this
        // callback the same way a `sendResponse` failure already does.
        let requestMessage = try Echo_EchoRequest(protobuf:requestData)
        let replyMessage = try self.server.Get(request:requestMessage)
        try self.handler.sendResponse(message:replyMessage.serializeProtobuf(),
                                      statusCode: 0,
                                      statusMessage: "OK",
                                      trailingMetadata:Metadata())
      }
    } catch (let callError) {
      print("grpc error: \(callError)")
    }
  }
}
// server streaming
/// Session for a server-streaming Echo call: receives one request, lets the
/// `CustomEchoServer` write any number of responses through `Send`, then
/// finishes the call with an "OK" status.
class EchoExpandSession : Session {
  /// The gRPC handler for the call being served.
  var handler : Handler
  /// The application object that implements `Expand`.
  var server : CustomEchoServer

  init(handler:Handler, server: CustomEchoServer) {
    self.handler = handler
    self.server = server
  }

  /// Sends one streamed response to the client.
  ///
  /// - Throws: whatever serialization or `sendResponse` throws. The method is
  ///   declared `throws`, so failures are propagated rather than trapped.
  func Send(_ response: Echo_EchoResponse) throws {
    try handler.sendResponse(message:response.serializeProtobuf()) {}
  }

  /// Starts the call. Errors thrown while setting up the receive are logged;
  /// this method itself never throws.
  func run() {
    do {
      try handler.receiveMessage(initialMetadata:Metadata()) {(requestData) in
        // Nothing to do when the receive completes without a payload.
        guard let requestData = requestData else {
          return
        }
        // Use `try`, not `try!`: a malformed request or a failed status send
        // must not crash the server. These errors leave the callback the same
        // way an error from `Expand` already does.
        let requestMessage = try Echo_EchoRequest(protobuf:requestData)
        try self.server.Expand(request:requestMessage, stream: self)
        try self.handler.sendStatus(statusCode:0,
                                    statusMessage:"OK",
                                    trailingMetadata:Metadata(),
                                    completion:{})
      }
    } catch (let callError) {
      print("grpc error: \(callError)")
    }
  }
}
// client streaming
/// Session for a client-streaming Echo call: the `CustomEchoServer` pulls
/// requests with `Recv` until the stream ends, then replies once with
/// `SendAndClose`.
class EchoCollectSession : Session {
  /// The gRPC handler for the call being served.
  var handler : Handler
  /// The application object that implements `Collect`.
  var server : CustomEchoServer

  init(handler:Handler, server: CustomEchoServer) {
    self.handler = handler
    self.server = server
  }

  /// Blocks the calling thread until the next request arrives.
  ///
  /// - Returns: the decoded request.
  /// - Throws: `ServerError.endOfStream` when the receive completes without a
  ///   message; the decoding error when the payload cannot be parsed; or
  ///   whatever `receiveMessage` throws.
  func Recv() throws -> Echo_EchoRequest {
    print("collect awaiting message")
    let done = NSCondition()
    // Predicate guarded by `done`. Without it, a callback that fires before
    // `wait()` is reached would signal into the void and this thread would
    // block forever; it also protects against spurious wakeups.
    var finished = false
    var requestMessage : Echo_EchoRequest?
    var decodeFailure : Error?
    try self.handler.receiveMessage() {(requestData) in
      print("collect received message")
      var decoded : Echo_EchoRequest?
      var failure : Error?
      if let requestData = requestData {
        // Capture decode errors instead of trapping with `try!`, so that a
        // malformed message cannot crash the server.
        do {
          decoded = try Echo_EchoRequest(protobuf:requestData)
        } catch {
          failure = error
        }
      }
      done.lock()
      requestMessage = decoded
      decodeFailure = failure
      finished = true
      done.signal()
      done.unlock()
    }
    done.lock()
    while !finished {
      done.wait()
    }
    done.unlock()
    if let decodeFailure = decodeFailure {
      throw decodeFailure
    }
    guard let message = requestMessage else {
      throw ServerError.endOfStream
    }
    return message
  }

  /// Sends the single response together with an "OK" status.
  ///
  /// - Throws: whatever serialization or `sendResponse` throws. The method is
  ///   declared `throws`, so failures are propagated rather than trapped.
  func SendAndClose(_ response: Echo_EchoResponse) throws {
    try self.handler.sendResponse(message:response.serializeProtobuf(),
                                  statusCode: 0,
                                  statusMessage: "OK",
                                  trailingMetadata: Metadata())
  }

  /// Non-throwing variant of `SendAndClose`: a send failure is logged instead
  /// of crashing the process.
  func sendMessage(message:Echo_EchoResponse) -> Void {
    do {
      try self.handler.sendResponse(message:message.serializeProtobuf(),
                                    statusCode: 0,
                                    statusMessage: "OK",
                                    trailingMetadata: Metadata())
    } catch {
      print("grpc error: \(error)")
    }
  }

  /// Starts the call: sends initial metadata, then hands the stream to the
  /// server's `Collect`. Errors thrown while setting this up are logged.
  func run() {
    do {
      print("EchoCollectSession run")
      try self.handler.sendMetadata(initialMetadata:Metadata()) {
        try self.server.Collect(stream:self)
      }
    } catch (let callError) {
      print("grpc error: \(callError)")
    }
  }
}
// fully streaming
/// Session for a bidirectional-streaming Echo call: the `CustomEchoServer`
/// alternates `Recv` and `Send` as it sees fit and finishes with `Close`.
class EchoUpdateSession : Session {
  /// The gRPC handler for the call being served.
  var handler : Handler
  /// The application object that implements `Update`.
  var server : CustomEchoServer

  init(handler:Handler, server: CustomEchoServer) {
    self.handler = handler
    self.server = server
  }

  /// Blocks the calling thread until the next request arrives.
  ///
  /// - Returns: the decoded request.
  /// - Throws: `ServerError.endOfStream` when the receive completes without a
  ///   message; the decoding error when the payload cannot be parsed; or
  ///   whatever `receiveMessage` throws.
  func Recv() throws -> Echo_EchoRequest {
    print("update awaiting message")
    let done = NSCondition()
    // Predicate guarded by `done`. Without it, a callback that fires before
    // `wait()` is reached would signal into the void and this thread would
    // block forever; it also protects against spurious wakeups.
    var finished = false
    var requestMessage : Echo_EchoRequest?
    var decodeFailure : Error?
    try self.handler.receiveMessage() {(requestData) in
      print("update received message")
      var decoded : Echo_EchoRequest?
      var failure : Error?
      if let requestData = requestData {
        // Capture decode errors instead of trapping with `try!`, so that a
        // malformed message cannot crash the server.
        do {
          decoded = try Echo_EchoRequest(protobuf:requestData)
        } catch {
          failure = error
        }
      }
      done.lock()
      requestMessage = decoded
      decodeFailure = failure
      finished = true
      done.signal()
      done.unlock()
    }
    done.lock()
    while !finished {
      done.wait()
    }
    done.unlock()
    if let decodeFailure = decodeFailure {
      throw decodeFailure
    }
    guard let message = requestMessage else {
      throw ServerError.endOfStream
    }
    return message
  }

  /// Sends one streamed response to the client.
  ///
  /// - Throws: whatever serialization or `sendResponse` throws.
  func Send(_ response: Echo_EchoResponse) throws {
    try handler.sendResponse(message:response.serializeProtobuf()) {}
  }

  /// Non-throwing variant of `Send`: a send failure is logged instead of
  /// crashing the process.
  func sendMessage(message:Echo_EchoResponse) -> Void {
    do {
      try handler.sendResponse(message:message.serializeProtobuf()) {}
    } catch {
      print("grpc error: \(error)")
    }
  }

  /// Sends the final "OK" status and blocks until the send completes.
  /// If the status cannot be sent, the failure is logged and the method
  /// returns without waiting.
  func Close() {
    let done = NSCondition()
    // Same predicate pattern as `Recv`: avoids a lost wakeup when the
    // completion runs before this thread starts waiting.
    var finished = false
    do {
      try self.handler.sendStatus(statusCode: 0,
                                  statusMessage: "OK",
                                  trailingMetadata: Metadata()) {
        done.lock()
        finished = true
        done.signal()
        done.unlock()
      }
    } catch {
      print("grpc error: \(error)")
      return
    }
    done.lock()
    while !finished {
      done.wait()
    }
    done.unlock()
  }

  /// Starts the call: sends initial metadata, then hands the stream to the
  /// server's `Update`. Errors thrown while setting this up are logged.
  func run() {
    do {
      try self.handler.sendMetadata(initialMetadata:Metadata()) {
        try self.server.Update(stream:self)
      }
    } catch (let callError) {
      print("grpc error: \(callError)")
    }
  }
}
/// Owns the gRPC server and routes each incoming call to the session class
/// that matches its method name.
class EchoServer {
  private var address: String
  private var server: Server
  /// The application object handed to every session.
  public var myServer: MyEchoServer!

  /// Creates the server for `address`.
  ///
  /// When `secure` is true, the certificate and key are read from the main
  /// bundle resources "ssl.crt" and "ssl.key"; a missing or unreadable
  /// resource traps.
  init(address:String, secure:Bool) {
    gRPC.initialize()
    self.address = address
    if secure {
      let certificate = EchoServer.bundledText(withExtension: "crt")
      let key = EchoServer.bundledText(withExtension: "key")
      self.server = gRPC.Server(address:address, key:key, certs:certificate)
    } else {
      self.server = gRPC.Server(address:address)
    }
    self.myServer = MyEchoServer()
  }

  /// Reads the main-bundle resource named "ssl" with the given extension as
  /// text. Traps if the resource is missing or cannot be read.
  private static func bundledText(withExtension ext: String) -> String {
    let resourceURL = Bundle.main.url(forResource: "ssl", withExtension: ext)!
    return try! String(contentsOf: resourceURL)
  }

  /// Starts serving. Each incoming call is logged, then dispatched by method
  /// name; calls to any other method get no session.
  func start() {
    print("Server Starting")
    print("GRPC version " + gRPC.version())
    server.run {(handler) in
      let description = "Server received request to " + handler.host
        + " calling " + handler.method
        + " from " + handler.caller
      print(description)

      let session : Session
      switch handler.method {
      case "/echo.Echo/Get":
        session = EchoGetSession(handler:handler, server:self.myServer)
      case "/echo.Echo/Expand":
        session = EchoExpandSession(handler:handler, server:self.myServer)
      case "/echo.Echo/Collect":
        session = EchoCollectSession(handler:handler, server:self.myServer)
      case "/echo.Echo/Update":
        session = EchoUpdateSession(handler:handler, server:self.myServer)
      default:
        return
      }
      // The handler keeps the session, and the session is run through it.
      handler.session = session
      handler.session.run()
    }
  }
}
// The following code is for developers/users to edit.
// Everything above these lines is intended to be preexisting or generated.

/// Sample implementation of the Echo service.
class MyEchoServer : CustomEchoServer {
  /// Echoes the request text back with a "Swift echo get: " prefix.
  func Get(request : Echo_EchoRequest) throws -> Echo_EchoResponse {
    print("Get received: \(request.text)")
    return Echo_EchoResponse(text:"Swift echo get: " + request.text)
  }

  /// Splits the request text on spaces and streams one numbered response per
  /// part, pausing one second after each.
  ///
  /// - Throws: whatever `stream.Send` throws.
  func Expand(request : Echo_EchoRequest, stream : EchoExpandSession) throws -> Void {
    print("Expand received: \(request.text)")
    let parts = request.text.components(separatedBy: " ")
    for (i, part) in parts.enumerated() {
      try stream.Send(Echo_EchoResponse(text:"Swift echo expand (\(i)): \(part)"))
      sleep(1)
    }
  }

  /// Collects request texts on a global dispatch queue until receiving stops,
  /// then replies once with the texts joined by spaces.
  func Collect(stream : EchoCollectSession) throws -> Void {
    DispatchQueue.global().async {
      print("Called collect")
      var parts : [String] = []
      while true {
        do {
          let request = try stream.Recv()
          print("Collect received: \(request.text)")
          parts.append(request.text)
        } catch ServerError.endOfStream {
          break
        } catch {
          // Stop on any other failure too: retrying a broken stream would
          // spin in this loop forever.
          print("\(error)")
          break
        }
      }
      print("sending collect response")
      let response = Echo_EchoResponse(text:"Swift echo collect: " + parts.joined(separator: " "))
      // Log a failed send instead of trapping on this background queue.
      do {
        try stream.SendAndClose(response)
      } catch {
        print("grpc error: \(error)")
      }
    }
  }

  /// Answers each received request with a numbered echo on a global dispatch
  /// queue, and closes the stream once receiving or sending stops.
  func Update(stream : EchoUpdateSession) throws -> Void {
    DispatchQueue.global().async {
      var count = 0
      while true {
        do {
          let request = try stream.Recv()
          print("Update received: \(request.text)")
          count += 1
          try stream.Send(Echo_EchoResponse(text:"Swift echo update (\(count)): \(request.text)"))
        } catch ServerError.endOfStream {
          break
        } catch {
          // Previously swallowed silently, which left the loop spinning on a
          // persistent failure. Report it and stop.
          print("\(error)")
          break
        }
      }
      stream.Close()
    }
  }
}
|