BenchmarkService.swift 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. import struct Foundation.Data
  19. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  20. struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
  21. /// Used to check if the server can be streaming responses.
  22. private let working = ManagedAtomic<Bool>(true)
  23. /// One request followed by one response.
  24. /// The server returns a client payload with the size requested by the client.
  25. func unaryCall(
  26. request: GRPCCore.ServerRequest.Single<Grpc_Testing_BenchmarkService.Method.UnaryCall.Input>
  27. ) async throws
  28. -> GRPCCore.ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.UnaryCall.Output>
  29. {
  30. // Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent
  31. // if the request is successful.
  32. if request.message.responseStatus.isInitialized {
  33. try self.checkOkStatus(request.message.responseStatus)
  34. }
  35. return ServerResponse.Single(
  36. message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with {
  37. $0.payload = Grpc_Testing_Payload.with {
  38. $0.body = Data(count: Int(request.message.responseSize))
  39. }
  40. }
  41. )
  42. }
  43. /// Repeated sequence of one request followed by one response.
  44. /// The server returns a payload with the size requested by the client for each received message.
  45. func streamingCall(
  46. request: GRPCCore.ServerRequest.Stream<Grpc_Testing_BenchmarkService.Method.StreamingCall.Input>
  47. ) async throws
  48. -> GRPCCore.ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingCall.Output>
  49. {
  50. return ServerResponse.Stream { writer in
  51. for try await message in request.messages {
  52. if message.responseStatus.isInitialized {
  53. try self.checkOkStatus(message.responseStatus)
  54. }
  55. try await writer.write(
  56. Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
  57. $0.payload = Grpc_Testing_Payload.with {
  58. $0.body = Data(count: Int(message.responseSize))
  59. }
  60. }
  61. )
  62. }
  63. return [:]
  64. }
  65. }
  66. /// Single-sided unbounded streaming from client to server.
  67. /// The server returns a payload with the size requested by the client once the client does WritesDone.
  68. func streamingFromClient(
  69. request: ServerRequest.Stream<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Input>
  70. ) async throws
  71. -> ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output>
  72. {
  73. var responseSize = 0
  74. for try await message in request.messages {
  75. if message.responseStatus.isInitialized {
  76. try self.checkOkStatus(message.responseStatus)
  77. }
  78. responseSize = Int(message.responseSize)
  79. }
  80. return ServerResponse.Single(
  81. message: Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output.with {
  82. $0.payload = Grpc_Testing_Payload.with {
  83. $0.body = Data(count: responseSize)
  84. }
  85. }
  86. )
  87. }
  88. /// Single-sided unbounded streaming from server to client.
  89. /// The server repeatedly returns a payload with the size requested by the client.
  90. func streamingFromServer(
  91. request: ServerRequest.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromServer.Input>
  92. ) async throws
  93. -> ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingFromServer.Output>
  94. {
  95. if request.message.responseStatus.isInitialized {
  96. try self.checkOkStatus(request.message.responseStatus)
  97. }
  98. let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
  99. $0.payload = Grpc_Testing_Payload.with {
  100. $0.body = Data(count: Int(request.message.responseSize))
  101. }
  102. }
  103. return ServerResponse.Stream { writer in
  104. while working.load(ordering: .relaxed) {
  105. try await writer.write(response)
  106. }
  107. return [:]
  108. }
  109. }
  110. /// Two-sided unbounded streaming between server to client.
  111. /// Both sides send the content of their own choice to the other.
  112. func streamingBothWays(
  113. request: GRPCCore.ServerRequest.Stream<
  114. Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Input
  115. >
  116. ) async throws
  117. -> ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Output>
  118. {
  119. // The 100 size is used by the other implementations as well.
  120. // We are using the same canned response size for all responses
  121. // as it is allowed by the spec.
  122. let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
  123. $0.payload = Grpc_Testing_Payload.with {
  124. $0.body = Data(count: 100)
  125. }
  126. }
  127. // Marks if the inbound streaming is ongoing or finished.
  128. let inboundStreaming = ManagedAtomic<Bool>(true)
  129. return ServerResponse.Stream { writer in
  130. try await withThrowingTaskGroup(of: Void.self) { group in
  131. group.addTask {
  132. for try await message in request.messages {
  133. if message.responseStatus.isInitialized {
  134. try self.checkOkStatus(message.responseStatus)
  135. }
  136. }
  137. inboundStreaming.store(false, ordering: .relaxed)
  138. }
  139. group.addTask {
  140. while inboundStreaming.load(ordering: .relaxed)
  141. && self.working.load(ordering: .acquiring)
  142. {
  143. try await writer.write(response)
  144. }
  145. }
  146. try await group.next()
  147. group.cancelAll()
  148. return [:]
  149. }
  150. }
  151. }
  152. }
  153. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  154. extension BenchmarkService {
  155. private func checkOkStatus(_ responseStatus: Grpc_Testing_EchoStatus) throws {
  156. guard let code = Status.Code(rawValue: Int(responseStatus.code)) else {
  157. throw RPCError(code: .invalidArgument, message: "The response status code is invalid.")
  158. }
  159. if let code = RPCError.Code(code) {
  160. throw RPCError(code: code, message: responseStatus.message)
  161. }
  162. }
  163. }