2
0

OperationGroup.swift 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. * Copyright 2016, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if SWIFT_PACKAGE
  17. import CgRPC
  18. #endif
  /// A collection of gRPC operations, together with the underlying C observers
  /// created for them and accessors for the values those observers received.
  class OperationGroup {
    /// A mutex for synchronizing tag generation.
    static let tagMutex = Mutex()

    /// Used to generate unique tags for OperationGroups.
    private static var nextTag: Int64 = 1

    /// Automatically-assigned tag that is used by the completion queue that watches this group.
    let tag: Int64

    /// The call associated with the operation group. Retained while the operations are running.
    private let call: Call

    /// The operations that were passed into the initializer, in their original order.
    private let operations: [Operation]

    /// One underlying observer per entry of `operations`, stored at the same index.
    private var underlyingObservers: [UnsafeMutableRawPointer] = []

    /// Pointer to the underlying C representation of the operations.
    let underlyingOperations: UnsafeMutableRawPointer?

    /// Completion handler that is called when the group completes.
    let completion: ((OperationGroup) throws -> Void)?

    /// Indicates that the OperationGroup completed successfully.
    var success = false

    /// Creates the underlying observer needed to run an operation.
    ///
    /// - Parameter operation: the operation to observe
    /// - Returns: the newly created observer, already configured with any payload
    ///   (message, status code, status details) carried by `operation`
    private func underlyingObserverForOperation(operation: Operation) -> UnsafeMutableRawPointer {
      switch operation {
      case .sendInitialMetadata(let metadata):
        return cgrpc_observer_create_send_initial_metadata(metadata.underlyingArray)!
      case .sendMessage(let message):
        let observer: UnsafeMutableRawPointer = cgrpc_observer_create_send_message()!
        cgrpc_observer_send_message_set_message(observer, message.underlyingByteBuffer)
        return observer
      case .sendCloseFromClient:
        return cgrpc_observer_create_send_close_from_client()!
      case .sendStatusFromServer(let statusCode, let statusMessage, let metadata):
        let observer: UnsafeMutableRawPointer =
          cgrpc_observer_create_send_status_from_server(metadata.underlyingArray)!
        cgrpc_observer_send_status_from_server_set_status(observer, Int32(statusCode.rawValue))
        cgrpc_observer_send_status_from_server_set_status_details(observer, statusMessage)
        return observer
      case .receiveInitialMetadata:
        return cgrpc_observer_create_recv_initial_metadata()!
      case .receiveMessage:
        return cgrpc_observer_create_recv_message()!
      case .receiveStatusOnClient:
        return cgrpc_observer_create_recv_status_on_client()!
      case .receiveCloseOnServer:
        return cgrpc_observer_create_recv_close_on_server()!
      }
    }

    /// Initializes an OperationGroup representation.
    ///
    /// - Parameter call: the call associated with the operations
    /// - Parameter operations: an array of operations
    /// - Parameter completion: optional handler that is stored on the group and
    ///   is meant to be called when the group completes
    init(call: Call,
         operations: [Operation],
         completion: ((OperationGroup) throws -> Void)? = nil) {
      self.call = call
      self.operations = operations
      self.completion = completion

      // Set tag to a unique value (per execution). The lock is released right
      // after the counter is bumped so it is not held during observer creation.
      OperationGroup.tagMutex.lock()
      tag = OperationGroup.nextTag
      OperationGroup.nextTag += 1
      OperationGroup.tagMutex.unlock()

      // Create the underlying observers and register each one with the
      // underlying operations container, preserving the order of `operations`.
      underlyingOperations = cgrpc_operations_create()
      cgrpc_operations_reserve_space_for_operations(underlyingOperations, Int32(operations.count))
      underlyingObservers.reserveCapacity(operations.count)
      for operation in operations {
        let observer = underlyingObserverForOperation(operation: operation)
        underlyingObservers.append(observer)
        cgrpc_operations_add_operation(underlyingOperations, observer)
      }
    }

    deinit {
      // Destroy every observer created in `init`, then the operations container.
      for observer in underlyingObservers {
        cgrpc_observer_destroy(observer)
      }
      cgrpc_operations_destroy(underlyingOperations)
    }

    /// Finds the observer that belongs to the first operation accepted by `matches`.
    ///
    /// - Parameter matches: predicate applied to each operation in order
    /// - Returns: the observer stored alongside the first matching operation,
    ///   or nil if no operation matches
    private func firstObserver(where matches: (Operation) -> Bool) -> UnsafeMutableRawPointer? {
      // `operations` and `underlyingObservers` are filled pairwise in `init`.
      for (operation, observer) in zip(operations, underlyingObservers) where matches(operation) {
        return observer
      }
      return nil
    }

    /// The observer of the first `.receiveStatusOnClient` operation, if the group has one.
    private var receivedStatusObserver: UnsafeMutableRawPointer? {
      return firstObserver(where: { operation in
        if case .receiveStatusOnClient = operation { return true }
        return false
      })
    }

    /// WARNING: The following assumes that at most one operation of each type is in the group.
    /// (Only the first matching operation is ever consulted.)

    /// Gets the message that was received.
    ///
    /// - Returns: the received message, or nil if the group has no `.receiveMessage`
    ///   operation or its observer holds no message
    func receivedMessage() -> ByteBuffer? {
      let messageObserver = firstObserver(where: { operation in
        if case .receiveMessage = operation { return true }
        return false
      })
      guard let observer = messageObserver,
        let buffer = cgrpc_observer_recv_message_get_message(observer) else {
        return nil
      }
      return ByteBuffer(underlyingByteBuffer: buffer)
    }

    /// Gets initial metadata that was received.
    ///
    /// - Returns: metadata, or nil if the group has no `.receiveInitialMetadata` operation
    func receivedInitialMetadata() -> Metadata? {
      let metadataObserver = firstObserver(where: { operation in
        if case .receiveInitialMetadata = operation { return true }
        return false
      })
      guard let observer = metadataObserver else {
        return nil
      }
      return Metadata(underlyingArray: cgrpc_observer_recv_initial_metadata_get_metadata(observer))
    }

    /// Gets a status code that was received.
    ///
    /// - Returns: status code, or nil if the group has no `.receiveStatusOnClient` operation
    func receivedStatusCode() -> Int? {
      guard let observer = receivedStatusObserver else {
        return nil
      }
      return cgrpc_observer_recv_status_on_client_get_status(observer)
    }

    /// Gets a status message that was received.
    ///
    /// - Returns: status message, or nil if the group has no `.receiveStatusOnClient`
    ///   operation or the copied bytes cannot be decoded as UTF-8
    func receivedStatusMessage() -> String? {
      guard let observer = receivedStatusObserver else {
        return nil
      }
      // We actually know that this call will never return nil, so we can
      // forcibly unwrap the result.
      let copiedDetails = cgrpc_observer_recv_status_on_client_copy_status_details(observer)!
      // The string is a copy made for us, so release it once it has been converted.
      defer { cgrpc_free_copied_string(copiedDetails) }
      return String(cString: copiedDetails, encoding: String.Encoding.utf8)
    }

    /// Gets trailing metadata that was received.
    ///
    /// - Returns: metadata, or nil if the group has no `.receiveStatusOnClient` operation
    func receivedTrailingMetadata() -> Metadata? {
      guard let observer = receivedStatusObserver else {
        return nil
      }
      return Metadata(underlyingArray: cgrpc_observer_recv_status_on_client_get_metadata(observer))
    }
  }