GRPCClientTests.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCInProcessTransport
  18. import Testing
  19. import XCTest
  20. final class GRPCClientTests: XCTestCase {
  21. func withInProcessConnectedClient(
  22. services: [any RegistrableRPCService],
  23. interceptorPipeline: [ClientInterceptorPipelineOperation] = [],
  24. _ body: (GRPCClient, GRPCServer) async throws -> Void
  25. ) async throws {
  26. let inProcess = InProcessTransport()
  27. _ = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline)
  28. _ = GRPCServer(transport: inProcess.server, services: services)
  29. try await withGRPCServer(
  30. transport: inProcess.server,
  31. services: services
  32. ) { server in
  33. try await withGRPCClient(
  34. transport: inProcess.client,
  35. interceptorPipeline: interceptorPipeline
  36. ) { client in
  37. try await Task.sleep(for: .milliseconds(100))
  38. try await body(client, server)
  39. }
  40. }
  41. }
  42. struct IdentitySerializer: MessageSerializer {
  43. typealias Message = [UInt8]
  44. func serialize(_ message: [UInt8]) throws -> [UInt8] {
  45. return message
  46. }
  47. }
  48. struct IdentityDeserializer: MessageDeserializer {
  49. typealias Message = [UInt8]
  50. func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] {
  51. return serializedMessageBytes
  52. }
  53. }
  54. func testUnary() async throws {
  55. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  56. try await client.unary(
  57. request: .init(message: [3, 1, 4, 1, 5]),
  58. descriptor: BinaryEcho.Methods.collect,
  59. serializer: IdentitySerializer(),
  60. deserializer: IdentityDeserializer(),
  61. options: .defaults
  62. ) { response in
  63. let message = try response.message
  64. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  65. }
  66. }
  67. }
  68. func testClientStreaming() async throws {
  69. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  70. try await client.clientStreaming(
  71. request: .init(producer: { writer in
  72. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  73. try await writer.write([byte])
  74. }
  75. }),
  76. descriptor: BinaryEcho.Methods.collect,
  77. serializer: IdentitySerializer(),
  78. deserializer: IdentityDeserializer(),
  79. options: .defaults
  80. ) { response in
  81. let message = try response.message
  82. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  83. }
  84. }
  85. }
  86. func testServerStreaming() async throws {
  87. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  88. try await client.serverStreaming(
  89. request: .init(message: [3, 1, 4, 1, 5]),
  90. descriptor: BinaryEcho.Methods.expand,
  91. serializer: IdentitySerializer(),
  92. deserializer: IdentityDeserializer(),
  93. options: .defaults
  94. ) { response in
  95. var responseParts = response.messages.makeAsyncIterator()
  96. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  97. let message = try await responseParts.next()
  98. XCTAssertEqual(message, [byte])
  99. }
  100. }
  101. }
  102. }
  103. func testBidirectionalStreaming() async throws {
  104. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  105. try await client.bidirectionalStreaming(
  106. request: .init(producer: { writer in
  107. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  108. try await writer.write([byte])
  109. }
  110. }),
  111. descriptor: BinaryEcho.Methods.update,
  112. serializer: IdentitySerializer(),
  113. deserializer: IdentityDeserializer(),
  114. options: .defaults
  115. ) { response in
  116. var responseParts = response.messages.makeAsyncIterator()
  117. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  118. let message = try await responseParts.next()
  119. XCTAssertEqual(message, [byte])
  120. }
  121. }
  122. }
  123. }
  124. func testUnimplementedMethod_Unary() async throws {
  125. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  126. try await client.unary(
  127. request: .init(message: [3, 1, 4, 1, 5]),
  128. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  129. serializer: IdentitySerializer(),
  130. deserializer: IdentityDeserializer(),
  131. options: .defaults
  132. ) { response in
  133. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  134. XCTAssertEqual(error.code, .unimplemented)
  135. }
  136. }
  137. }
  138. }
  139. func testUnimplementedMethod_ClientStreaming() async throws {
  140. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  141. try await client.clientStreaming(
  142. request: .init(producer: { writer in
  143. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  144. try await writer.write([byte])
  145. }
  146. }),
  147. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  148. serializer: IdentitySerializer(),
  149. deserializer: IdentityDeserializer(),
  150. options: .defaults
  151. ) { response in
  152. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  153. XCTAssertEqual(error.code, .unimplemented)
  154. }
  155. }
  156. }
  157. }
  158. func testUnimplementedMethod_ServerStreaming() async throws {
  159. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  160. try await client.serverStreaming(
  161. request: .init(message: [3, 1, 4, 1, 5]),
  162. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  163. serializer: IdentitySerializer(),
  164. deserializer: IdentityDeserializer(),
  165. options: .defaults
  166. ) { response in
  167. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  168. XCTAssertEqual(error.code, .unimplemented)
  169. }
  170. }
  171. }
  172. }
  173. func testUnimplementedMethod_BidirectionalStreaming() async throws {
  174. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  175. try await client.bidirectionalStreaming(
  176. request: .init(producer: { writer in
  177. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  178. try await writer.write([byte])
  179. }
  180. }),
  181. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  182. serializer: IdentitySerializer(),
  183. deserializer: IdentityDeserializer(),
  184. options: .defaults
  185. ) { response in
  186. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  187. XCTAssertEqual(error.code, .unimplemented)
  188. }
  189. }
  190. }
  191. }
  192. func testMultipleConcurrentRequests() async throws {
  193. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  194. await withThrowingTaskGroup(of: Void.self) { group in
  195. for i in UInt8.min ..< UInt8.max {
  196. group.addTask {
  197. try await client.unary(
  198. request: .init(message: [i]),
  199. descriptor: BinaryEcho.Methods.collect,
  200. serializer: IdentitySerializer(),
  201. deserializer: IdentityDeserializer(),
  202. options: .defaults
  203. ) { response in
  204. let message = try response.message
  205. XCTAssertEqual(message, [i])
  206. }
  207. }
  208. }
  209. }
  210. }
  211. }
  212. func testInterceptorsAreAppliedInOrder() async throws {
  213. let counter1 = AtomicCounter()
  214. let counter2 = AtomicCounter()
  215. try await self.withInProcessConnectedClient(
  216. services: [BinaryEcho()],
  217. interceptorPipeline: [
  218. .apply(.requestCounter(counter1), to: .all),
  219. .apply(.rejectAll(with: RPCError(code: .unavailable, message: "")), to: .all),
  220. .apply(.requestCounter(counter2), to: .all),
  221. ]
  222. ) { client, _ in
  223. try await client.unary(
  224. request: .init(message: [3, 1, 4, 1, 5]),
  225. descriptor: BinaryEcho.Methods.collect,
  226. serializer: IdentitySerializer(),
  227. deserializer: IdentityDeserializer(),
  228. options: .defaults
  229. ) { response in
  230. XCTAssertRejected(response) { error in
  231. XCTAssertEqual(error.code, .unavailable)
  232. }
  233. }
  234. }
  235. XCTAssertEqual(counter1.value, 1)
  236. XCTAssertEqual(counter2.value, 0)
  237. }
  238. func testNoNewRPCsAfterClientClose() async throws {
  239. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  240. // Run an RPC so we know the client is running properly.
  241. try await client.unary(
  242. request: .init(message: [3, 1, 4, 1, 5]),
  243. descriptor: BinaryEcho.Methods.collect,
  244. serializer: IdentitySerializer(),
  245. deserializer: IdentityDeserializer(),
  246. options: .defaults
  247. ) { response in
  248. let message = try response.message
  249. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  250. }
  251. // New RPCs should fail immediately after this.
  252. client.beginGracefulShutdown()
  253. // RPC should fail now.
  254. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  255. try await client.unary(
  256. request: .init(message: [3, 1, 4, 1, 5]),
  257. descriptor: BinaryEcho.Methods.collect,
  258. serializer: IdentitySerializer(),
  259. deserializer: IdentityDeserializer(),
  260. options: .defaults
  261. ) { _ in }
  262. } errorHandler: { error in
  263. XCTAssertEqual(error.code, .clientIsStopped)
  264. }
  265. }
  266. }
  267. func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
  268. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in
  269. try await client.clientStreaming(
  270. request: .init(producer: { writer in
  271. // Close the client once this RCP has been started.
  272. client.beginGracefulShutdown()
  273. // Attempts to start a new RPC should fail.
  274. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  275. try await client.unary(
  276. request: .init(message: [3, 1, 4, 1, 5]),
  277. descriptor: BinaryEcho.Methods.collect,
  278. serializer: IdentitySerializer(),
  279. deserializer: IdentityDeserializer(),
  280. options: .defaults
  281. ) { _ in }
  282. } errorHandler: { error in
  283. XCTAssertEqual(error.code, .clientIsStopped)
  284. }
  285. // Now write to the already opened stream to confirm that opened streams
  286. // can successfully run to completion.
  287. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  288. try await writer.write([byte])
  289. }
  290. }),
  291. descriptor: BinaryEcho.Methods.collect,
  292. serializer: IdentitySerializer(),
  293. deserializer: IdentityDeserializer(),
  294. options: .defaults
  295. ) { response in
  296. let message = try response.message
  297. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  298. }
  299. }
  300. }
  301. func testCancelRunningClient() async throws {
  302. let inProcess = InProcessTransport()
  303. let client = GRPCClient(transport: inProcess.client)
  304. try await withThrowingTaskGroup(of: Void.self) { group in
  305. group.addTask {
  306. let server = GRPCServer(transport: inProcess.server, services: [BinaryEcho()])
  307. try await server.serve()
  308. }
  309. group.addTask {
  310. try await client.run()
  311. }
  312. // Wait for client and server to be running.
  313. try await client.unary(
  314. request: .init(message: [3, 1, 4, 1, 5]),
  315. descriptor: BinaryEcho.Methods.collect,
  316. serializer: IdentitySerializer(),
  317. deserializer: IdentityDeserializer(),
  318. options: .defaults
  319. ) { response in
  320. let message = try response.message
  321. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  322. }
  323. let task = Task {
  324. try await client.clientStreaming(
  325. request: StreamingClientRequest { writer in
  326. try await Task.sleep(for: .seconds(5))
  327. },
  328. descriptor: BinaryEcho.Methods.collect,
  329. serializer: IdentitySerializer(),
  330. deserializer: IdentityDeserializer(),
  331. options: .defaults
  332. ) { response in
  333. XCTAssertRejected(response) { error in
  334. XCTAssertEqual(error.code, .unknown)
  335. }
  336. }
  337. }
  338. task.cancel()
  339. try await task.value
  340. group.cancelAll()
  341. }
  342. }
  343. func testRunStoppedClient() async throws {
  344. let inProcess = InProcessTransport()
  345. let client = GRPCClient(transport: inProcess.client)
  346. // Run the client.
  347. let task = Task { try await client.run() }
  348. task.cancel()
  349. try await task.value
  350. // Client is stopped, should throw an error.
  351. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  352. try await client.run()
  353. } errorHandler: { error in
  354. XCTAssertEqual(error.code, .clientIsStopped)
  355. }
  356. }
  357. func testRunAlreadyRunningClient() async throws {
  358. let inProcess = InProcessTransport()
  359. let client = GRPCClient(transport: inProcess.client)
  360. // Run the client.
  361. let task = Task { try await client.run() }
  362. // Make sure the client is run for the first time here.
  363. try await Task.sleep(for: .milliseconds(10))
  364. // Client is already running, should throw an error.
  365. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  366. try await client.run()
  367. } errorHandler: { error in
  368. XCTAssertEqual(error.code, .clientIsAlreadyRunning)
  369. }
  370. task.cancel()
  371. }
  372. }
  373. @Suite("GRPC Client Tests")
  374. struct ClientTests {
  375. @Test("Interceptors are applied only to specified services")
  376. func testInterceptorsAreAppliedToSpecifiedServices() async throws {
  377. let onlyBinaryEchoCounter = AtomicCounter()
  378. let allServicesCounter = AtomicCounter()
  379. let onlyHelloWorldCounter = AtomicCounter()
  380. let bothServicesCounter = AtomicCounter()
  381. try await self.withInProcessConnectedClient(
  382. services: [BinaryEcho(), HelloWorld()],
  383. interceptorPipeline: [
  384. .apply(
  385. .requestCounter(onlyBinaryEchoCounter),
  386. to: .services([BinaryEcho.serviceDescriptor])
  387. ),
  388. .apply(.requestCounter(allServicesCounter), to: .all),
  389. .apply(
  390. .requestCounter(onlyHelloWorldCounter),
  391. to: .services([HelloWorld.serviceDescriptor])
  392. ),
  393. .apply(
  394. .requestCounter(bothServicesCounter),
  395. to: .services([BinaryEcho.serviceDescriptor, HelloWorld.serviceDescriptor])
  396. ),
  397. ]
  398. ) { client, _ in
  399. // Make a request to the `BinaryEcho` service and assert that only
  400. // the counters associated to interceptors that apply to it are incremented.
  401. try await client.unary(
  402. request: .init(message: Array("hello".utf8)),
  403. descriptor: BinaryEcho.Methods.get,
  404. serializer: IdentitySerializer(),
  405. deserializer: IdentityDeserializer(),
  406. options: .defaults
  407. ) { response in
  408. let message = try #require(try response.message)
  409. #expect(message == Array("hello".utf8))
  410. }
  411. #expect(onlyBinaryEchoCounter.value == 1)
  412. #expect(allServicesCounter.value == 1)
  413. #expect(onlyHelloWorldCounter.value == 0)
  414. #expect(bothServicesCounter.value == 1)
  415. // Now, make a request to the `HelloWorld` service and assert that only
  416. // the counters associated to interceptors that apply to it are incremented.
  417. try await client.unary(
  418. request: .init(message: Array("Swift".utf8)),
  419. descriptor: HelloWorld.Methods.sayHello,
  420. serializer: IdentitySerializer(),
  421. deserializer: IdentityDeserializer(),
  422. options: .defaults
  423. ) { response in
  424. let message = try #require(try response.message)
  425. #expect(message == Array("Hello, Swift!".utf8))
  426. }
  427. #expect(onlyBinaryEchoCounter.value == 1)
  428. #expect(allServicesCounter.value == 2)
  429. #expect(onlyHelloWorldCounter.value == 1)
  430. #expect(bothServicesCounter.value == 2)
  431. }
  432. }
  433. @Test("Interceptors are applied only to specified methods")
  434. func testInterceptorsAreAppliedToSpecifiedMethods() async throws {
  435. let onlyBinaryEchoGetCounter = AtomicCounter()
  436. let onlyBinaryEchoCollectCounter = AtomicCounter()
  437. let bothBinaryEchoMethodsCounter = AtomicCounter()
  438. let allMethodsCounter = AtomicCounter()
  439. try await self.withInProcessConnectedClient(
  440. services: [BinaryEcho()],
  441. interceptorPipeline: [
  442. .apply(
  443. .requestCounter(onlyBinaryEchoGetCounter),
  444. to: .methods([BinaryEcho.Methods.get])
  445. ),
  446. .apply(.requestCounter(allMethodsCounter), to: .all),
  447. .apply(
  448. .requestCounter(onlyBinaryEchoCollectCounter),
  449. to: .methods([BinaryEcho.Methods.collect])
  450. ),
  451. .apply(
  452. .requestCounter(bothBinaryEchoMethodsCounter),
  453. to: .methods([BinaryEcho.Methods.get, BinaryEcho.Methods.collect])
  454. ),
  455. ]
  456. ) { client, _ in
  457. // Make a request to the `BinaryEcho/get` method and assert that only
  458. // the counters associated to interceptors that apply to it are incremented.
  459. try await client.unary(
  460. request: .init(message: Array("hello".utf8)),
  461. descriptor: BinaryEcho.Methods.get,
  462. serializer: IdentitySerializer(),
  463. deserializer: IdentityDeserializer(),
  464. options: .defaults
  465. ) { response in
  466. let message = try #require(try response.message)
  467. #expect(message == Array("hello".utf8))
  468. }
  469. #expect(onlyBinaryEchoGetCounter.value == 1)
  470. #expect(allMethodsCounter.value == 1)
  471. #expect(onlyBinaryEchoCollectCounter.value == 0)
  472. #expect(bothBinaryEchoMethodsCounter.value == 1)
  473. // Now, make a request to the `BinaryEcho/collect` method and assert that only
  474. // the counters associated to interceptors that apply to it are incremented.
  475. try await client.unary(
  476. request: .init(message: Array("hello".utf8)),
  477. descriptor: BinaryEcho.Methods.collect,
  478. serializer: IdentitySerializer(),
  479. deserializer: IdentityDeserializer(),
  480. options: .defaults
  481. ) { response in
  482. let message = try #require(try response.message)
  483. #expect(message == Array("hello".utf8))
  484. }
  485. #expect(onlyBinaryEchoGetCounter.value == 1)
  486. #expect(allMethodsCounter.value == 2)
  487. #expect(onlyBinaryEchoCollectCounter.value == 1)
  488. #expect(bothBinaryEchoMethodsCounter.value == 2)
  489. }
  490. }
  491. func withInProcessConnectedClient(
  492. services: [any RegistrableRPCService],
  493. interceptorPipeline: [ClientInterceptorPipelineOperation] = [],
  494. _ body: (GRPCClient, GRPCServer) async throws -> Void
  495. ) async throws {
  496. let inProcess = InProcessTransport()
  497. let client = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline)
  498. let server = GRPCServer(transport: inProcess.server, services: services)
  499. try await withThrowingTaskGroup(of: Void.self) { group in
  500. group.addTask {
  501. try await server.serve()
  502. }
  503. group.addTask {
  504. try await client.run()
  505. }
  506. // Make sure both server and client are running
  507. try await Task.sleep(for: .milliseconds(100))
  508. try await body(client, server)
  509. client.beginGracefulShutdown()
  510. server.beginGracefulShutdown()
  511. }
  512. }
  513. }