EchoServer.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. /*
  2. *
  3. * Copyright 2016, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. import Foundation
  34. import gRPC
  35. import Darwin // for sleep()
  36. enum ServerError : Error {
  37. case endOfStream
  38. case error
  39. }
  40. protocol CustomEchoServer {
  41. func Get(request : Echo_EchoRequest) throws -> Echo_EchoResponse
  42. func Collect(stream : EchoCollectSession) throws -> Void
  43. func Expand(request : Echo_EchoRequest, stream : EchoExpandSession) throws -> Void
  44. func Update(stream : EchoUpdateSession) throws -> Void
  45. }
  46. // This seemed like a nice idea but doesn't work because
  47. // specific message types are in the protocol signatures.
  48. // There are also functions in the Session classes that depend
  49. // on specific message types.
  50. protocol ServerStreamingServer {
  51. func handle(session:EchoExpandSession, message:Echo_EchoRequest) -> Void
  52. }
  53. protocol ClientStreamingServer {
  54. func handle(session:EchoCollectSession, message:Echo_EchoRequest) -> Void
  55. func close(session:EchoCollectSession)
  56. }
  57. protocol BidiStreamingServer {
  58. func handle(session:EchoUpdateSession, message:Echo_EchoRequest) -> Void
  59. }
  60. // unary
  61. class EchoGetSession : Session {
  62. var handler : Handler
  63. var server : CustomEchoServer
  64. init(handler:Handler, server: CustomEchoServer) {
  65. self.handler = handler
  66. self.server = server
  67. }
  68. func run() {
  69. do {
  70. try handler.receiveMessage(initialMetadata:Metadata()) {(requestData) in
  71. if let requestData = requestData {
  72. let requestMessage = try! Echo_EchoRequest(protobuf:requestData)
  73. let replyMessage = try! self.server.Get(request:requestMessage)
  74. // calling stub
  75. try self.handler.sendResponse(message:replyMessage.serializeProtobuf(),
  76. statusCode: 0,
  77. statusMessage: "OK",
  78. trailingMetadata:Metadata())
  79. }
  80. }
  81. } catch (let callError) {
  82. print("grpc error: \(callError)")
  83. }
  84. }
  85. }
  86. // server streaming
  87. class EchoExpandSession : Session {
  88. var handler : Handler
  89. var server : CustomEchoServer
  90. init(handler:Handler, server: CustomEchoServer) {
  91. self.handler = handler
  92. self.server = server
  93. }
  94. func Send(_ response: Echo_EchoResponse) throws {
  95. try! handler.sendResponse(message:response.serializeProtobuf()) {}
  96. }
  97. func run() {
  98. do {
  99. try handler.receiveMessage(initialMetadata:Metadata()) {(requestData) in
  100. if let requestData = requestData {
  101. let requestMessage = try! Echo_EchoRequest(protobuf:requestData)
  102. try self.server.Expand(request:requestMessage, stream: self)
  103. try! self.handler.sendStatus(statusCode:0,
  104. statusMessage:"OK",
  105. trailingMetadata:Metadata(),
  106. completion:{})
  107. }
  108. }
  109. } catch (let callError) {
  110. print("grpc error: \(callError)")
  111. }
  112. }
  113. }
  114. // client streaming
  115. class EchoCollectSession : Session {
  116. var handler : Handler
  117. var server : CustomEchoServer
  118. init(handler:Handler, server: CustomEchoServer) {
  119. self.handler = handler
  120. self.server = server
  121. }
  122. func Recv() throws -> Echo_EchoRequest {
  123. print("collect awaiting message")
  124. let done = NSCondition()
  125. var requestMessage : Echo_EchoRequest?
  126. try self.handler.receiveMessage() {(requestData) in
  127. print("collect received message")
  128. if let requestData = requestData {
  129. requestMessage = try! Echo_EchoRequest(protobuf:requestData)
  130. }
  131. done.lock()
  132. done.signal()
  133. done.unlock()
  134. }
  135. done.lock()
  136. done.wait()
  137. done.unlock()
  138. if requestMessage == nil {
  139. throw ServerError.endOfStream
  140. }
  141. return requestMessage!
  142. }
  143. func SendAndClose(_ response: Echo_EchoResponse) throws {
  144. try! self.handler.sendResponse(message:response.serializeProtobuf(),
  145. statusCode: 0,
  146. statusMessage: "OK",
  147. trailingMetadata: Metadata())
  148. }
  149. func sendMessage(message:Echo_EchoResponse) -> Void {
  150. try! self.handler.sendResponse(message:message.serializeProtobuf(),
  151. statusCode: 0,
  152. statusMessage: "OK",
  153. trailingMetadata: Metadata())
  154. }
  155. func run() {
  156. do {
  157. print("EchoCollectSession run")
  158. try self.handler.sendMetadata(initialMetadata:Metadata()) {
  159. try self.server.Collect(stream:self)
  160. }
  161. } catch (let callError) {
  162. print("grpc error: \(callError)")
  163. }
  164. }
  165. }
  166. // fully streaming
  167. class EchoUpdateSession : Session {
  168. var handler : Handler
  169. var server : CustomEchoServer
  170. init(handler:Handler, server: CustomEchoServer) {
  171. self.handler = handler
  172. self.server = server
  173. }
  174. func Recv() throws -> Echo_EchoRequest {
  175. print("update awaiting message")
  176. let done = NSCondition()
  177. var requestMessage : Echo_EchoRequest?
  178. try self.handler.receiveMessage() {(requestData) in
  179. print("update received message")
  180. if let requestData = requestData {
  181. requestMessage = try! Echo_EchoRequest(protobuf:requestData)
  182. }
  183. done.lock()
  184. done.signal()
  185. done.unlock()
  186. }
  187. done.lock()
  188. done.wait()
  189. done.unlock()
  190. if requestMessage == nil {
  191. throw ServerError.endOfStream
  192. }
  193. return requestMessage!
  194. }
  195. func Send(_ response: Echo_EchoResponse) throws {
  196. try handler.sendResponse(message:response.serializeProtobuf()) {}
  197. }
  198. func sendMessage(message:Echo_EchoResponse) -> Void {
  199. try! handler.sendResponse(message:message.serializeProtobuf()) {}
  200. }
  201. func Close() {
  202. let done = NSCondition()
  203. try! self.handler.sendStatus(statusCode: 0,
  204. statusMessage: "OK",
  205. trailingMetadata: Metadata()) {
  206. done.lock()
  207. done.signal()
  208. done.unlock()
  209. }
  210. done.lock()
  211. done.wait()
  212. done.unlock()
  213. }
  214. func run() {
  215. do {
  216. try self.handler.sendMetadata(initialMetadata:Metadata()) {
  217. try self.server.Update(stream:self)
  218. }
  219. } catch (let callError) {
  220. print("grpc error: \(callError)")
  221. }
  222. }
  223. }
  224. class EchoServer {
  225. private var address: String
  226. private var server: Server
  227. public var myServer: MyEchoServer!
  228. init(address:String, secure:Bool) {
  229. gRPC.initialize()
  230. self.address = address
  231. if secure {
  232. let certificateURL = Bundle.main.url(forResource: "ssl", withExtension: "crt")!
  233. let certificate = try! String(contentsOf: certificateURL)
  234. let keyURL = Bundle.main.url(forResource: "ssl", withExtension: "key")!
  235. let key = try! String(contentsOf: keyURL)
  236. self.server = gRPC.Server(address:address, key:key, certs:certificate)
  237. } else {
  238. self.server = gRPC.Server(address:address)
  239. }
  240. self.myServer = MyEchoServer()
  241. }
  242. func start() {
  243. print("Server Starting")
  244. print("GRPC version " + gRPC.version())
  245. server.run {(handler) in
  246. print("Server received request to " + handler.host
  247. + " calling " + handler.method
  248. + " from " + handler.caller)
  249. if (handler.method == "/echo.Echo/Get") {
  250. handler.session = EchoGetSession(handler:handler,
  251. server:self.myServer)
  252. handler.session.run()
  253. }
  254. else if (handler.method == "/echo.Echo/Expand") {
  255. handler.session = EchoExpandSession(handler:handler,
  256. server:self.myServer)
  257. handler.session.run()
  258. }
  259. else if (handler.method == "/echo.Echo/Collect") {
  260. handler.session = EchoCollectSession(handler:handler,
  261. server:self.myServer)
  262. handler.session.run()
  263. }
  264. else if (handler.method == "/echo.Echo/Update") {
  265. handler.session = EchoUpdateSession(handler:handler,
  266. server:self.myServer)
  267. handler.session.run()
  268. }
  269. }
  270. }
  271. }
  272. // The following code is for developer/users to edit.
  273. // Everything above these lines is intended to be preexisting or generated.
  274. class MyEchoServer : CustomEchoServer {
  275. func Get(request : Echo_EchoRequest) throws -> Echo_EchoResponse {
  276. print("Get received: \(request.text)")
  277. return Echo_EchoResponse(text:"Swift echo get: " + request.text)
  278. }
  279. func Expand(request : Echo_EchoRequest, stream : EchoExpandSession) throws -> Void {
  280. print("Expand received: \(request.text)")
  281. let parts = request.text.components(separatedBy: " ")
  282. var i = 0
  283. for part in parts {
  284. try stream.Send(Echo_EchoResponse(text:"Swift echo expand (\(i)): \(part)"))
  285. i += 1
  286. sleep(1)
  287. }
  288. }
  289. func Collect(stream : EchoCollectSession) throws -> Void {
  290. DispatchQueue.global().async {
  291. print("Called collect")
  292. var parts : [String] = []
  293. while true {
  294. do {
  295. let request = try stream.Recv()
  296. print("Collect received: \(request.text)")
  297. parts.append(request.text)
  298. } catch ServerError.endOfStream {
  299. break
  300. } catch (let error) {
  301. print("\(error)")
  302. }
  303. }
  304. print("sending collect response")
  305. let response = Echo_EchoResponse(text:"Swift echo collect: " + parts.joined(separator: " "))
  306. try! stream.SendAndClose(response)
  307. }
  308. }
  309. func Update(stream : EchoUpdateSession) throws -> Void {
  310. DispatchQueue.global().async {
  311. var count = 0
  312. while true {
  313. do {
  314. let request = try stream.Recv()
  315. print("Update received: \(request.text)")
  316. count += 1
  317. try stream.Send(Echo_EchoResponse(text:"Swift echo update (\(count)): \(request.text)"))
  318. } catch ServerError.endOfStream {
  319. break
  320. } catch (let error) {
  321. }
  322. }
  323. stream.Close()
  324. }
  325. }
  326. }