Call.swift 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  /// The shapes of gRPC call that `Call.start` knows how to initiate.
  ///
  /// `unary` and `serverStreaming` calls are started together with their
  /// request message; `clientStreaming` and `bidiStreaming` calls are started
  /// without one.
  public enum CallStyle {
    case unary, serverStreaming, clientStreaming, bidiStreaming
  }
  /// Non-fatal conditions reported by a call.
  public enum CallWarning : Error {
    /// The outgoing message queue is full, so the message was not accepted.
    case blocked
  }
  /// Swift counterparts of the C `grpc_call_error` codes.
  ///
  /// Each case mirrors the like-named `GRPC_CALL_*` constant; see
  /// `callError(grpcCallError:)` for the exact correspondence.
  public enum CallError : Error {
    case ok, unknown
    case notOnServer, notOnClient
    case alreadyAccepted, alreadyInvoked, notInvoked, alreadyFinished
    case tooManyOperations
    case invalidFlags, invalidMetadata, invalidMessage
    case notServerCompletionQueue
    case batchTooBig
    case payloadTypeMismatch

    /// Translates a C-level call error into its Swift representation.
    ///
    /// - Parameter error: the code reported by the C library
    /// - Returns: the matching case, or `.unknown` for `GRPC_CALL_ERROR` and
    ///   for any code that is not listed below
    static func callError(grpcCallError error: grpc_call_error) -> CallError {
      let translations: [(grpc_call_error, CallError)] = [
        (GRPC_CALL_OK, .ok),
        (GRPC_CALL_ERROR, .unknown),
        (GRPC_CALL_ERROR_NOT_ON_SERVER, .notOnServer),
        (GRPC_CALL_ERROR_NOT_ON_CLIENT, .notOnClient),
        (GRPC_CALL_ERROR_ALREADY_ACCEPTED, .alreadyAccepted),
        (GRPC_CALL_ERROR_ALREADY_INVOKED, .alreadyInvoked),
        (GRPC_CALL_ERROR_NOT_INVOKED, .notInvoked),
        (GRPC_CALL_ERROR_ALREADY_FINISHED, .alreadyFinished),
        (GRPC_CALL_ERROR_TOO_MANY_OPERATIONS, .tooManyOperations),
        (GRPC_CALL_ERROR_INVALID_FLAGS, .invalidFlags),
        (GRPC_CALL_ERROR_INVALID_METADATA, .invalidMetadata),
        (GRPC_CALL_ERROR_INVALID_MESSAGE, .invalidMessage),
        (GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE, .notServerCompletionQueue),
        (GRPC_CALL_ERROR_BATCH_TOO_BIG, .batchTooBig),
        (GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH, .payloadTypeMismatch)
      ]
      for (code, translated) in translations where code == error {
        return translated
      }
      return .unknown
    }
  }
  /// A snapshot of what an operation group received, as passed to the
  /// completion handler of `Call.start`.
  public struct CallResult : CustomStringConvertible {
    /// Received status code; 0 when no status was received or the operation group failed.
    public var statusCode : Int
    /// Received status message, if any.
    public var statusMessage : String?
    /// Payload of the received message, if any.
    public var resultData : Data?
    /// Initial metadata received from the peer, if any.
    public var initialMetadata : Metadata?
    /// Trailing metadata received from the peer, if any.
    public var trailingMetadata : Metadata?

    /// Captures the values received by a finished operation group.
    ///
    /// - Parameter op: the finished operation group; when it did not succeed,
    ///   the status code is 0 and every optional field is nil
    fileprivate init(_ op : OperationGroup) {
      guard op.success else {
        statusCode = 0
        statusMessage = nil
        resultData = nil
        initialMetadata = nil
        trailingMetadata = nil
        return
      }
      statusCode = op.receivedStatusCode() ?? 0
      statusMessage = op.receivedStatusMessage()
      resultData = op.receivedMessage()?.data()
      initialMetadata = op.receivedInitialMetadata()
      trailingMetadata = op.receivedTrailingMetadata()
    }

    /// Multi-line summary: the status line first, then one line for each of
    /// the result data, initial metadata and trailing metadata that is present.
    public var description : String {
      var text = "status \(statusCode)"
      if let message = statusMessage {
        text += ": " + message
      }
      let sections: [String?] = [resultData?.description,
                                 initialMetadata?.description,
                                 trailingMetadata?.description]
      for case let section? in sections {
        text += "\n"
        text += section
      }
      return text
    }
  }
  /// A gRPC API call
  public class Call {
    /// Shared mutex for synchronizing calls to cgrpc_call_perform()
    private static let callMutex = Mutex()

    /// Maximum number of messages that can be queued; values <= 0 mean "no limit"
    public static var messageQueueMaxLength = 0

    /// Pointer to underlying C representation
    private var underlyingCall : UnsafeMutableRawPointer

    /// Completion queue used for call
    private var completionQueue: CompletionQueue

    /// True if this instance is responsible for deleting the underlying C representation
    private var owned : Bool

    /// A queue of pending messages to send over the call
    private var messageQueue : [Data]

    /// True if a message write operation is underway
    private var writing : Bool

    /// Mutex for synchronizing message sending
    private var sendMutex : Mutex

    /// Dispatch queue used for sending messages asynchronously
    private var messageDispatchQueue: DispatchQueue = DispatchQueue.global()

    /// Initializes a Call representation
    ///
    /// - Parameter underlyingCall: the underlying C representation
    /// - Parameter owned: true if this instance is responsible for deleting the underlying call
    /// - Parameter completionQueue: the completion queue that operation groups are registered with
    init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
      self.underlyingCall = underlyingCall
      self.owned = owned
      self.completionQueue = completionQueue
      self.messageQueue = []
      self.writing = false
      self.sendMutex = Mutex()
    }

    deinit {
      if owned {
        cgrpc_call_destroy(underlyingCall)
      }
    }

    /// Initiates performance of a group of operations without waiting for completion.
    ///
    /// The group is registered with the completion queue before it is handed
    /// to the C library.
    ///
    /// - Parameter operations: group of operations to be performed
    /// - Throws: the `CallError` matching the code returned by
    ///   `cgrpc_call_perform` when that code is not `GRPC_CALL_OK`
    internal func perform(_ operations: OperationGroup) throws -> Void {
      completionQueue.register(operations)
      Call.callMutex.lock()
      let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
      Call.callMutex.unlock()
      if error != GRPC_CALL_OK {
        throw CallError.callError(grpcCallError:error)
      }
    }

    /// Starts a gRPC API call.
    ///
    /// - Parameter style: the style of call to start
    /// - Parameter metadata: metadata to send with the call
    /// - Parameter message: data containing the message to send (.unary and .serverStreaming only)
    /// - Parameter completion: a block to call with call results
    /// - Throws: `CallError.invalidMessage` when a .unary or .serverStreaming
    ///   call is started without a message, or any error thrown by `perform`
    public func start(_ style: CallStyle,
                      metadata: Metadata,
                      message: Data? = nil,
                      completion: @escaping (CallResult) -> Void)
      throws -> Void {
      var operations : [Operation] = []
      switch style {
      case .unary:
        guard let message = message else {
          throw CallError.invalidMessage
        }
        operations = [.sendInitialMetadata(metadata.copy() as! Metadata),
                      .receiveInitialMetadata,
                      .receiveStatusOnClient,
                      .sendMessage(ByteBuffer(data:message)),
                      .sendCloseFromClient,
                      .receiveMessage]
      case .serverStreaming:
        guard let message = message else {
          throw CallError.invalidMessage
        }
        operations = [.sendInitialMetadata(metadata.copy() as! Metadata),
                      .receiveInitialMetadata,
                      .sendMessage(ByteBuffer(data:message)),
                      .sendCloseFromClient]
      case .clientStreaming, .bidiStreaming:
        operations = [.sendInitialMetadata(metadata.copy() as! Metadata),
                      .receiveInitialMetadata]
      }
      try self.perform(OperationGroup(call:self,
                                      operations:operations,
                                      completion:{(op) in completion(CallResult(op))}))
    }

    /// Sends a message over a streaming connection.
    ///
    /// If a write is already underway the message is queued and sent once the
    /// earlier writes complete; otherwise it is sent immediately.
    ///
    /// - Parameter data: the message data to send
    /// - Parameter errorHandler: called when this or a later queued write fails.
    ///   NOTE(review): on the queued-send path it is invoked while the send
    ///   mutex is held; confirm `Mutex` re-entrancy before calling
    ///   `sendMessage` from inside it.
    /// - Throws: `CallWarning.blocked` when the queue is full, or any error
    ///   thrown while initiating the write
    public func sendMessage(data: Data, errorHandler:@escaping (Error)->()) throws {
      self.sendMutex.lock()
      defer {self.sendMutex.unlock()}
      if self.writing {
        // `>=` rather than `==`: the limit is a mutable static and may have been
        // lowered below the current queue length since the last check.
        if (Call.messageQueueMaxLength > 0) && // if max length is <= 0, consider it infinite
          (self.messageQueue.count >= Call.messageQueueMaxLength) {
          throw CallWarning.blocked
        }
        self.messageQueue.append(data)
      } else {
        self.writing = true
        do {
          try self.sendWithoutBlocking(data: data, errorHandler:errorHandler)
        } catch {
          // The write was never started, so no completion will ever reset the
          // flag; clear it here or every later message would be queued forever.
          self.writing = false
          throw error
        }
      }
    }

    /// Helper for sending queued messages.
    ///
    /// Starts a write of `data`; when it completes successfully, the next
    /// queued message (if any) is sent from `messageDispatchQueue`.
    ///
    /// - Parameter data: the message data to send
    /// - Parameter errorHandler: called when the write, or the start of a
    ///   follow-up write, fails
    /// - Throws: any error thrown by `perform`
    private func sendWithoutBlocking(data: Data, errorHandler:@escaping (Error)->())
      throws -> Void {
      try self.perform(OperationGroup(call:self,
                                      operations:[.sendMessage(ByteBuffer(data:data))],
                                      completion:{(operationGroup) in
        if operationGroup.success {
          self.messageDispatchQueue.async {
            self.sendMutex.synchronize {
              // if there are messages pending, send the next one
              if !self.messageQueue.isEmpty {
                let nextMessage = self.messageQueue.removeFirst()
                do {
                  try self.sendWithoutBlocking(data: nextMessage, errorHandler:errorHandler)
                } catch (let callError) {
                  // No write is in flight any more; clear the flag so later
                  // sendMessage calls are not queued behind a write that never happens.
                  self.writing = false
                  errorHandler(callError)
                }
              } else {
                // otherwise, we are finished writing
                self.writing = false
              }
            }
          }
        } else {
          // if the event failed, shut down
          // NOTE(review): this write to `writing` is not guarded by sendMutex,
          // unlike the success path; left as-is pending a look at Mutex semantics.
          self.writing = false
          errorHandler(CallError.unknown)
        }
      }))
    }

    /// Receives a message over a streaming connection.
    ///
    /// - Parameter callback: called with the received message data, or with
    ///   nil when the operation succeeded without a message (an empty response
    ///   signals the end of a connection); not called when the operation fails
    /// - Throws: any error thrown by `perform`
    public func receiveMessage(callback:@escaping ((Data!) throws -> Void)) throws -> Void {
      try self.perform(OperationGroup(call:self,
                                      operations:[.receiveMessage],
                                      completion:{(operationGroup) in
        if operationGroup.success {
          if let messageBuffer = operationGroup.receivedMessage() {
            try callback(messageBuffer.data())
          } else {
            try callback(nil) // an empty response signals the end of a connection
          }
        }
      }))
    }

    /// Closes a streaming connection.
    ///
    /// - Parameter completion: called when the close operation completes,
    ///   whether or not it succeeded
    /// - Throws: any error thrown by `perform`
    public func close(completion:@escaping (() -> Void)) throws -> Void {
      try self.perform(OperationGroup(call:self,
                                      operations:[.sendCloseFromClient],
                                      completion:{(operationGroup) in completion()
      }))
    }

    /// Gets the current message queue length.
    ///
    /// - Returns: the number of messages waiting to be sent
    public func messageQueueLength() -> Int {
      return messageQueue.count
    }
  }