2
0

Call.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  /// The kinds of gRPC call that `Call.start(_:metadata:message:completion:)` can initiate.
  ///
  /// `.unary` and `.serverStreaming` calls require a request message when they are started;
  /// `.clientStreaming` and `.bidiStreaming` calls are started without one.
  public enum CallStyle {
    case unary, serverStreaming, clientStreaming, bidiStreaming
  }
  /// Non-fatal conditions reported while using a call.
  public enum CallWarning: Error {
    /// Thrown by `Call.sendMessage` when the pending-message queue is already full.
    case blocked
  }
  /// Swift counterparts of the `grpc_call_error` values returned by the C core.
  public enum CallError: Error {
    case ok, unknown
    case notOnServer, notOnClient
    case alreadyAccepted, alreadyInvoked, notInvoked, alreadyFinished
    case tooManyOperations
    case invalidFlags, invalidMetadata, invalidMessage
    case notServerCompletionQueue
    case batchTooBig
    case payloadTypeMismatch

    /// Translates a C-level `grpc_call_error` into the matching `CallError`.
    ///
    /// - Parameter error: the value returned by the C core
    /// - Returns: the corresponding case; values without a dedicated case map to `.unknown`
    static func callError(grpcCallError error: grpc_call_error) -> CallError {
      switch error {
      case GRPC_CALL_OK: return .ok
      case GRPC_CALL_ERROR: return .unknown
      case GRPC_CALL_ERROR_NOT_ON_SERVER: return .notOnServer
      case GRPC_CALL_ERROR_NOT_ON_CLIENT: return .notOnClient
      case GRPC_CALL_ERROR_ALREADY_ACCEPTED: return .alreadyAccepted
      case GRPC_CALL_ERROR_ALREADY_INVOKED: return .alreadyInvoked
      case GRPC_CALL_ERROR_NOT_INVOKED: return .notInvoked
      case GRPC_CALL_ERROR_ALREADY_FINISHED: return .alreadyFinished
      case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return .tooManyOperations
      case GRPC_CALL_ERROR_INVALID_FLAGS: return .invalidFlags
      case GRPC_CALL_ERROR_INVALID_METADATA: return .invalidMetadata
      case GRPC_CALL_ERROR_INVALID_MESSAGE: return .invalidMessage
      case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE: return .notServerCompletionQueue
      case GRPC_CALL_ERROR_BATCH_TOO_BIG: return .batchTooBig
      case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH: return .payloadTypeMismatch
      default: return .unknown
      }
    }
  }
  /// The outcome of a group of operations performed on a `Call`.
  public struct CallResult: CustomStringConvertible {
    /// Status of the call; `.unknown` when the operation group failed or the
    /// received raw status value is not a known `StatusCode`.
    public var statusCode: StatusCode
    /// Status message received with the status, if any
    public var statusMessage: String?
    /// Payload of the received message, if any
    public var resultData: Data?
    /// Initial metadata received from the peer, if any
    public var initialMetadata: Metadata?
    /// Trailing metadata received from the peer, if any
    public var trailingMetadata: Metadata?

    /// Builds a result from a completed operation group.
    ///
    /// - Parameter op: the operation group whose received values are read
    fileprivate init(_ op: OperationGroup) {
      guard op.success else {
        // A failed operation group must not be reported as `.ok`: callers that
        // only inspect `statusCode` would mistake the failure for a success.
        statusCode = .unknown
        statusMessage = nil
        resultData = nil
        initialMetadata = nil
        trailingMetadata = nil
        return
      }

      if let rawStatus = op.receivedStatusCode() {
        // Raw values that do not correspond to a known status become `.unknown`.
        statusCode = StatusCode(rawValue: rawStatus) ?? .unknown
      } else {
        // No status was received (presumably the group did not request one,
        // as when starting a streaming call); the successful group counts as `.ok`.
        statusCode = .ok
      }
      statusMessage = op.receivedStatusMessage()
      resultData = op.receivedMessage()?.data()
      initialMetadata = op.receivedInitialMetadata()
      trailingMetadata = op.receivedTrailingMetadata()
    }

    /// Multi-line summary: the status (plus message, if any) on the first line,
    /// followed by one line each for the data and metadata values that are present.
    public var description: String {
      var text = "status \(statusCode)"
      if let statusMessage = statusMessage {
        text += ": " + statusMessage
      }
      let sections: [String?] = [resultData?.description,
                                 initialMetadata?.description,
                                 trailingMetadata?.description]
      for case let section? in sections {
        text += "\n" + section
      }
      return text
    }
  }
  /// A gRPC API call
  public class Call {
    /// Shared mutex for synchronizing calls to cgrpc_call_perform()
    private static let callMutex = Mutex()

    /// Maximum number of messages that can be queued; values <= 0 mean "no limit"
    public static var messageQueueMaxLength = 0

    /// Pointer to underlying C representation
    private var underlyingCall: UnsafeMutableRawPointer

    /// Completion queue used for call
    private var completionQueue: CompletionQueue

    /// True if this instance is responsible for deleting the underlying C representation
    private var owned: Bool

    /// A queue of pending messages to send over the call (guarded by `sendMutex`)
    private var messageQueue: [Data]

    /// True if a message write operation is underway (guarded by `sendMutex`)
    private var writing: Bool

    /// Mutex for synchronizing message sending
    private var sendMutex: Mutex

    /// Dispatch queue used for sending messages asynchronously
    private var messageDispatchQueue: DispatchQueue = DispatchQueue.global()

    /// Initializes a Call representation
    ///
    /// - Parameter underlyingCall: the underlying C representation
    /// - Parameter owned: true if this instance is responsible for deleting the underlying call
    /// - Parameter completionQueue: the completion queue that operation groups are registered with
    init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
      self.underlyingCall = underlyingCall
      self.owned = owned
      self.completionQueue = completionQueue
      messageQueue = []
      writing = false
      sendMutex = Mutex()
    }

    deinit {
      if owned {
        cgrpc_call_destroy(underlyingCall)
      }
    }

    /// Initiates performance of a group of operations without waiting for completion.
    ///
    /// - Parameter operations: group of operations to be performed
    /// - Throws: `CallError` if fails to call.
    func perform(_ operations: OperationGroup) throws {
      // NOTE(review): the group is registered before the C call and nothing here
      // unregisters it when the call fails; confirm CompletionQueue tolerates that.
      completionQueue.register(operations)
      Call.callMutex.lock()
      let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
      Call.callMutex.unlock()
      guard error == GRPC_CALL_OK else {
        throw CallError.callError(grpcCallError: error)
      }
    }

    /// Starts a gRPC API call.
    ///
    /// - Parameter style: the style of call to start
    /// - Parameter metadata: metadata to send with the call
    /// - Parameter message: data containing the message to send (.unary and .serverStreaming only)
    /// - Parameter completion: a block to call with call results
    /// - Throws: `CallError.invalidMessage` if `message` is nil for a style that requires one,
    ///   `CallError.invalidMetadata` if the metadata cannot be copied, or another `CallError`
    ///   if the call fails to start.
    public func start(_ style: CallStyle,
                      metadata: Metadata,
                      message: Data? = nil,
                      completion: @escaping (CallResult) -> Void) throws {
      let operations: [Operation]
      switch style {
      case .unary:
        guard let message = message else {
          throw CallError.invalidMessage
        }
        let initialMetadata = try copyForSending(metadata)
        operations = [
          .sendInitialMetadata(initialMetadata),
          .receiveInitialMetadata,
          .receiveStatusOnClient,
          .sendMessage(ByteBuffer(data: message)),
          .sendCloseFromClient,
          .receiveMessage
        ]
      case .serverStreaming:
        guard let message = message else {
          throw CallError.invalidMessage
        }
        let initialMetadata = try copyForSending(metadata)
        operations = [
          .sendInitialMetadata(initialMetadata),
          .receiveInitialMetadata,
          .sendMessage(ByteBuffer(data: message)),
          .sendCloseFromClient
        ]
      case .clientStreaming, .bidiStreaming:
        let initialMetadata = try copyForSending(metadata)
        operations = [
          .sendInitialMetadata(initialMetadata),
          .receiveInitialMetadata
        ]
      }
      try perform(OperationGroup(call: self,
                                 operations: operations,
                                 completion: { op in completion(CallResult(op)) }))
    }

    /// Copies the caller's metadata for use in a send operation.
    ///
    /// - Throws: `CallError.invalidMetadata` if the copy is not a `Metadata`
    ///   (instead of crashing on a forced cast).
    private func copyForSending(_ metadata: Metadata) throws -> Metadata {
      guard let copy = metadata.copy() as? Metadata else {
        throw CallError.invalidMetadata
      }
      return copy
    }

    /// Sends a message over a streaming connection.
    ///
    /// If a write is already underway the message is queued and sent once the
    /// preceding writes have completed.
    ///
    /// - Parameter data: the message data to send
    /// - Parameter errorHandler: called if a later, asynchronous send step fails
    /// - Throws: `CallError` if fails to call. `CallWarning` if blocked.
    public func sendMessage(data: Data, errorHandler: @escaping (Error) -> Void) throws {
      sendMutex.lock()
      defer { sendMutex.unlock() }
      if writing {
        // if max length is <= 0, consider it infinite; `>=` keeps the limit
        // effective even if it was lowered below the current queue length
        if Call.messageQueueMaxLength > 0 && messageQueue.count >= Call.messageQueueMaxLength {
          throw CallWarning.blocked
        }
        messageQueue.append(data)
      } else {
        writing = true
        do {
          try sendWithoutBlocking(data: data, errorHandler: errorHandler)
        } catch {
          // No write was started, so no completion will ever clear the flag;
          // reset it here or every later message would be queued forever.
          writing = false
          throw error
        }
      }
    }

    /// helper for sending queued messages
    ///
    /// Must be called with `sendMutex` held and `writing` set. When the write
    /// completes, the next queued message (if any) is sent from `messageDispatchQueue`.
    /// `errorHandler` is always invoked on `messageDispatchQueue`, after `sendMutex`
    /// has been released, so it may safely call back into this object.
    private func sendWithoutBlocking(data: Data, errorHandler: @escaping (Error) -> Void) throws {
      try perform(OperationGroup(call: self,
                                 operations: [.sendMessage(ByteBuffer(data: data))]) { operationGroup in
        let sendSucceeded = operationGroup.success
        self.messageDispatchQueue.async {
          var failure: Error?
          if !sendSucceeded {
            // if the event failed, shut down
            failure = CallError.unknown
          }
          self.sendMutex.synchronize {
            if sendSucceeded && !self.messageQueue.isEmpty {
              // if there are messages pending, send the next one
              let nextMessage = self.messageQueue.removeFirst()
              do {
                try self.sendWithoutBlocking(data: nextMessage, errorHandler: errorHandler)
              } catch {
                // the next write could not be started, so nothing is in flight
                self.writing = false
                failure = error
              }
            } else {
              // otherwise, we are finished writing
              self.writing = false
            }
          }
          if let failure = failure {
            errorHandler(failure)
          }
        }
      })
    }

    /// Receive a message over a streaming connection.
    ///
    /// - Parameter callback: called with the received data, or with nil when an empty
    ///   response signals the end of the connection
    /// - Throws: `CallError` if fails to call.
    public func receiveMessage(callback: @escaping ((Data!) throws -> Void)) throws {
      try perform(OperationGroup(call: self, operations: [.receiveMessage]) { operationGroup in
        // NOTE(review): when the operation group fails the callback is never invoked.
        guard operationGroup.success else { return }
        if let messageBuffer = operationGroup.receivedMessage() {
          try callback(messageBuffer.data())
        } else {
          try callback(nil) // an empty response signals the end of a connection
        }
      })
    }

    /// Closes a streaming connection.
    ///
    /// - Parameter completion: called when the close operation completes
    /// - Throws: `CallError` if fails to call.
    public func close(completion: @escaping (() -> Void)) throws {
      try perform(OperationGroup(call: self, operations: [.sendCloseFromClient]) { _ in
        completion()
      })
    }

    /// Get the current message queue length
    public func messageQueueLength() -> Int {
      // the queue is mutated under sendMutex, so read it under the same lock
      sendMutex.lock()
      defer { sendMutex.unlock() }
      return messageQueue.count
    }

    /// Finishes the request side of this call, notifies the server that the RPC should be cancelled,
    /// and finishes the response side of the call with an error of code CANCELED.
    public func cancel() {
      Call.callMutex.lock()
      defer { Call.callMutex.unlock() }
      cgrpc_call_cancel(underlyingCall)
    }
  }