2
0

Call.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. /*
  2. *
  3. * Copyright 2016, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. #if SWIFT_PACKAGE
  34. import CgRPC
  35. import Dispatch
  36. #endif
  37. import Foundation
  38. public enum CallStyle {
  39. case unary
  40. case serverStreaming
  41. case clientStreaming
  42. case bidiStreaming
  43. }
  44. public enum CallWarning : Error {
  45. case blocked
  46. }
  47. public enum CallError : Error {
  48. case ok
  49. case unknown
  50. case notOnServer
  51. case notOnClient
  52. case alreadyAccepted
  53. case alreadyInvoked
  54. case notInvoked
  55. case alreadyFinished
  56. case tooManyOperations
  57. case invalidFlags
  58. case invalidMetadata
  59. case invalidMessage
  60. case notServerCompletionQueue
  61. case batchTooBig
  62. case payloadTypeMismatch
  63. static func callError(grpcCallError error: grpc_call_error) -> CallError {
  64. switch(error) {
  65. case GRPC_CALL_OK:
  66. return .ok
  67. case GRPC_CALL_ERROR:
  68. return .unknown
  69. case GRPC_CALL_ERROR_NOT_ON_SERVER:
  70. return .notOnServer
  71. case GRPC_CALL_ERROR_NOT_ON_CLIENT:
  72. return .notOnClient
  73. case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
  74. return .alreadyAccepted
  75. case GRPC_CALL_ERROR_ALREADY_INVOKED:
  76. return .alreadyInvoked
  77. case GRPC_CALL_ERROR_NOT_INVOKED:
  78. return .notInvoked
  79. case GRPC_CALL_ERROR_ALREADY_FINISHED:
  80. return .alreadyFinished
  81. case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
  82. return .tooManyOperations
  83. case GRPC_CALL_ERROR_INVALID_FLAGS:
  84. return .invalidFlags
  85. case GRPC_CALL_ERROR_INVALID_METADATA:
  86. return .invalidMetadata
  87. case GRPC_CALL_ERROR_INVALID_MESSAGE:
  88. return .invalidMessage
  89. case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
  90. return .notServerCompletionQueue
  91. case GRPC_CALL_ERROR_BATCH_TOO_BIG:
  92. return .batchTooBig
  93. case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
  94. return .payloadTypeMismatch
  95. default:
  96. return .unknown
  97. }
  98. }
  99. }
  100. public struct CallResult {
  101. public var statusCode : Int
  102. public var statusMessage : String?
  103. public var resultData : Data?
  104. public var initialMetadata : Metadata?
  105. public var trailingMetadata : Metadata?
  106. fileprivate init(_ op : OperationGroup) {
  107. if (op.success) {
  108. if let statusCode = op.receivedStatusCode() {
  109. self.statusCode = statusCode
  110. } else {
  111. self.statusCode = 0
  112. }
  113. self.statusMessage = op.receivedStatusMessage()
  114. self.resultData = op.receivedMessage()?.data()
  115. self.initialMetadata = op.receivedInitialMetadata()
  116. self.trailingMetadata = op.receivedTrailingMetadata()
  117. } else {
  118. self.statusCode = 0
  119. self.statusMessage = nil
  120. self.resultData = nil
  121. self.initialMetadata = nil
  122. self.trailingMetadata = nil
  123. }
  124. }
  125. }
  126. /// A gRPC API call
  127. public class Call {
  128. /// Shared mutex for synchronizing calls to cgrpc_call_perform()
  129. private static let callMutex = Mutex()
  130. /// Maximum number of messages that can be queued
  131. private static var maximumQueuedMessages = 10
  132. /// Pointer to underlying C representation
  133. private var underlyingCall : UnsafeMutableRawPointer
  134. /// Completion queue used for call
  135. private var completionQueue: CompletionQueue
  136. /// True if this instance is responsible for deleting the underlying C representation
  137. private var owned : Bool
  138. /// A queue of pending messages to send over the call
  139. private var pendingMessages : Array<Data>
  140. /// True if a message write operation is underway
  141. private var writing : Bool
  142. /// Mutex for synchronizing message sending
  143. private var sendMutex : Mutex
  144. /// Dispatch queue used for sending messages asynchronously
  145. private var messageDispatchQueue: DispatchQueue = DispatchQueue.global()
  146. /// Initializes a Call representation
  147. ///
  148. /// - Parameter call: the underlying C representation
  149. /// - Parameter owned: true if this instance is responsible for deleting the underlying call
  150. init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
  151. self.underlyingCall = underlyingCall
  152. self.owned = owned
  153. self.completionQueue = completionQueue
  154. self.pendingMessages = []
  155. self.writing = false
  156. self.sendMutex = Mutex()
  157. }
  158. deinit {
  159. if (owned) {
  160. cgrpc_call_destroy(underlyingCall)
  161. }
  162. }
  163. /// Initiates performance of a group of operations without waiting for completion.
  164. ///
  165. /// - Parameter operations: group of operations to be performed
  166. /// - Returns: the result of initiating the call
  167. internal func perform(_ operations: OperationGroup) throws -> Void {
  168. completionQueue.register(operations)
  169. Call.callMutex.lock()
  170. let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
  171. Call.callMutex.unlock()
  172. if error != GRPC_CALL_OK {
  173. throw CallError.callError(grpcCallError:error)
  174. }
  175. }
  176. /// Starts a gRPC API call.
  177. ///
  178. /// - Parameter style: the style of call to start
  179. /// - Parameter metadata: metadata to send with the call
  180. /// - Parameter message: data containing the message to send (.unary and .serverStreaming only)
  181. /// - Parameter callback: a block to call with call results
  182. public func start(_ style: CallStyle,
  183. metadata: Metadata,
  184. message: Data? = nil,
  185. completion: @escaping (CallResult) -> Void)
  186. throws -> Void {
  187. var operations : [Operation] = []
  188. switch style {
  189. case .unary:
  190. guard let message = message else {
  191. throw CallError.invalidMessage
  192. }
  193. operations = [.sendInitialMetadata(metadata.copy() as! Metadata),
  194. .receiveInitialMetadata,
  195. .receiveStatusOnClient,
  196. .sendMessage(ByteBuffer(data:message)),
  197. .sendCloseFromClient,
  198. .receiveMessage]
  199. case .serverStreaming:
  200. guard let message = message else {
  201. throw CallError.invalidMessage
  202. }
  203. operations = [.sendInitialMetadata(metadata.copy() as! Metadata),
  204. .receiveInitialMetadata,
  205. .sendMessage(ByteBuffer(data:message)),
  206. .sendCloseFromClient
  207. ]
  208. case .clientStreaming, .bidiStreaming:
  209. operations = [.sendInitialMetadata(metadata.copy() as! Metadata),
  210. .receiveInitialMetadata]
  211. }
  212. try self.perform(OperationGroup(call:self,
  213. operations:operations,
  214. completion:{(op) in completion(CallResult(op))}))
  215. }
  216. /// Sends a message over a streaming connection.
  217. ///
  218. /// Parameter data: the message data to send
  219. public func sendMessage(data: Data) throws {
  220. self.sendMutex.lock()
  221. defer {self.sendMutex.unlock()}
  222. if self.writing {
  223. if self.pendingMessages.count == Call.maximumQueuedMessages {
  224. throw CallWarning.blocked
  225. }
  226. self.pendingMessages.append(data)
  227. } else {
  228. self.writing = true
  229. try self.sendWithoutBlocking(data: data)
  230. }
  231. }
  232. /// helper for sending queued messages
  233. private func sendWithoutBlocking(data: Data) throws -> Void {
  234. try self.perform(OperationGroup(call:self,
  235. operations:[.sendMessage(ByteBuffer(data:data))])
  236. {(operationGroup) in
  237. if operationGroup.success {
  238. self.messageDispatchQueue.async {
  239. self.sendMutex.synchronize {
  240. // if there are messages pending, send the next one
  241. if self.pendingMessages.count > 0 {
  242. let nextMessage = self.pendingMessages.first!
  243. self.pendingMessages.removeFirst()
  244. do {
  245. try self.sendWithoutBlocking(data: nextMessage)
  246. } catch (let callError) {
  247. print("Call sendWithoutBlocking: grpc error \(callError)")
  248. }
  249. } else {
  250. // otherwise, we are finished writing
  251. self.writing = false
  252. }
  253. }
  254. }
  255. } else {
  256. // TODO: if the event failed, shut down
  257. self.writing = false
  258. }
  259. })
  260. }
  261. // Receive a message over a streaming connection.
  262. public func receiveMessage(callback:@escaping ((Data!) throws -> Void)) throws -> Void {
  263. try self.perform(OperationGroup(call:self, operations:[.receiveMessage])
  264. {(operationGroup) in
  265. if operationGroup.success {
  266. if let messageBuffer = operationGroup.receivedMessage() {
  267. try callback(messageBuffer.data())
  268. } else {
  269. try callback(nil) // an empty response signals the end of a connection
  270. }
  271. }
  272. })
  273. }
  274. // Closes a streaming connection.
  275. public func close(completion:@escaping (() -> Void)) throws -> Void {
  276. try self.perform(OperationGroup(call:self, operations:[.sendCloseFromClient])
  277. {(operationGroup) in completion()
  278. })
  279. }
  280. }