Call.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. /*
  2. *
  3. * Copyright 2016, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. #if SWIFT_PACKAGE
  34. import CgRPC
  35. #endif
  36. import Foundation
  /// Holder for the one mutex used to serialize calls to cgrpc_call_perform().
  ///
  /// The initializer is private, so `sharedInstance` is the only instance that can exist.
  private final class CallLock {

    /// The single shared instance.
    static let sharedInstance = CallLock()

    /// Mutex that callers lock around cgrpc_call_perform().
    var mutex: Mutex = Mutex()

    private init() {}
  }
  /// Swift-side representation of the `grpc_call_error` codes returned by the C layer.
  public enum CallError : Error {
    case ok,
         unknown,
         notOnServer,
         notOnClient,
         alreadyAccepted,
         alreadyInvoked,
         notInvoked,
         alreadyFinished,
         tooManyOperations,
         invalidFlags,
         invalidMetadata,
         invalidMessage,
         notServerCompletionQueue,
         batchTooBig,
         payloadTypeMismatch

    /// Pairs every recognized C error code with the case it maps to.
    private static let codeTable: [(grpc_call_error, CallError)] = [
      (GRPC_CALL_OK, .ok),
      (GRPC_CALL_ERROR, .unknown),
      (GRPC_CALL_ERROR_NOT_ON_SERVER, .notOnServer),
      (GRPC_CALL_ERROR_NOT_ON_CLIENT, .notOnClient),
      (GRPC_CALL_ERROR_ALREADY_ACCEPTED, .alreadyAccepted),
      (GRPC_CALL_ERROR_ALREADY_INVOKED, .alreadyInvoked),
      (GRPC_CALL_ERROR_NOT_INVOKED, .notInvoked),
      (GRPC_CALL_ERROR_ALREADY_FINISHED, .alreadyFinished),
      (GRPC_CALL_ERROR_TOO_MANY_OPERATIONS, .tooManyOperations),
      (GRPC_CALL_ERROR_INVALID_FLAGS, .invalidFlags),
      (GRPC_CALL_ERROR_INVALID_METADATA, .invalidMetadata),
      (GRPC_CALL_ERROR_INVALID_MESSAGE, .invalidMessage),
      (GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE, .notServerCompletionQueue),
      (GRPC_CALL_ERROR_BATCH_TOO_BIG, .batchTooBig),
      (GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH, .payloadTypeMismatch)
    ]

    /// Converts a C-level error code into a `CallError`.
    ///
    /// - Parameter error: the code returned by the C layer
    /// - Returns: the matching case, or `.unknown` when the code is not in the table
    static func callError(grpcCallError error: grpc_call_error) -> CallError {
      for (code, mapped) in codeTable where code == error {
        return mapped
      }
      return .unknown
    }
  }
  /// The values delivered to a `CallCompletion` when a nonstreaming call finishes.
  public struct CallResult {

    /// Status code reported for the call.
    public var statusCode: Int

    /// Status details accompanying the status code, if any.
    public var statusMessage: String?

    /// Payload of the received message, if any.
    public var resultData: Data?

    /// Metadata received at the start of the response, if any.
    public var initialMetadata: Metadata?

    /// Metadata received together with the status, if any.
    public var trailingMetadata: Metadata?
  }
  /// Handler invoked with the outcome of a nonstreaming call.
  public typealias CallCompletion = (_ result: CallResult) -> Void

  /// Handler invoked with the result of attempting to send a message on a streaming call.
  public typealias SendMessageCompletion = (_ error: CallError) -> Void
  /// A gRPC API call
  ///
  /// Wraps the underlying C call object and submits groups of operations on it,
  /// either as one nonstreaming batch or as individual streaming steps.
  public class Call {

    /// Pointer to underlying C representation
    private let underlyingCall: UnsafeMutableRawPointer

    /// Completion queue used for call
    private let completionQueue: CompletionQueue

    /// True if this instance is responsible for deleting the underlying C representation
    private let owned: Bool

    /// A queue of pending messages to send over the call (only accessed on the main queue)
    private var pendingMessages: [Data] = []

    /// True if a message write operation is underway (only accessed on the main queue)
    private var writing = false

    /// Initializes a Call representation
    ///
    /// - Parameter underlyingCall: the underlying C representation
    /// - Parameter owned: true if this instance is responsible for deleting the underlying call
    /// - Parameter completionQueue: the completion queue used for operations on this call
    init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
      self.underlyingCall = underlyingCall
      self.owned = owned
      self.completionQueue = completionQueue
    }

    deinit {
      if owned {
        cgrpc_call_destroy(underlyingCall)
      }
    }

    /// Initiate performance of a call without waiting for completion
    ///
    /// The group is registered with the completion queue under its tag and then
    /// submitted while holding the shared `CallLock` mutex.
    ///
    /// - Parameter operations: group of operations to be performed
    /// - Parameter completionQueue: completion queue used to wait for completion
    /// - Returns: the result of initiating the call
    func performOperations(operations: OperationGroup,
                           completionQueue: CompletionQueue)
      -> CallError {
      // Register first so the group can be looked up by tag once it completes.
      completionQueue.operationGroups[operations.tag] = operations

      let mutex = CallLock.sharedInstance.mutex
      mutex.lock()
      let rawError = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
      mutex.unlock()

      let callError = CallError.callError(grpcCallError: rawError)
      if callError != .ok {
        // The batch was not started, so drop the registration instead of leaking it.
        // NOTE(review): assumes a rejected batch never posts a completion for its tag,
        // and that operationGroups is a dictionary keyed by tag - confirm.
        completionQueue.operationGroups[operations.tag] = nil
      }
      return callError
    }

    /// Performs a nonstreaming gRPC API call
    ///
    /// Sends the initial metadata, the message and the client close, and receives the
    /// initial metadata, the status and the response message as a single operation group.
    ///
    /// - Parameter messageData: the bytes of the message to send
    /// - Parameter metadata: metadata to send with the call
    /// - Parameter completion: called with a `CallResult` when the group completes; if the
    ///   group reports failure the result has status code 0 and nil for every other field
    /// - Returns: the result of initiating the call
    public func performNonStreamingCall(messageData: Data,
                                        metadata: Metadata,
                                        completion: @escaping CallCompletion) -> CallError {
      let messageBuffer = ByteBuffer(data: messageData)
      let sendMetadataOperation = Operation_SendInitialMetadata(metadata: metadata)
      let sendMessageOperation = Operation_SendMessage(message: messageBuffer)
      let sendCloseOperation = Operation_SendCloseFromClient()
      let receiveMetadataOperation = Operation_ReceiveInitialMetadata()
      let receiveStatusOperation = Operation_ReceiveStatusOnClient()
      let receiveMessageOperation = Operation_ReceiveMessage()

      let group = OperationGroup(call: self,
                                 operations: [sendMetadataOperation,
                                              sendMessageOperation,
                                              sendCloseOperation,
                                              receiveMetadataOperation,
                                              receiveStatusOperation,
                                              receiveMessageOperation],
                                 completion: { (success) in
        guard success else {
          completion(CallResult(statusCode: 0,
                                statusMessage: nil,
                                resultData: nil,
                                initialMetadata: nil,
                                trailingMetadata: nil))
          return
        }
        completion(CallResult(statusCode: receiveStatusOperation.status(),
                              statusMessage: receiveStatusOperation.statusDetails(),
                              resultData: receiveMessageOperation.message()?.data(),
                              initialMetadata: receiveMetadataOperation.metadata(),
                              trailingMetadata: receiveStatusOperation.metadata()))
      })
      return perform(operations: group)
    }

    /// Performs a group of operations on this call's own completion queue (used internally)
    ///
    /// - Parameter operations: group of operations to be performed
    /// - Returns: the result of initiating the operations
    private func perform(operations: OperationGroup) -> CallError {
      return performOperations(operations: operations,
                               completionQueue: completionQueue)
    }

    /// Starts a streaming connection
    ///
    /// Initiates sending the initial metadata, receiving the initial metadata and
    /// receiving the status, in that order, stopping at the first step that fails.
    ///
    /// - Parameter metadata: metadata to send with the call
    /// - Returns: the first error encountered, otherwise the result of initiating the status receive
    public func start(metadata: Metadata) -> CallError {
      let sendError = sendInitialMetadata(metadata: metadata)
      guard sendError == .ok else {
        return sendError
      }
      let receiveError = receiveInitialMetadata()
      guard receiveError == .ok else {
        return receiveError
      }
      return receiveStatus()
    }

    /// Sends a message over a streaming connection
    ///
    /// The work runs asynchronously on the main queue. If a write is already underway the
    /// message is queued and `callback` receives `.ok`; otherwise the write is started and
    /// `callback` receives the result of initiating it.
    ///
    /// - Parameter data: the bytes of the message to send
    /// - Parameter callback: called on the main queue with the outcome described above
    public func sendMessage(data: Data,
                            callback: @escaping SendMessageCompletion = { _ in })
      -> Void {
      DispatchQueue.main.async {
        if self.writing {
          self.pendingMessages.append(data) // TODO: return something if we can't accept another message
          callback(.ok)
          return
        }
        self.writing = true
        let error = self.sendWithoutBlocking(data: data)
        if error != .ok {
          // The write never started, so no completion will arrive to clear the flag.
          self.writing = false
        }
        callback(error)
      }
    }

    /// Starts writing one message; when it completes, continues with the next queued
    /// message or clears the `writing` flag if none remain.
    ///
    /// - Parameter data: the bytes of the message to send
    /// - Returns: the result of initiating the write
    private func sendWithoutBlocking(data: Data) -> CallError {
      let messageBuffer = ByteBuffer(data: data)
      let sendMessageOperation = Operation_SendMessage(message: messageBuffer)
      let operations = OperationGroup(call: self, operations: [sendMessageOperation])
      { (event) in
        // TODO: if the event failed, shut down
        DispatchQueue.main.async {
          if self.pendingMessages.isEmpty {
            self.writing = false
            return
          }
          let nextMessage = self.pendingMessages.removeFirst()
          if self.sendWithoutBlocking(data: nextMessage) != .ok {
            // The follow-up write was rejected, so nothing will drain the queue any more.
            // Discard what is left and clear the flag so later sendMessage calls attempt
            // a write and report the error instead of queueing forever.
            self.pendingMessages.removeAll()
            self.writing = false
          }
        }
      }
      return perform(operations: operations)
    }

    /// Receives a message over a streaming connection
    ///
    /// - Parameter callback: called with the message bytes when the receive completes with
    ///   a message; it is not called when the completed operation holds no message
    /// - Returns: the result of initiating the receive
    public func receiveMessage(callback:@escaping ((Data!) -> Void)) -> CallError {
      let receiveMessageOperation = Operation_ReceiveMessage()
      let operations = OperationGroup(call: self, operations: [receiveMessageOperation])
      { (event) in
        guard let messageBuffer = receiveMessageOperation.message() else {
          return
        }
        callback(messageBuffer.data())
      }
      return perform(operations: operations)
    }

    /// Sends initial metadata over a streaming connection
    ///
    /// - Parameter metadata: metadata to send
    /// - Returns: the result of initiating the send
    private func sendInitialMetadata(metadata: Metadata) -> CallError {
      let sendMetadataOperation = Operation_SendInitialMetadata(metadata: metadata)
      let operations = OperationGroup(call: self, operations: [sendMetadataOperation])
      { (success) in
        if success {
          print("call successful")
        }
      }
      return perform(operations: operations)
    }

    /// Receives initial metadata from a streaming connection and prints each entry
    ///
    /// - Returns: the result of initiating the receive
    private func receiveInitialMetadata() -> CallError {
      let receiveMetadataOperation = Operation_ReceiveInitialMetadata()
      let operations = OperationGroup(call: self, operations: [receiveMetadataOperation])
      { (event) in
        let initialMetadata = receiveMetadataOperation.metadata()
        for j in 0..<initialMetadata.count() {
          print("Received initial metadata -> " + initialMetadata.key(index:j) + " : " + initialMetadata.value(index:j))
        }
      }
      return perform(operations: operations)
    }

    /// Receives status from a streaming connection and prints it
    ///
    /// - Returns: the result of initiating the receive
    private func receiveStatus() -> CallError {
      let operation_receiveStatus = Operation_ReceiveStatusOnClient()
      let operations = OperationGroup(call: self,
                                      operations: [operation_receiveStatus])
      { (event) in
        print("status = \(operation_receiveStatus.status()), \(operation_receiveStatus.statusDetails())")
      }
      return perform(operations: operations)
    }

    /// Closes a streaming connection from the client side
    ///
    /// - Parameter completion: called when the close operation completes
    /// - Returns: the result of initiating the close
    public func close(completion:@escaping (() -> Void)) -> CallError {
      let sendCloseOperation = Operation_SendCloseFromClient()
      let operations = OperationGroup(call: self, operations: [sendCloseOperation])
      { (event) in
        completion()
      }
      return perform(operations: operations)
    }
  }