Call.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  21. public enum CallStyle {
  22. case unary
  23. case serverStreaming
  24. case clientStreaming
  25. case bidiStreaming
  26. }
  27. public enum CallWarning: Error {
  28. case blocked
  29. }
  30. public enum CallError: Error {
  31. case ok
  32. case unknown
  33. case notOnServer
  34. case notOnClient
  35. case alreadyAccepted
  36. case alreadyInvoked
  37. case notInvoked
  38. case alreadyFinished
  39. case tooManyOperations
  40. case invalidFlags
  41. case invalidMetadata
  42. case invalidMessage
  43. case notServerCompletionQueue
  44. case batchTooBig
  45. case payloadTypeMismatch
  46. static func callError(grpcCallError error: grpc_call_error) -> CallError {
  47. switch error {
  48. case GRPC_CALL_OK:
  49. return .ok
  50. case GRPC_CALL_ERROR:
  51. return .unknown
  52. case GRPC_CALL_ERROR_NOT_ON_SERVER:
  53. return .notOnServer
  54. case GRPC_CALL_ERROR_NOT_ON_CLIENT:
  55. return .notOnClient
  56. case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
  57. return .alreadyAccepted
  58. case GRPC_CALL_ERROR_ALREADY_INVOKED:
  59. return .alreadyInvoked
  60. case GRPC_CALL_ERROR_NOT_INVOKED:
  61. return .notInvoked
  62. case GRPC_CALL_ERROR_ALREADY_FINISHED:
  63. return .alreadyFinished
  64. case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
  65. return .tooManyOperations
  66. case GRPC_CALL_ERROR_INVALID_FLAGS:
  67. return .invalidFlags
  68. case GRPC_CALL_ERROR_INVALID_METADATA:
  69. return .invalidMetadata
  70. case GRPC_CALL_ERROR_INVALID_MESSAGE:
  71. return .invalidMessage
  72. case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
  73. return .notServerCompletionQueue
  74. case GRPC_CALL_ERROR_BATCH_TOO_BIG:
  75. return .batchTooBig
  76. case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
  77. return .payloadTypeMismatch
  78. default:
  79. return .unknown
  80. }
  81. }
  82. }
  83. public struct CallResult: CustomStringConvertible {
  84. public let statusCode: StatusCode
  85. public let statusMessage: String?
  86. public let resultData: Data?
  87. public let initialMetadata: Metadata?
  88. public let trailingMetadata: Metadata?
  89. fileprivate init(_ op: OperationGroup) {
  90. if op.success {
  91. if let statusCodeRawValue = op.receivedStatusCode() {
  92. if let statusCode = StatusCode(rawValue: statusCodeRawValue) {
  93. self.statusCode = statusCode
  94. } else {
  95. statusCode = .unknown
  96. }
  97. } else {
  98. statusCode = .ok
  99. }
  100. statusMessage = op.receivedStatusMessage()
  101. resultData = op.receivedMessage()?.data()
  102. initialMetadata = op.receivedInitialMetadata()
  103. trailingMetadata = op.receivedTrailingMetadata()
  104. } else {
  105. statusCode = .ok
  106. statusMessage = nil
  107. resultData = nil
  108. initialMetadata = nil
  109. trailingMetadata = nil
  110. }
  111. }
  112. public var description: String {
  113. var result = "status \(statusCode)"
  114. if let statusMessage = self.statusMessage {
  115. result += ": " + statusMessage
  116. }
  117. if let resultData = self.resultData {
  118. result += "\n"
  119. result += resultData.description
  120. }
  121. if let initialMetadata = self.initialMetadata {
  122. result += "\n"
  123. result += initialMetadata.description
  124. }
  125. if let trailingMetadata = self.trailingMetadata {
  126. result += "\n"
  127. result += trailingMetadata.description
  128. }
  129. return result
  130. }
  131. }
  132. /// A gRPC API call
  133. public class Call {
  134. /// Shared mutex for synchronizing calls to cgrpc_call_perform()
  135. private static let callMutex = Mutex()
  136. /// Maximum number of messages that can be queued
  137. public static var messageQueueMaxLength: Int? = nil
  138. /// Pointer to underlying C representation
  139. private let underlyingCall: UnsafeMutableRawPointer
  140. /// Completion queue used for call
  141. private let completionQueue: CompletionQueue
  142. /// True if this instance is responsible for deleting the underlying C representation
  143. private let owned: Bool
  144. /// A queue of pending messages to send over the call
  145. private var messageQueue: [(dataToSend: Data, completion: ((Error?) -> Void)?)] = []
  146. /// A dispatch group that contains all pending send operations.
  147. /// You can wait on it to ensure that all currently enqueued messages have been sent.
  148. public let messageQueueEmpty = DispatchGroup()
  149. /// True if a message write operation is underway
  150. private var writing: Bool
  151. /// Mutex for synchronizing message sending
  152. private let sendMutex: Mutex
  153. /// Dispatch queue used for sending messages asynchronously
  154. private let messageDispatchQueue: DispatchQueue = DispatchQueue.global()
  155. /// Initializes a Call representation
  156. ///
  157. /// - Parameter call: the underlying C representation
  158. /// - Parameter owned: true if this instance is responsible for deleting the underlying call
  159. init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
  160. self.underlyingCall = underlyingCall
  161. self.owned = owned
  162. self.completionQueue = completionQueue
  163. writing = false
  164. sendMutex = Mutex()
  165. }
  166. deinit {
  167. if owned {
  168. cgrpc_call_destroy(underlyingCall)
  169. }
  170. }
  171. /// Initiates performance of a group of operations without waiting for completion.
  172. ///
  173. /// - Parameter operations: group of operations to be performed
  174. /// - Returns: the result of initiating the call
  175. /// - Throws: `CallError` if fails to call.
  176. func perform(_ operations: OperationGroup) throws {
  177. completionQueue.register(operations)
  178. Call.callMutex.lock()
  179. let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
  180. Call.callMutex.unlock()
  181. if error != GRPC_CALL_OK {
  182. throw CallError.callError(grpcCallError: error)
  183. }
  184. }
  185. /// Starts a gRPC API call.
  186. ///
  187. /// - Parameter style: the style of call to start
  188. /// - Parameter metadata: metadata to send with the call
  189. /// - Parameter message: data containing the message to send (.unary and .serverStreaming only)
  190. /// - Parameter completion: a block to call with call results
  191. /// - Throws: `CallError` if fails to call.
  192. public func start(_ style: CallStyle,
  193. metadata: Metadata,
  194. message: Data? = nil,
  195. completion: ((CallResult) -> Void)? = nil) throws {
  196. var operations: [Operation] = []
  197. switch style {
  198. case .unary:
  199. guard let message = message else {
  200. throw CallError.invalidMessage
  201. }
  202. operations = [
  203. .sendInitialMetadata(metadata.copy()),
  204. .sendMessage(ByteBuffer(data:message)),
  205. .sendCloseFromClient,
  206. .receiveInitialMetadata,
  207. .receiveMessage,
  208. .receiveStatusOnClient,
  209. ]
  210. case .serverStreaming:
  211. guard let message = message else {
  212. throw CallError.invalidMessage
  213. }
  214. operations = [
  215. .sendInitialMetadata(metadata.copy()),
  216. .sendMessage(ByteBuffer(data:message)),
  217. .sendCloseFromClient,
  218. .receiveInitialMetadata,
  219. .receiveStatusOnClient,
  220. ]
  221. case .clientStreaming, .bidiStreaming:
  222. operations = [
  223. .sendInitialMetadata(metadata.copy()),
  224. .receiveInitialMetadata,
  225. .receiveStatusOnClient,
  226. ]
  227. }
  228. try perform(OperationGroup(call: self,
  229. operations: operations,
  230. completion: completion != nil
  231. ? { op in completion?(CallResult(op)) }
  232. : nil))
  233. }
  234. /// Sends a message over a streaming connection.
  235. ///
  236. /// Parameter data: the message data to send
  237. /// - Throws: `CallError` if fails to call. `CallWarning` if blocked.
  238. public func sendMessage(data: Data, completion: ((Error?) -> Void)? = nil) throws {
  239. messageQueueEmpty.enter()
  240. try sendMutex.synchronize {
  241. if writing {
  242. if let messageQueueMaxLength = Call.messageQueueMaxLength,
  243. messageQueue.count >= messageQueueMaxLength {
  244. throw CallWarning.blocked
  245. }
  246. messageQueue.append((dataToSend: data, completion: completion))
  247. } else {
  248. writing = true
  249. try sendWithoutBlocking(data: data, completion: completion)
  250. }
  251. }
  252. }
  253. /// helper for sending queued messages
  254. private func sendWithoutBlocking(data: Data, completion: ((Error?) -> Void)?) throws {
  255. try perform(OperationGroup(call: self,
  256. operations: [.sendMessage(ByteBuffer(data: data))]) { operationGroup in
  257. // TODO(timburks, danielalm): Is the `async` dispatch here needed, and/or should we call the completion handler
  258. // and leave `messageQueueEmpty` in the `async` block as well?
  259. self.messageDispatchQueue.async {
  260. // Always enqueue the next message, even if sending this one failed. This ensures that all send completion
  261. // handlers are called eventually.
  262. self.sendMutex.synchronize {
  263. // if there are messages pending, send the next one
  264. if self.messageQueue.count > 0 {
  265. let (nextMessage, nextCompletionHandler) = self.messageQueue.removeFirst()
  266. do {
  267. try self.sendWithoutBlocking(data: nextMessage, completion: nextCompletionHandler)
  268. } catch (let callError) {
  269. nextCompletionHandler?(callError)
  270. }
  271. } else {
  272. // otherwise, we are finished writing
  273. self.writing = false
  274. }
  275. }
  276. }
  277. completion?(operationGroup.success ? nil : CallError.unknown)
  278. self.messageQueueEmpty.leave()
  279. })
  280. }
  281. // Receive a message over a streaming connection.
  282. /// - Throws: `CallError` if fails to call.
  283. public func closeAndReceiveMessage(completion: @escaping (Data?) throws -> Void) throws {
  284. try perform(OperationGroup(call: self, operations: [.sendCloseFromClient, .receiveMessage]) { operationGroup in
  285. if operationGroup.success {
  286. if let messageBuffer = operationGroup.receivedMessage() {
  287. try completion(messageBuffer.data())
  288. } else {
  289. try completion(nil) // an empty response signals the end of a connection
  290. }
  291. }
  292. })
  293. }
  294. // Receive a message over a streaming connection.
  295. /// - Throws: `CallError` if fails to call.
  296. public func receiveMessage(completion: @escaping (Data?) throws -> Void) throws {
  297. try perform(OperationGroup(call: self, operations: [.receiveMessage]) { operationGroup in
  298. if operationGroup.success {
  299. try completion(operationGroup.receivedMessage()?.data())
  300. } else {
  301. try completion(nil)
  302. }
  303. })
  304. }
  305. // Closes a streaming connection.
  306. /// - Throws: `CallError` if fails to call.
  307. public func close(completion: (() -> Void)? = nil) throws {
  308. try perform(OperationGroup(call: self, operations: [.sendCloseFromClient],
  309. completion: completion != nil
  310. ? { op in completion?() }
  311. : nil))
  312. }
  313. // Get the current message queue length
  314. public func messageQueueLength() -> Int {
  315. return messageQueue.count
  316. }
  317. /// Finishes the request side of this call, notifies the server that the RPC should be cancelled,
  318. /// and finishes the response side of the call with an error of code CANCELED.
  319. public func cancel() {
  320. Call.callMutex.synchronize {
  321. cgrpc_call_cancel(underlyingCall)
  322. }
  323. }
  324. }