ControlService.swift 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import struct Foundation.Data
  /// A service conforming to `ControlStreamingServiceProtocol`.
  ///
  /// All four RPC methods (`unary`, `serverStream`, `clientStream` and `bidiStream`) share one
  /// implementation: each forwards its request stream to the private `handle(request:)` routine
  /// and returns whatever response stream it produces. The `context` argument is not used.
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  struct ControlService: ControlStreamingServiceProtocol {
    /// Serves the `Unary` method by delegating to the shared handler.
    ///
    /// - Parameters:
    ///   - request: The incoming request stream.
    ///   - context: The server context (unused).
    /// - Returns: The response stream produced by `handle(request:)`.
    /// - Throws: Any error thrown by `handle(request:)`.
    func unary(
      request: ServerRequest.Stream<Control.Method.Unary.Input>,
      context: ServerContext
    ) async throws -> ServerResponse.Stream<Control.Method.Unary.Output> {
      let response = try await handle(request: request)
      return response
    }

    /// Serves the `ServerStream` method by delegating to the shared handler.
    ///
    /// - Parameters:
    ///   - request: The incoming request stream.
    ///   - context: The server context (unused).
    /// - Returns: The response stream produced by `handle(request:)`.
    /// - Throws: Any error thrown by `handle(request:)`.
    func serverStream(
      request: ServerRequest.Stream<Control.Method.ServerStream.Input>,
      context: ServerContext
    ) async throws -> ServerResponse.Stream<Control.Method.ServerStream.Output> {
      let response = try await handle(request: request)
      return response
    }

    /// Serves the `ClientStream` method by delegating to the shared handler.
    ///
    /// - Parameters:
    ///   - request: The incoming request stream.
    ///   - context: The server context (unused).
    /// - Returns: The response stream produced by `handle(request:)`.
    /// - Throws: Any error thrown by `handle(request:)`.
    func clientStream(
      request: ServerRequest.Stream<Control.Method.ClientStream.Input>,
      context: ServerContext
    ) async throws -> ServerResponse.Stream<Control.Method.ClientStream.Output> {
      let response = try await handle(request: request)
      return response
    }

    /// Serves the `BidiStream` method by delegating to the shared handler.
    ///
    /// - Parameters:
    ///   - request: The incoming request stream.
    ///   - context: The server context (unused).
    /// - Returns: The response stream produced by `handle(request:)`.
    /// - Throws: Any error thrown by `handle(request:)`.
    func bidiStream(
      request: ServerRequest.Stream<Control.Method.BidiStream.Input>,
      context: ServerContext
    ) async throws -> ServerResponse.Stream<Control.Method.BidiStream.Output> {
      let response = try await handle(request: request)
      return response
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension ControlService {
    /// Shared implementation behind every RPC method of the service.
    ///
    /// The first message read from the request stream decides the shape of the response:
    /// - If the stream yields no message, an empty response stream with empty trailing metadata
    ///   is returned.
    /// - If the message has a status and `isTrailersOnly` is set, an `RPCError` is thrown before
    ///   any response stream is created.
    /// - Otherwise a response stream is returned whose producer runs the first message, and then
    ///   every remaining message in order, through `processMessage(_:metadata:writer:)`.
    ///
    /// - Parameter request: The incoming request stream together with its metadata.
    /// - Returns: The response stream; its initial metadata echoes the request metadata when the
    ///   first message sets `echoMetadataInHeaders`, and is empty otherwise.
    /// - Throws: `RPCError` for a trailers-only request, and any error thrown while reading the
    ///   first message from the request stream.
    private func handle(
      request: ServerRequest.Stream<ControlInput>
    ) async throws -> ServerResponse.Stream<ControlOutput> {
      var iterator = request.messages.makeAsyncIterator()

      guard let message = try await iterator.next() else {
        // Empty input stream, empty output stream.
        return ServerResponse.Stream { _ in [:] }
      }

      // Check if the request is for a trailers-only response.
      if message.hasStatus, message.isTrailersOnly {
        let trailers = message.echoMetadataInTrailers ? request.metadata.echo() : [:]

        guard
          let code = Status.Code(rawValue: message.status.code.rawValue)
            .flatMap({ RPCError.Code($0) })
        else {
          // Invalid code, the request is invalid, so throw an appropriate error.
          throw RPCError(
            code: .invalidArgument,
            message: "Trailers only response must use a non-OK status code"
          )
        }

        throw RPCError(code: code, message: message.status.message, metadata: trailers)
      }

      // Not a trailers-only response. Should the metadata be echo'd back?
      let metadata = message.echoMetadataInHeaders ? request.metadata.echo() : [:]

      // The iterator needs to be transferred into the response. This is okay: we won't touch the
      // iterator again from the current concurrency domain.
      let transfer = UnsafeTransfer(iterator)

      return ServerResponse.Stream(metadata: metadata) { writer in
        var iterator = transfer.wrappedValue

        // Start with the message already pulled off the stream, then drain the remainder. Using
        // one loop keeps the handling of the first and the subsequent messages identical.
        var pending: ControlInput? = message
        while let input = pending {
          switch try await self.processMessage(input, metadata: request.metadata, writer: writer) {
          case .return(let trailers):
            return trailers
          case .continue:
            pending = try await iterator.next()
          }
        }

        // Input stream finished without explicitly setting a status; finish the RPC cleanly.
        return [:]
      }
    }

    /// What the response producer should do once a message has been processed.
    private enum NextProcessingStep {
      /// Stop consuming input and finish the response with the associated trailing metadata.
      case `return`(Metadata)
      /// Keep consuming the request stream.
      case `continue`
    }

    /// Processes a single input message: writes any requested response messages and decides
    /// whether the RPC should end.
    ///
    /// - Parameters:
    ///   - input: The message to process.
    ///   - metadata: The request metadata, echoed into the trailers when the message sets
    ///     `echoMetadataInTrailers`.
    ///   - writer: The writer used to send response messages.
    /// - Returns: `.continue` when the message carries no status and does not ask for metadata to
    ///   be echoed in the trailers; `.return` with the trailing metadata when the RPC should end
    ///   successfully.
    /// - Throws: `RPCError` when the message carries a non-OK status, an error code which cannot
    ///   be converted to an `RPCError.Code`, or a negative payload size; also rethrows errors
    ///   from `writer`.
    private func processMessage(
      _ input: ControlInput,
      metadata: Metadata,
      writer: RPCWriter<ControlOutput>
    ) async throws -> NextProcessingStep {
      // If messages were requested, build a response and send them back.
      if input.numberOfMessages > 0 {
        // The size comes from the peer. `Data(repeating:count:)` traps on a negative count, so
        // reject such a request instead of crashing the server.
        let size = Int(input.messageParams.size)
        guard size >= 0 else {
          throw RPCError(
            code: .invalidArgument,
            message: "Invalid payload size '\(size)'"
          )
        }

        let output = ControlOutput.with {
          $0.payload = Data(
            repeating: UInt8(truncatingIfNeeded: input.messageParams.content),
            count: size
          )
        }

        for _ in 0 ..< input.numberOfMessages {
          try await writer.write(output)
        }
      }

      // Check whether the RPC should be finished (i.e. the input `hasStatus`).
      guard input.hasStatus else {
        if input.echoMetadataInTrailers {
          // There was no status in the input, but echo metadata in trailers was set. This is an
          // implicit 'ok' status.
          return .return(metadata.echo())
        } else {
          // No status, and not echoing back metadata. Continue consuming the input stream.
          return .continue
        }
      }

      // Build the trailers.
      let trailers = input.echoMetadataInTrailers ? metadata.echo() : [:]

      if input.status.code == .ok {
        return .return(trailers)
      }

      // Non-OK status code, throw an error.
      guard
        let code = Status.Code(rawValue: input.status.code.rawValue)
          .flatMap({ RPCError.Code($0) })
      else {
        // Invalid error code, throw an appropriate error.
        throw RPCError(
          code: .invalidArgument,
          message: "Invalid error code '\(input.status.code)'"
        )
      }

      // Valid error code, throw it.
      throw RPCError(code: code, message: input.status.message, metadata: trailers)
    }
  }
  extension Metadata {
    /// Builds a copy of this metadata in which every key is prefixed with `"echo-"`.
    ///
    /// Any `":"` in the original key is removed before the prefix is added. String values are
    /// re-added as strings and binary values as binary, in iteration order.
    ///
    /// - Returns: A new `Metadata` holding one prefixed entry per entry of `self`.
    fileprivate func echo() -> Self {
      var echoed = Metadata()
      echoed.reserveCapacity(self.count)

      for (name, entry) in self {
        // Header field names mustn't contain ":".
        let sanitizedName = name.replacingOccurrences(of: ":", with: "")
        let echoKey = "echo-" + sanitizedName

        switch entry {
        case let .string(text):
          echoed.addString(text, forKey: echoKey)
        case let .binary(bytes):
          echoed.addBinary(bytes, forKey: echoKey)
        }
      }

      return echoed
    }
  }
  /// A box that lets a value of any type be moved across a concurrency boundary.
  ///
  /// The `Sendable` conformance is unchecked: the compiler does not verify that `Wrapped` is
  /// safe to share, so callers are responsible for not using the wrapped value from more than
  /// one concurrency domain at a time.
  private struct UnsafeTransfer<Wrapped>: @unchecked Sendable {
    /// The value being transferred.
    var wrappedValue: Wrapped

    /// Wraps `wrappedValue` so that it can be transferred.
    ///
    /// - Parameter wrappedValue: The value to box.
    init(_ wrappedValue: Wrapped) {
      self.wrappedValue = wrappedValue
    }
  }