Call.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*
  2. *
  3. * Copyright 2016, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. #if SWIFT_PACKAGE
  34. import CgRPC
  35. import Dispatch
  36. #endif
  37. import Foundation
  /// Swift counterpart of the C `grpc_call_error` codes.
  ///
  /// Each case corresponds to one `grpc_call_error` constant; the translation is
  /// performed by `callError(grpcCallError:)`.
  public enum CallError: Error {
    /// Corresponds to `GRPC_CALL_OK`.
    case ok
    /// Corresponds to `GRPC_CALL_ERROR`; also used for any unrecognized code.
    case unknown
    /// Corresponds to `GRPC_CALL_ERROR_NOT_ON_SERVER`.
    case notOnServer
    /// Corresponds to `GRPC_CALL_ERROR_NOT_ON_CLIENT`.
    case notOnClient
    /// Corresponds to `GRPC_CALL_ERROR_ALREADY_ACCEPTED`.
    case alreadyAccepted
    /// Corresponds to `GRPC_CALL_ERROR_ALREADY_INVOKED`.
    case alreadyInvoked
    /// Corresponds to `GRPC_CALL_ERROR_NOT_INVOKED`.
    case notInvoked
    /// Corresponds to `GRPC_CALL_ERROR_ALREADY_FINISHED`.
    case alreadyFinished
    /// Corresponds to `GRPC_CALL_ERROR_TOO_MANY_OPERATIONS`.
    case tooManyOperations
    /// Corresponds to `GRPC_CALL_ERROR_INVALID_FLAGS`.
    case invalidFlags
    /// Corresponds to `GRPC_CALL_ERROR_INVALID_METADATA`.
    case invalidMetadata
    /// Corresponds to `GRPC_CALL_ERROR_INVALID_MESSAGE`.
    case invalidMessage
    /// Corresponds to `GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE`.
    case notServerCompletionQueue
    /// Corresponds to `GRPC_CALL_ERROR_BATCH_TOO_BIG`.
    case batchTooBig
    /// Corresponds to `GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH`.
    case payloadTypeMismatch

    /// Translates a C-level call error code into its `CallError` case.
    ///
    /// - Parameter error: the code returned by the C library
    /// - Returns: the matching case, or `.unknown` when the code is not recognized
    static func callError(grpcCallError error: grpc_call_error) -> CallError {
      switch error {
      case GRPC_CALL_OK: return .ok
      case GRPC_CALL_ERROR: return .unknown
      case GRPC_CALL_ERROR_NOT_ON_SERVER: return .notOnServer
      case GRPC_CALL_ERROR_NOT_ON_CLIENT: return .notOnClient
      case GRPC_CALL_ERROR_ALREADY_ACCEPTED: return .alreadyAccepted
      case GRPC_CALL_ERROR_ALREADY_INVOKED: return .alreadyInvoked
      case GRPC_CALL_ERROR_NOT_INVOKED: return .notInvoked
      case GRPC_CALL_ERROR_ALREADY_FINISHED: return .alreadyFinished
      case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return .tooManyOperations
      case GRPC_CALL_ERROR_INVALID_FLAGS: return .invalidFlags
      case GRPC_CALL_ERROR_INVALID_METADATA: return .invalidMetadata
      case GRPC_CALL_ERROR_INVALID_MESSAGE: return .invalidMessage
      case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE: return .notServerCompletionQueue
      case GRPC_CALL_ERROR_BATCH_TOO_BIG: return .batchTooBig
      case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH: return .payloadTypeMismatch
      default: return .unknown
      }
    }
  }
  /// The outcome of a call as delivered to completion handlers.
  ///
  /// Property order is significant: it determines the parameter order of the
  /// implicit memberwise initializer used elsewhere in this file.
  public struct CallResult {
    /// Status code of the call; 0 is also what callers in this file pass when
    /// no status is available.
    public var statusCode: Int

    /// Status message accompanying the status code, if any.
    public var statusMessage: String?

    /// Payload of the received message, if any.
    public var resultData: Data?

    /// Initial metadata received for the call, if any.
    public var initialMetadata: Metadata?

    /// Trailing metadata received for the call, if any.
    public var trailingMetadata: Metadata?
  }
  /// A gRPC API call
  ///
  /// Wraps the underlying C call object and exposes a one-shot (nonstreaming)
  /// API as well as primitives for streaming: `start`, `sendMessage`,
  /// `receiveMessage` and `close`.
  public class Call {

    /// Shared mutex for synchronizing calls to cgrpc_call_perform()
    static let callMutex = Mutex()

    /// Maximum number of messages that can be queued
    static var maximumQueuedMessages = 10

    /// Pointer to underlying C representation
    private let underlyingCall: UnsafeMutableRawPointer

    /// Completion queue used for call
    private var completionQueue: CompletionQueue

    /// True if this instance is responsible for deleting the underlying C representation
    private let owned: Bool

    /// A queue of pending messages to send over the call
    private var pendingMessages: [Data]

    /// True if a message write operation is underway
    private var writing: Bool

    /// Mutex for synchronizing message sending
    private var sendMutex: Mutex

    /// Dispatch queue used for sending messages asynchronously
    private let messageDispatchQueue: DispatchQueue = DispatchQueue.global()

    /// Initializes a Call representation
    ///
    /// - Parameter underlyingCall: the underlying C representation
    /// - Parameter owned: true if this instance is responsible for deleting the underlying call
    /// - Parameter completionQueue: the completion queue on which operation groups are registered
    init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
      self.underlyingCall = underlyingCall
      self.owned = owned
      self.completionQueue = completionQueue
      self.pendingMessages = []
      self.writing = false
      self.sendMutex = Mutex()
    }

    deinit {
      // Only destroy the C object when this wrapper owns it.
      if owned {
        cgrpc_call_destroy(underlyingCall)
      }
    }

    /// Initiate performance of a group of operations without waiting for completion
    ///
    /// The group is registered with the completion queue before the C call is
    /// made; the C call itself is serialized through `Call.callMutex`.
    ///
    /// - Parameter operations: group of operations to be performed
    /// - Throws: a `CallError` if the operations could not be started
    internal func perform(_ operations: OperationGroup) throws -> Void {
      completionQueue.register(operations)
      Call.callMutex.lock()
      let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
      Call.callMutex.unlock()
      guard error == GRPC_CALL_OK else {
        throw CallError.callError(grpcCallError: error)
      }
    }

    /// Performs a nonstreaming gRPC API call
    ///
    /// - Parameter message: data containing the message to send
    /// - Parameter metadata: metadata to send with the call
    /// - Parameter completion: a block to call with a CallResult containing call results;
    ///   when the operation group fails, every field of the result is empty and the
    ///   status code is 0
    /// - Throws: a `CallError` if the operations could not be started
    public func perform(message: Data,
                        metadata: Metadata,
                        completion: @escaping (CallResult) throws -> Void)
      throws -> Void {
      let messageBuffer = ByteBuffer(data: message)
      let operations = OperationGroup(call: self,
                                      operations: [.sendInitialMetadata(metadata),
                                                   .sendMessage(messageBuffer),
                                                   .sendCloseFromClient,
                                                   .receiveInitialMetadata,
                                                   .receiveStatusOnClient,
                                                   .receiveMessage],
                                      completion:
        { (operationGroup) in
          if operationGroup.success {
            // A missing status code is reported as 0 (the same value used in
            // the failure branch below) instead of crashing on a force-unwrap.
            try completion(CallResult(statusCode: operationGroup.receivedStatusCode() ?? 0,
                                      statusMessage: operationGroup.receivedStatusMessage(),
                                      resultData: operationGroup.receivedMessage()?.data(),
                                      initialMetadata: operationGroup.receivedInitialMetadata(),
                                      trailingMetadata: operationGroup.receivedTrailingMetadata()))
          } else {
            try completion(CallResult(statusCode: 0,
                                      statusMessage: nil,
                                      resultData: nil,
                                      initialMetadata: nil,
                                      trailingMetadata: nil))
          }
      })
      try perform(operations)
    }

    /// Starts a server-streaming call: sends the initial metadata and the single
    /// request message, then closes the sending side and receives initial metadata.
    ///
    /// - Parameter message: data containing the message to send
    /// - Parameter metadata: metadata to send with the call
    /// - Parameter completion: a block to call with a CallResult; only
    ///   `initialMetadata` is populated (and only when the operation group succeeds)
    /// - Throws: a `CallError` if the operations could not be started
    public func startServerStreaming(message: Data,
                                     metadata: Metadata,
                                     completion: @escaping (CallResult) throws -> Void)
      throws -> Void {
      let messageBuffer = ByteBuffer(data: message)
      let operations = OperationGroup(call: self,
                                      operations: [.sendInitialMetadata(metadata),
                                                   .sendMessage(messageBuffer),
                                                   .sendCloseFromClient,
                                                   .receiveInitialMetadata
                                                   ],
                                      completion:
        { (operationGroup) in
          if operationGroup.success {
            try completion(CallResult(statusCode: 0,
                                      statusMessage: nil,
                                      resultData: nil,
                                      initialMetadata: operationGroup.receivedInitialMetadata(),
                                      trailingMetadata: nil))
          } else {
            try completion(CallResult(statusCode: 0,
                                      statusMessage: nil,
                                      resultData: nil,
                                      initialMetadata: nil,
                                      trailingMetadata: nil))
          }
      })
      try perform(operations)
    }

    /// start a streaming connection
    ///
    /// Issues three separate operation groups: send initial metadata, receive
    /// initial metadata, and receive status.
    ///
    /// - Parameter metadata: initial metadata to send
    /// - Throws: a `CallError` if any of the operations could not be started
    public func start(metadata: Metadata) throws -> Void {
      try sendInitialMetadata(metadata: metadata)
      try receiveInitialMetadata()
      try receiveStatus()
    }

    /// send a message over a streaming connection
    ///
    /// If a write is already underway the message is queued; otherwise it is
    /// sent immediately.
    ///
    /// - Parameter data: the message data to send
    /// - Returns: true if the message was queued or its send was started; false if
    ///   the queue is full or the send could not be started
    public func sendMessage(data: Data) -> Bool {
      sendMutex.lock()
      defer { sendMutex.unlock() }

      if writing {
        // `>=` rather than `==`: the limit is a mutable static and may have
        // been lowered below the current queue length.
        if pendingMessages.count >= Call.maximumQueuedMessages {
          return false
        }
        pendingMessages.append(data)
        return true
      }

      writing = true
      do {
        try sendWithoutBlocking(data: data)
        return true
      } catch (let callError) {
        // The operation never started, so no completion will fire to clear the
        // flag; reset it here or every later message would be queued forever.
        writing = false
        print("Call sendMessage: grpc error \(callError)")
        return false
      }
    }

    /// helper for sending queued messages
    ///
    /// Sends `data`, and when that send completes successfully, sends the next
    /// pending message (if any) or marks writing as finished.
    ///
    /// - Parameter data: the message data to send
    /// - Throws: a `CallError` if the send operation could not be started
    private func sendWithoutBlocking(data: Data) throws -> Void {
      let operations = OperationGroup(call: self, operations: [.sendMessage(ByteBuffer(data: data))])
      { (operationGroup) in
        if operationGroup.success {
          self.messageDispatchQueue.async {
            self.sendMutex.synchronize {
              if self.pendingMessages.isEmpty {
                // nothing left to send, we are finished writing
                self.writing = false
              } else {
                // there are messages pending, send the next one
                let nextMessage = self.pendingMessages.removeFirst()
                do {
                  try self.sendWithoutBlocking(data: nextMessage)
                } catch (let callError) {
                  // The next send never started, so clear the flag to let
                  // later calls to sendMessage attempt a new write.
                  self.writing = false
                  print("Call sendWithoutBlocking: grpc error \(callError)")
                }
              }
            }
          }
        } else {
          // TODO: if the event failed, shut down
        }
      }
      try perform(operations)
    }

    /// receive a message over a streaming connection
    ///
    /// - Parameter callback: called with the received data when the receive
    ///   operation succeeds, or with nil when no message was received
    /// - Throws: a `CallError` if the receive operation could not be started
    public func receiveMessage(callback:@escaping ((Data!) throws -> Void)) throws -> Void {
      let operations = OperationGroup(call: self, operations: [.receiveMessage])
      { (operationGroup) in
        if operationGroup.success {
          if let messageBuffer = operationGroup.receivedMessage() {
            try callback(messageBuffer.data())
          } else {
            try callback(nil) // an empty response signals the end of a connection
          }
        }
      }
      try perform(operations)
    }

    /// send initial metadata over a streaming connection
    ///
    /// - Parameter metadata: initial metadata to send
    /// - Throws: a `CallError` if the operation could not be started
    private func sendInitialMetadata(metadata: Metadata) throws -> Void {
      // Nothing to do on completion, whether or not the operation succeeded.
      let operations = OperationGroup(call: self, operations: [.sendInitialMetadata(metadata)])
      { _ in }
      try perform(operations)
    }

    /// receive initial metadata from a streaming connection
    ///
    /// Received key/value pairs are printed to standard output.
    ///
    /// - Throws: a `CallError` if the operation could not be started
    private func receiveInitialMetadata() throws -> Void {
      let operations = OperationGroup(call: self, operations: [.receiveInitialMetadata])
      { (operationGroup) in
        if operationGroup.success {
          if let initialMetadata = operationGroup.receivedInitialMetadata() {
            for j in 0..<initialMetadata.count() {
              print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
            }
          }
        }
      }
      try perform(operations)
    }

    /// receive status from a streaming connection
    ///
    /// The received status code and message are printed to standard output.
    ///
    /// - Throws: a `CallError` if the operation could not be started
    private func receiveStatus() throws -> Void {
      let operations = OperationGroup(call: self, operations: [.receiveStatusOnClient])
      { (operationGroup) in
        if operationGroup.success {
          // String(describing:) keeps the printed text identical while making
          // the interpolation of optional values explicit.
          print("status = \(String(describing: operationGroup.receivedStatusCode())), \(String(describing: operationGroup.receivedStatusMessage()))")
        }
      }
      try perform(operations)
    }

    /// close a streaming connection
    ///
    /// - Parameter completion: called when the close operation succeeds
    /// - Throws: a `CallError` if the operation could not be started
    public func close(completion:@escaping (() -> Void)) throws -> Void {
      let operations = OperationGroup(call: self, operations: [.sendCloseFromClient])
      { (operationGroup) in
        if operationGroup.success {
          completion()
        }
      }
      try perform(operations)
    }
  }