GRPCClientTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  /*
   * Copyright 2023, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCInProcessTransport
  19. import XCTest
  /// Tests exercising `GRPCClient` against a `GRPCServer` connected through an in-process
  /// transport pair.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  final class GRPCClientTests: XCTestCase {
    /// Runs `body` with a client and a server connected via an in-process transport pair.
    ///
    /// The server's and client's `run()` calls are started as child tasks of a throwing task
    /// group. Once `body` returns, the client is closed and the server is asked to stop
    /// listening.
    ///
    /// - Parameters:
    ///   - services: Services passed to the server on creation.
    ///   - interceptors: Interceptors passed to the client on creation. Defaults to none.
    ///   - body: The work to perform with the client and server.
    /// - Throws: Any error thrown by the start-up delay or by `body`.
    func withInProcessConnectedClient(
      services: [any RegistrableRPCService],
      interceptors: [any ClientInterceptor] = [],
      _ body: (GRPCClient, GRPCServer) async throws -> Void
    ) async throws {
      let inProcess = InProcessTransport.makePair()
      let client = GRPCClient(transport: inProcess.client, interceptors: interceptors)
      let server = GRPCServer(transport: inProcess.server, services: services)

      try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
          try await server.run()
        }

        group.addTask {
          try await client.run()
        }

        // Make sure both server and client are running
        try await Task.sleep(for: .milliseconds(100))
        try await body(client, server)
        client.close()
        server.stopListening()
      }
    }

    /// A serializer which returns the message bytes unchanged.
    struct IdentitySerializer: MessageSerializer {
      typealias Message = [UInt8]

      func serialize(_ message: [UInt8]) throws -> [UInt8] {
        return message
      }
    }

    /// A deserializer which returns the serialized bytes unchanged.
    struct IdentityDeserializer: MessageDeserializer {
      typealias Message = [UInt8]

      func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] {
        return serializedMessageBytes
      }
    }

    /// Sends one message to `BinaryEcho.Methods.collect` and expects the same bytes in the
    /// response message.
    func testUnary() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.unary(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          let message = try response.message
          XCTAssertEqual(message, [3, 1, 4, 1, 5])
        }
      }
    }

    /// Writes five single-byte messages to `BinaryEcho.Methods.collect` and expects a single
    /// response message containing all five bytes in order.
    func testClientStreaming() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.clientStreaming(
          request: .init(producer: { writer in
            for byte in [3, 1, 4, 1, 5] as [UInt8] {
              try await writer.write([byte])
            }
          }),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          let message = try response.message
          XCTAssertEqual(message, [3, 1, 4, 1, 5])
        }
      }
    }

    /// Sends one five-byte message to `BinaryEcho.Methods.expand` and expects the first five
    /// response messages to each carry one of those bytes, in order.
    func testServerStreaming() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.serverStreaming(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: BinaryEcho.Methods.expand,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          var responseParts = response.messages.makeAsyncIterator()
          for byte in [3, 1, 4, 1, 5] as [UInt8] {
            let message = try await responseParts.next()
            XCTAssertEqual(message, [byte])
          }
        }
      }
    }

    /// Writes five single-byte messages to `BinaryEcho.Methods.update` and expects the first
    /// five response messages to each carry one of those bytes, in order.
    func testBidirectionalStreaming() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.bidirectionalStreaming(
          request: .init(producer: { writer in
            for byte in [3, 1, 4, 1, 5] as [UInt8] {
              try await writer.write([byte])
            }
          }),
          descriptor: BinaryEcho.Methods.update,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          var responseParts = response.messages.makeAsyncIterator()
          for byte in [3, 1, 4, 1, 5] as [UInt8] {
            let message = try await responseParts.next()
            XCTAssertEqual(message, [byte])
          }
        }
      }
    }

    /// A unary RPC to a method descriptor which is not `BinaryEcho`'s should be rejected with
    /// the `.unimplemented` code.
    func testUnimplementedMethod_Unary() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.unary(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: MethodDescriptor(service: "not", method: "implemented"),
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          XCTAssertThrowsRPCError(try response.accepted.get()) { error in
            XCTAssertEqual(error.code, .unimplemented)
          }
        }
      }
    }

    /// A client-streaming RPC to a method descriptor which is not `BinaryEcho`'s should be
    /// rejected with the `.unimplemented` code.
    func testUnimplementedMethod_ClientStreaming() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.clientStreaming(
          request: .init(producer: { writer in
            for byte in [3, 1, 4, 1, 5] as [UInt8] {
              try await writer.write([byte])
            }
          }),
          descriptor: MethodDescriptor(service: "not", method: "implemented"),
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          XCTAssertThrowsRPCError(try response.accepted.get()) { error in
            XCTAssertEqual(error.code, .unimplemented)
          }
        }
      }
    }

    /// A server-streaming RPC to a method descriptor which is not `BinaryEcho`'s should be
    /// rejected with the `.unimplemented` code.
    func testUnimplementedMethod_ServerStreaming() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.serverStreaming(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: MethodDescriptor(service: "not", method: "implemented"),
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          XCTAssertThrowsRPCError(try response.accepted.get()) { error in
            XCTAssertEqual(error.code, .unimplemented)
          }
        }
      }
    }

    /// A bidirectional-streaming RPC to a method descriptor which is not `BinaryEcho`'s should
    /// be rejected with the `.unimplemented` code.
    func testUnimplementedMethod_BidirectionalStreaming() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.bidirectionalStreaming(
          request: .init(producer: { writer in
            for byte in [3, 1, 4, 1, 5] as [UInt8] {
              try await writer.write([byte])
            }
          }),
          descriptor: MethodDescriptor(service: "not", method: "implemented"),
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          XCTAssertThrowsRPCError(try response.accepted.get()) { error in
            XCTAssertEqual(error.code, .unimplemented)
          }
        }
      }
    }

    /// Starts one unary RPC per value in `UInt8.min ..< UInt8.max` concurrently and expects
    /// each response to carry the byte that was sent.
    func testMultipleConcurrentRequests() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await withThrowingTaskGroup(of: Void.self) { group in
          for i in UInt8.min ..< UInt8.max {
            group.addTask {
              try await client.unary(
                request: .init(message: [i]),
                descriptor: BinaryEcho.Methods.collect,
                serializer: IdentitySerializer(),
                deserializer: IdentityDeserializer()
              ) { response in
                let message = try response.message
                XCTAssertEqual(message, [i])
              }
            }
          }

          // A throwing task group discards child errors unless its results are awaited, so
          // wait explicitly: otherwise an RPC which throws would not fail the test.
          try await group.waitForAll()
        }
      }
    }

    /// Configures a counting interceptor, a rejecting interceptor and a second counting
    /// interceptor (in that order) and expects the RPC to be rejected with `.unavailable`,
    /// the first counter to read 1 and the second counter to read 0.
    func testInterceptorsAreAppliedInOrder() async throws {
      let counter1 = ManagedAtomic(0)
      let counter2 = ManagedAtomic(0)

      try await self.withInProcessConnectedClient(
        services: [BinaryEcho()],
        interceptors: [
          .requestCounter(counter1),
          .rejectAll(with: RPCError(code: .unavailable, message: "")),
          .requestCounter(counter2),
        ]
      ) { client, _ in
        try await client.unary(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          XCTAssertRejected(response) { error in
            XCTAssertEqual(error.code, .unavailable)
          }
        }
      }

      XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1)
      XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0)
    }

    /// After `close()` is called on the client, starting a new RPC should throw a
    /// `RuntimeError` with the `.clientIsStopped` code.
    func testNoNewRPCsAfterClientClose() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        // Run an RPC so we know the client is running properly.
        try await client.unary(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          let message = try response.message
          XCTAssertEqual(message, [3, 1, 4, 1, 5])
        }

        // New RPCs should fail immediately after this.
        client.close()

        // RPC should fail now.
        await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
          try await client.unary(
            request: .init(message: [3, 1, 4, 1, 5]),
            descriptor: BinaryEcho.Methods.collect,
            serializer: IdentitySerializer(),
            deserializer: IdentityDeserializer()
          ) { _ in }
        } errorHandler: { error in
          XCTAssertEqual(error.code, .clientIsStopped)
        }
      }
    }

    /// Closes the client from within the request producer of a client-streaming RPC, checks
    /// that a new RPC is refused with `.clientIsStopped`, and then expects the already-started
    /// RPC to complete with the full set of bytes.
    func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
      try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
        try await client.clientStreaming(
          request: .init(producer: { writer in
            // Close the client once this RPC has been started.
            client.close()

            // Attempts to start a new RPC should fail.
            await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
              try await client.unary(
                request: .init(message: [3, 1, 4, 1, 5]),
                descriptor: BinaryEcho.Methods.collect,
                serializer: IdentitySerializer(),
                deserializer: IdentityDeserializer()
              ) { _ in }
            } errorHandler: { error in
              XCTAssertEqual(error.code, .clientIsStopped)
            }

            // Now write to the already opened stream to confirm that opened streams
            // can successfully run to completion.
            for byte in [3, 1, 4, 1, 5] as [UInt8] {
              try await writer.write([byte])
            }
          }),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          let message = try response.message
          XCTAssertEqual(message, [3, 1, 4, 1, 5])
        }
      }
    }

    /// Starts a client-streaming RPC in an unstructured task whose producer sleeps for five
    /// seconds, checks that a unary RPC succeeds in the meantime, then cancels the task. The
    /// streaming RPC's response handler expects a rejection with the `.unknown` code.
    func testCancelRunningClient() async throws {
      let inProcess = InProcessTransport.makePair()
      let client = GRPCClient(transport: inProcess.client)

      try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
          let server = GRPCServer(transport: inProcess.server, services: [BinaryEcho()])
          try await server.run()
        }

        group.addTask {
          try await client.run()
        }

        // Wait for client and server to be running.
        try await Task.sleep(for: .milliseconds(10))

        let task = Task {
          try await client.clientStreaming(
            request: .init(producer: { _ in
              // Nothing is written; the producer only keeps the RPC open until cancelled.
              try await Task.sleep(for: .seconds(5))
            }),
            descriptor: BinaryEcho.Methods.collect,
            serializer: IdentitySerializer(),
            deserializer: IdentityDeserializer()
          ) { response in
            XCTAssertRejected(response) { error in
              XCTAssertEqual(error.code, .unknown)
            }
          }
        }

        // Check requests are getting through.
        try await client.unary(
          request: .init(message: [3, 1, 4, 1, 5]),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer()
        ) { response in
          let message = try response.message
          XCTAssertEqual(message, [3, 1, 4, 1, 5])
        }

        task.cancel()
        try await task.value
        group.cancelAll()
      }
    }

    /// Runs the client in a task which is cancelled straight away and awaited, then expects a
    /// second call to `run()` to throw a `RuntimeError` with the `.clientIsStopped` code.
    func testRunStoppedClient() async throws {
      let (_, clientTransport) = InProcessTransport.makePair()
      let client = GRPCClient(transport: clientTransport)

      // Run the client.
      let task = Task { try await client.run() }
      task.cancel()
      try await task.value

      // Client is stopped, should throw an error.
      await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
        try await client.run()
      } errorHandler: { error in
        XCTAssertEqual(error.code, .clientIsStopped)
      }
    }

    /// Runs the client in a task and, while that task is still active, expects a second call
    /// to `run()` to throw a `RuntimeError` with the `.clientIsAlreadyRunning` code.
    func testRunAlreadyRunningClient() async throws {
      let (_, clientTransport) = InProcessTransport.makePair()
      let client = GRPCClient(transport: clientTransport)

      // Run the client.
      let task = Task { try await client.run() }
      // Make sure the client is run for the first time here.
      try await Task.sleep(for: .milliseconds(10))

      // Client is already running, should throw an error.
      await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
        try await client.run()
      } errorHandler: { error in
        XCTAssertEqual(error.code, .clientIsAlreadyRunning)
      }

      task.cancel()
    }
  }