Call.swift 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. import Dispatch
  19. #endif
  20. import Foundation
  21. public enum CallStyle {
  22. case unary
  23. case serverStreaming
  24. case clientStreaming
  25. case bidiStreaming
  26. }
  27. public enum CallWarning: Error {
  28. case blocked
  29. }
  30. /// A gRPC API call
  31. public class Call {
  32. /// Shared mutex for synchronizing calls to cgrpc_call_perform()
  33. private static let callMutex = Mutex()
  34. /// Maximum number of messages that can be queued
  35. public static var messageQueueMaxLength: Int? = nil
  36. /// Pointer to underlying C representation
  37. private let underlyingCall: UnsafeMutableRawPointer
  38. /// Completion queue used for call
  39. private let completionQueue: CompletionQueue
  40. /// True if this instance is responsible for deleting the underlying C representation
  41. private let owned: Bool
  42. /// A queue of pending messages to send over the call
  43. private var messageQueue: [(dataToSend: Data, completion: ((Error?) -> Void)?)] = []
  44. /// A dispatch group that contains all pending send operations.
  45. /// You can wait on it to ensure that all currently enqueued messages have been sent.
  46. public let messageQueueEmpty = DispatchGroup()
  47. /// True if a message write operation is underway
  48. private var writing: Bool
  49. /// Mutex for synchronizing message sending
  50. private let sendMutex: Mutex
  51. /// Initializes a Call representation
  52. ///
  53. /// - Parameter call: the underlying C representation
  54. /// - Parameter owned: true if this instance is responsible for deleting the underlying call
  55. init(underlyingCall: UnsafeMutableRawPointer, owned: Bool, completionQueue: CompletionQueue) {
  56. self.underlyingCall = underlyingCall
  57. self.owned = owned
  58. self.completionQueue = completionQueue
  59. writing = false
  60. sendMutex = Mutex()
  61. }
  62. deinit {
  63. if owned {
  64. cgrpc_call_destroy(underlyingCall)
  65. }
  66. }
  67. /// Initiates performance of a group of operations without waiting for completion.
  68. ///
  69. /// - Parameter operations: group of operations to be performed
  70. /// - Returns: the result of initiating the call
  71. /// - Throws: `CallError` if fails to call.
  72. func perform(_ operations: OperationGroup) throws {
  73. try completionQueue.register(operations) {
  74. Call.callMutex.lock()
  75. // We need to do the perform *inside* the `completionQueue.register` call, to ensure that the queue can't get
  76. // shutdown in between registering the operation group and calling `cgrpc_call_perform`.
  77. let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, UnsafeMutableRawPointer(bitPattern:operations.tag))
  78. Call.callMutex.unlock()
  79. if error != GRPC_CALL_OK {
  80. throw CallError.callError(grpcCallError: error)
  81. }
  82. }
  83. }
  84. /// Starts a gRPC API call.
  85. ///
  86. /// - Parameter style: the style of call to start
  87. /// - Parameter metadata: metadata to send with the call
  88. /// - Parameter message: data containing the message to send (.unary and .serverStreaming only)
  89. /// - Parameter completion: a block to call with call results
  90. /// The argument to `completion` will always have `.success = true`
  91. /// because operations containing `.receiveCloseOnClient` always succeed.
  92. /// Runs synchronously on the completion queue's thread. Should not be blocking.
  93. /// - Throws: `CallError` if fails to call.
  94. public func start(_ style: CallStyle,
  95. metadata: Metadata,
  96. message: Data? = nil,
  97. completion: ((CallResult) -> Void)? = nil) throws {
  98. var operations: [Operation] = []
  99. switch style {
  100. case .unary:
  101. guard let message = message else {
  102. throw CallError.invalidMessage
  103. }
  104. operations = [
  105. .sendInitialMetadata(metadata.copy()),
  106. .sendMessage(ByteBuffer(data:message)),
  107. .sendCloseFromClient,
  108. .receiveInitialMetadata,
  109. .receiveMessage,
  110. .receiveStatusOnClient,
  111. ]
  112. case .serverStreaming:
  113. guard let message = message else {
  114. throw CallError.invalidMessage
  115. }
  116. operations = [
  117. .sendInitialMetadata(metadata.copy()),
  118. .sendMessage(ByteBuffer(data:message)),
  119. .sendCloseFromClient,
  120. .receiveInitialMetadata,
  121. .receiveStatusOnClient,
  122. ]
  123. case .clientStreaming, .bidiStreaming:
  124. try perform(OperationGroup(call: self,
  125. operations: [
  126. .sendInitialMetadata(metadata.copy()),
  127. .receiveInitialMetadata
  128. ],
  129. completion: nil))
  130. try perform(OperationGroup(call: self,
  131. operations: [.receiveStatusOnClient],
  132. completion: completion != nil
  133. ? { op in completion?(CallResult(op)) }
  134. : nil))
  135. return
  136. }
  137. try perform(OperationGroup(call: self,
  138. operations: operations,
  139. completion: completion != nil
  140. ? { op in completion?(CallResult(op)) }
  141. : nil))
  142. }
  143. /// Sends a message over a streaming connection.
  144. ///
  145. /// - Parameter data: the message data to send
  146. /// - Parameter completion: Runs synchronously on the completion queue's thread once the message has been sent. Should not be blocking.
  147. /// - Throws: `CallError` if fails to call. `CallWarning` if blocked.
  148. public func sendMessage(data: Data, completion: ((Error?) -> Void)? = nil) throws {
  149. try sendMutex.synchronize {
  150. if writing {
  151. if let messageQueueMaxLength = Call.messageQueueMaxLength,
  152. messageQueue.count >= messageQueueMaxLength {
  153. throw CallWarning.blocked
  154. }
  155. messageQueue.append((dataToSend: data, completion: completion))
  156. } else {
  157. writing = true
  158. try sendWithoutBlocking(data: data, completion: completion)
  159. }
  160. messageQueueEmpty.enter()
  161. }
  162. }
  163. /// helper for sending queued messages
  164. private func sendWithoutBlocking(data: Data, completion: ((Error?) -> Void)?) throws {
  165. try perform(OperationGroup(
  166. call: self,
  167. operations: [.sendMessage(ByteBuffer(data: data))]) { operationGroup in
  168. // Always enqueue the next message, even if sending this one failed. This ensures that all send completion
  169. // handlers are called eventually.
  170. self.sendMutex.synchronize {
  171. // if there are messages pending, send the next one
  172. if self.messageQueue.count > 0 {
  173. let (nextMessage, nextCompletionHandler) = self.messageQueue.removeFirst()
  174. do {
  175. try self.sendWithoutBlocking(data: nextMessage, completion: nextCompletionHandler)
  176. } catch {
  177. nextCompletionHandler?(error)
  178. }
  179. } else {
  180. // otherwise, we are finished writing
  181. self.writing = false
  182. }
  183. }
  184. completion?(operationGroup.success ? nil : CallError.unknown)
  185. self.messageQueueEmpty.leave()
  186. })
  187. }
  188. // Receive a message over a streaming connection.
  189. /// - Parameter completion: Runs synchronously on the completion queue's thread once the message has been received. Should not be blocking.
  190. /// - Throws: `CallError` if fails to call.
  191. public func closeAndReceiveMessage(completion: @escaping (CallResult) -> Void) throws {
  192. try perform(OperationGroup(call: self, operations: [.sendCloseFromClient, .receiveMessage]) { operationGroup in
  193. completion(CallResult(operationGroup))
  194. })
  195. }
  196. // Receive a message over a streaming connection.
  197. /// - Parameter completion: Runs synchronously on the completion queue's thread once the message has been received. Should not be blocking.
  198. /// - Throws: `CallError` if fails to call.
  199. public func receiveMessage(completion: @escaping (CallResult) -> Void) throws {
  200. try perform(OperationGroup(call: self, operations: [.receiveMessage]) { operationGroup in
  201. completion(CallResult(operationGroup))
  202. })
  203. }
  204. // Closes a streaming connection.
  205. /// - Parameter completion: Runs synchronously on the completion queue's thread once the connection has been closed. Should not be blocking.
  206. /// - Throws: `CallError` if fails to call.
  207. public func close(completion: (() -> Void)? = nil) throws {
  208. try perform(OperationGroup(call: self, operations: [.sendCloseFromClient],
  209. completion: completion != nil
  210. ? { op in completion?() }
  211. : nil))
  212. }
  213. // Get the current message queue length
  214. public func messageQueueLength() -> Int {
  215. return messageQueue.count
  216. }
  217. /// Finishes the request side of this call, notifies the server that the RPC should be cancelled,
  218. /// and finishes the response side of the call with an error of code CANCELED.
  219. public func cancel() {
  220. Call.callMutex.synchronize {
  221. cgrpc_call_cancel(underlyingCall)
  222. }
  223. }
  224. }