ControlService.swift 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import GRPCCore
  18. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  19. struct ControlService: RegistrableRPCService {
  20. func registerMethods(with router: inout RPCRouter) {
  21. router.registerHandler(
  22. forMethod: MethodDescriptor(service: "Control", method: "Unary"),
  23. deserializer: JSONDeserializer<ControlInput>(),
  24. serializer: JSONSerializer<ControlOutput>(),
  25. handler: { request, context in
  26. return try await self.handle(request: request)
  27. }
  28. )
  29. router.registerHandler(
  30. forMethod: MethodDescriptor(service: "Control", method: "ServerStream"),
  31. deserializer: JSONDeserializer<ControlInput>(),
  32. serializer: JSONSerializer<ControlOutput>(),
  33. handler: { request, context in
  34. return try await self.handle(request: request)
  35. }
  36. )
  37. router.registerHandler(
  38. forMethod: MethodDescriptor(service: "Control", method: "ClientStream"),
  39. deserializer: JSONDeserializer<ControlInput>(),
  40. serializer: JSONSerializer<ControlOutput>(),
  41. handler: { request, context in
  42. return try await self.handle(request: request)
  43. }
  44. )
  45. router.registerHandler(
  46. forMethod: MethodDescriptor(service: "Control", method: "BidiStream"),
  47. deserializer: JSONDeserializer<ControlInput>(),
  48. serializer: JSONSerializer<ControlOutput>(),
  49. handler: { request, context in
  50. return try await self.handle(request: request)
  51. }
  52. )
  53. }
  54. }
  55. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  56. extension ControlService {
  57. private func handle(
  58. request: ServerRequest.Stream<ControlInput>
  59. ) async throws -> ServerResponse.Stream<ControlOutput> {
  60. var iterator = request.messages.makeAsyncIterator()
  61. guard let message = try await iterator.next() else {
  62. // Empty input stream, empty output stream.
  63. return ServerResponse.Stream { _ in [:] }
  64. }
  65. // Check if the request is for a trailers-only response.
  66. if let status = message.status, message.isTrailersOnly {
  67. let trailers = message.echoMetadataInTrailers ? request.metadata.echo() : [:]
  68. let code = Status.Code(rawValue: status.code.rawValue).flatMap { RPCError.Code($0) }
  69. if let code = code {
  70. throw RPCError(code: code, message: status.message, metadata: trailers)
  71. } else {
  72. // Invalid code, the request is invalid, so throw an appropriate error.
  73. throw RPCError(
  74. code: .invalidArgument,
  75. message: "Trailers only response must use a non-OK status code"
  76. )
  77. }
  78. }
  79. // Not a trailers-only response. Should the metadata be echo'd back?
  80. let metadata = message.echoMetadataInHeaders ? request.metadata.echo() : [:]
  81. // The iterator needs to be transferred into the response. This is okay: we won't touch the
  82. // iterator again from the current concurrency domain.
  83. let transfer = UnsafeTransfer(iterator)
  84. return ServerResponse.Stream(metadata: metadata) { writer in
  85. // Finish dealing with the first message.
  86. switch try await self.processMessage(message, metadata: request.metadata, writer: writer) {
  87. case .return(let metadata):
  88. return metadata
  89. case .continue:
  90. ()
  91. }
  92. var iterator = transfer.wrappedValue
  93. // Process the rest of the messages.
  94. while let message = try await iterator.next() {
  95. switch try await self.processMessage(message, metadata: request.metadata, writer: writer) {
  96. case .return(let metadata):
  97. return metadata
  98. case .continue:
  99. ()
  100. }
  101. }
  102. // Input stream finished without explicitly setting a status; finish the RPC cleanly.
  103. return [:]
  104. }
  105. }
  106. private enum NextProcessingStep {
  107. case `return`(Metadata)
  108. case `continue`
  109. }
  110. private func processMessage(
  111. _ input: ControlInput,
  112. metadata: Metadata,
  113. writer: RPCWriter<ControlOutput>
  114. ) async throws -> NextProcessingStep {
  115. // If messages were requested, build a response and send them back.
  116. if input.numberOfMessages > 0 {
  117. let output = ControlOutput(
  118. payload: Data(
  119. repeating: input.payloadParameters.content,
  120. count: input.payloadParameters.size
  121. )
  122. )
  123. for _ in 0 ..< input.numberOfMessages {
  124. try await writer.write(output)
  125. }
  126. }
  127. // Check whether the RPC should be finished (i.e. the input `hasStatus`).
  128. guard let status = input.status else {
  129. if input.echoMetadataInTrailers {
  130. // There was no status in the input, but echo metadata in trailers was set. This is an
  131. // implicit 'ok' status.
  132. let trailers = input.echoMetadataInTrailers ? metadata.echo() : [:]
  133. return .return(trailers)
  134. } else {
  135. // No status, and not echoing back metadata. Continue consuming the input stream.
  136. return .continue
  137. }
  138. }
  139. // Build the trailers.
  140. let trailers = input.echoMetadataInTrailers ? metadata.echo() : [:]
  141. if status.code == .ok {
  142. return .return(trailers)
  143. }
  144. // Non-OK status code, throw an error.
  145. let code = RPCError.Code(status.code)
  146. if let code = code {
  147. // Valid error code, throw it.
  148. throw RPCError(code: code, message: status.message, metadata: trailers)
  149. } else {
  150. // Invalid error code, throw an appropriate error.
  151. throw RPCError(
  152. code: .invalidArgument,
  153. message: "Invalid error code '\(status.code)'"
  154. )
  155. }
  156. }
  157. }
  158. extension Metadata {
  159. fileprivate func echo() -> Self {
  160. var copy = Metadata()
  161. copy.reserveCapacity(self.count)
  162. for (key, value) in self {
  163. // Header field names mustn't contain ":".
  164. let key = "echo-" + key.replacingOccurrences(of: ":", with: "")
  165. switch value {
  166. case .string(let stringValue):
  167. copy.addString(stringValue, forKey: key)
  168. case .binary(let binaryValue):
  169. copy.addBinary(binaryValue, forKey: key)
  170. }
  171. }
  172. return copy
  173. }
  174. }
  175. private struct UnsafeTransfer<Wrapped> {
  176. var wrappedValue: Wrapped
  177. init(_ wrappedValue: Wrapped) {
  178. self.wrappedValue = wrappedValue
  179. }
  180. }
  181. extension UnsafeTransfer: @unchecked Sendable {}