GRPCClientTests.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCInProcessTransport
  18. import Testing
  19. import XCTest
  20. @available(gRPCSwift 2.0, *)
  21. final class GRPCClientTests: XCTestCase {
  22. func withInProcessConnectedClient(
  23. services: [any RegistrableRPCService],
  24. interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>] = [],
  25. _ body: (
  26. GRPCClient<InProcessTransport.Client>,
  27. GRPCServer<InProcessTransport.Server>
  28. ) async throws -> Void
  29. ) async throws {
  30. let inProcess = InProcessTransport()
  31. _ = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline)
  32. _ = GRPCServer(transport: inProcess.server, services: services)
  33. try await withGRPCServer(
  34. transport: inProcess.server,
  35. services: services
  36. ) { server in
  37. try await withGRPCClient(
  38. transport: inProcess.client,
  39. interceptorPipeline: interceptorPipeline
  40. ) { client in
  41. try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
  42. try await body(client, server)
  43. }
  44. }
  45. }
  46. func testUnary() async throws {
  47. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  48. try await client.unary(
  49. request: .init(message: [3, 1, 4, 1, 5]),
  50. descriptor: BinaryEcho.Methods.collect,
  51. serializer: IdentitySerializer(),
  52. deserializer: IdentityDeserializer(),
  53. options: .defaults
  54. ) { response in
  55. let message = try response.message
  56. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  57. }
  58. }
  59. }
  60. func testClientStreaming() async throws {
  61. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  62. try await client.clientStreaming(
  63. request: .init(producer: { writer in
  64. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  65. try await writer.write([byte])
  66. }
  67. }),
  68. descriptor: BinaryEcho.Methods.collect,
  69. serializer: IdentitySerializer(),
  70. deserializer: IdentityDeserializer(),
  71. options: .defaults
  72. ) { response in
  73. let message = try response.message
  74. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  75. }
  76. }
  77. }
  78. func testServerStreaming() async throws {
  79. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  80. try await client.serverStreaming(
  81. request: .init(message: [3, 1, 4, 1, 5]),
  82. descriptor: BinaryEcho.Methods.expand,
  83. serializer: IdentitySerializer(),
  84. deserializer: IdentityDeserializer(),
  85. options: .defaults
  86. ) { response in
  87. var responseParts = response.messages.makeAsyncIterator()
  88. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  89. let message = try await responseParts.next()
  90. XCTAssertEqual(message, [byte])
  91. }
  92. }
  93. }
  94. }
  95. func testBidirectionalStreaming() async throws {
  96. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  97. try await client.bidirectionalStreaming(
  98. request: .init(producer: { writer in
  99. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  100. try await writer.write([byte])
  101. }
  102. }),
  103. descriptor: BinaryEcho.Methods.update,
  104. serializer: IdentitySerializer(),
  105. deserializer: IdentityDeserializer(),
  106. options: .defaults
  107. ) { response in
  108. var responseParts = response.messages.makeAsyncIterator()
  109. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  110. let message = try await responseParts.next()
  111. XCTAssertEqual(message, [byte])
  112. }
  113. }
  114. }
  115. }
  116. func testUnimplementedMethod_Unary() async throws {
  117. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  118. try await client.unary(
  119. request: .init(message: [3, 1, 4, 1, 5]),
  120. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  121. serializer: IdentitySerializer(),
  122. deserializer: IdentityDeserializer(),
  123. options: .defaults
  124. ) { response in
  125. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  126. XCTAssertEqual(error.code, .unimplemented)
  127. }
  128. }
  129. }
  130. }
  131. func testUnimplementedMethod_ClientStreaming() async throws {
  132. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  133. try await client.clientStreaming(
  134. request: .init(producer: { writer in
  135. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  136. try await writer.write([byte])
  137. }
  138. }),
  139. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  140. serializer: IdentitySerializer(),
  141. deserializer: IdentityDeserializer(),
  142. options: .defaults
  143. ) { response in
  144. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  145. XCTAssertEqual(error.code, .unimplemented)
  146. }
  147. }
  148. }
  149. }
  150. func testUnimplementedMethod_ServerStreaming() async throws {
  151. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  152. try await client.serverStreaming(
  153. request: .init(message: [3, 1, 4, 1, 5]),
  154. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  155. serializer: IdentitySerializer(),
  156. deserializer: IdentityDeserializer(),
  157. options: .defaults
  158. ) { response in
  159. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  160. XCTAssertEqual(error.code, .unimplemented)
  161. }
  162. }
  163. }
  164. }
  165. func testUnimplementedMethod_BidirectionalStreaming() async throws {
  166. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  167. try await client.bidirectionalStreaming(
  168. request: .init(producer: { writer in
  169. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  170. try await writer.write([byte])
  171. }
  172. }),
  173. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  174. serializer: IdentitySerializer(),
  175. deserializer: IdentityDeserializer(),
  176. options: .defaults
  177. ) { response in
  178. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  179. XCTAssertEqual(error.code, .unimplemented)
  180. }
  181. }
  182. }
  183. }
  184. func testMultipleConcurrentRequests() async throws {
  185. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  186. await withThrowingTaskGroup(of: Void.self) { group in
  187. for i in UInt8.min ..< UInt8.max {
  188. group.addTask {
  189. try await client.unary(
  190. request: .init(message: [i]),
  191. descriptor: BinaryEcho.Methods.collect,
  192. serializer: IdentitySerializer(),
  193. deserializer: IdentityDeserializer(),
  194. options: .defaults
  195. ) { response in
  196. let message = try response.message
  197. XCTAssertEqual(message, [i])
  198. }
  199. }
  200. }
  201. }
  202. }
  203. }
  204. func testInterceptorsAreAppliedInOrder() async throws {
  205. let counter1 = AtomicCounter()
  206. let counter2 = AtomicCounter()
  207. try await self.withInProcessConnectedClient(
  208. services: [BinaryEcho()],
  209. interceptorPipeline: [
  210. .apply(.requestCounter(counter1), to: .all),
  211. .apply(.rejectAll(with: RPCError(code: .unavailable, message: "")), to: .all),
  212. .apply(.requestCounter(counter2), to: .all),
  213. ]
  214. ) { client, _ in
  215. try await client.unary(
  216. request: .init(message: [3, 1, 4, 1, 5]),
  217. descriptor: BinaryEcho.Methods.collect,
  218. serializer: IdentitySerializer(),
  219. deserializer: IdentityDeserializer(),
  220. options: .defaults
  221. ) { response in
  222. XCTAssertRejected(response) { error in
  223. XCTAssertEqual(error.code, .unavailable)
  224. }
  225. }
  226. }
  227. XCTAssertEqual(counter1.value, 1)
  228. XCTAssertEqual(counter2.value, 0)
  229. }
  230. func testNoNewRPCsAfterClientClose() async throws {
  231. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  232. // Run an RPC so we know the client is running properly.
  233. try await client.unary(
  234. request: .init(message: [3, 1, 4, 1, 5]),
  235. descriptor: BinaryEcho.Methods.collect,
  236. serializer: IdentitySerializer(),
  237. deserializer: IdentityDeserializer(),
  238. options: .defaults
  239. ) { response in
  240. let message = try response.message
  241. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  242. }
  243. // New RPCs should fail immediately after this.
  244. client.beginGracefulShutdown()
  245. // RPC should fail now.
  246. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  247. try await client.unary(
  248. request: .init(message: [3, 1, 4, 1, 5]),
  249. descriptor: BinaryEcho.Methods.collect,
  250. serializer: IdentitySerializer(),
  251. deserializer: IdentityDeserializer(),
  252. options: .defaults
  253. ) { _ in }
  254. } errorHandler: { error in
  255. XCTAssertEqual(error.code, .clientIsStopped)
  256. }
  257. }
  258. }
  259. func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
  260. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in
  261. try await client.clientStreaming(
  262. request: .init(producer: { writer in
  263. // Close the client once this RCP has been started.
  264. client.beginGracefulShutdown()
  265. // Attempts to start a new RPC should fail.
  266. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  267. try await client.unary(
  268. request: .init(message: [3, 1, 4, 1, 5]),
  269. descriptor: BinaryEcho.Methods.collect,
  270. serializer: IdentitySerializer(),
  271. deserializer: IdentityDeserializer(),
  272. options: .defaults
  273. ) { _ in }
  274. } errorHandler: { error in
  275. XCTAssertEqual(error.code, .clientIsStopped)
  276. }
  277. // Now write to the already opened stream to confirm that opened streams
  278. // can successfully run to completion.
  279. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  280. try await writer.write([byte])
  281. }
  282. }),
  283. descriptor: BinaryEcho.Methods.collect,
  284. serializer: IdentitySerializer(),
  285. deserializer: IdentityDeserializer(),
  286. options: .defaults
  287. ) { response in
  288. let message = try response.message
  289. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  290. }
  291. }
  292. }
  293. func testCancelRunningClient() async throws {
  294. let inProcess = InProcessTransport()
  295. let client = GRPCClient(transport: inProcess.client)
  296. try await withThrowingTaskGroup(of: Void.self) { group in
  297. group.addTask {
  298. let server = GRPCServer(transport: inProcess.server, services: [BinaryEcho()])
  299. try await server.serve()
  300. }
  301. group.addTask {
  302. try await client.runConnections()
  303. }
  304. // Wait for client and server to be running.
  305. try await client.unary(
  306. request: .init(message: [3, 1, 4, 1, 5]),
  307. descriptor: BinaryEcho.Methods.collect,
  308. serializer: IdentitySerializer(),
  309. deserializer: IdentityDeserializer(),
  310. options: .defaults
  311. ) { response in
  312. let message = try response.message
  313. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  314. }
  315. let task = Task {
  316. try await client.clientStreaming(
  317. request: StreamingClientRequest { writer in
  318. try await Task.sleep(for: .seconds(5), tolerance: .zero)
  319. },
  320. descriptor: BinaryEcho.Methods.collect,
  321. serializer: IdentitySerializer(),
  322. deserializer: IdentityDeserializer(),
  323. options: .defaults
  324. ) { response in
  325. XCTAssertRejected(response) { error in
  326. XCTAssertEqual(error.code, .unknown)
  327. }
  328. }
  329. }
  330. task.cancel()
  331. try await task.value
  332. group.cancelAll()
  333. }
  334. }
  335. func testRunStoppedClient() async throws {
  336. let inProcess = InProcessTransport()
  337. let client = GRPCClient(transport: inProcess.client)
  338. // Run the client.
  339. let task = Task { try await client.runConnections() }
  340. task.cancel()
  341. try await task.value
  342. // Client is stopped, should throw an error.
  343. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  344. try await client.runConnections()
  345. } errorHandler: { error in
  346. XCTAssertEqual(error.code, .clientIsStopped)
  347. }
  348. }
  349. func testRunAlreadyRunningClient() async throws {
  350. let inProcess = InProcessTransport()
  351. let client = GRPCClient(transport: inProcess.client)
  352. // Run the client.
  353. let task = Task { try await client.runConnections() }
  354. // Make sure the client is run for the first time here.
  355. try await Task.sleep(for: .milliseconds(10), tolerance: .zero)
  356. // Client is already running, should throw an error.
  357. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  358. try await client.runConnections()
  359. } errorHandler: { error in
  360. XCTAssertEqual(error.code, .clientIsAlreadyRunning)
  361. }
  362. task.cancel()
  363. }
  364. }
  365. @Suite("GRPC Client Tests")
  366. struct ClientTests {
  367. @Test("Interceptors are applied only to specified services")
  368. @available(gRPCSwift 2.0, *)
  369. func testInterceptorsAreAppliedToSpecifiedServices() async throws {
  370. let onlyBinaryEchoCounter = AtomicCounter()
  371. let allServicesCounter = AtomicCounter()
  372. let onlyHelloWorldCounter = AtomicCounter()
  373. let bothServicesCounter = AtomicCounter()
  374. try await self.withInProcessConnectedClient(
  375. services: [BinaryEcho(), HelloWorld()],
  376. interceptorPipeline: [
  377. .apply(
  378. .requestCounter(onlyBinaryEchoCounter),
  379. to: .services([BinaryEcho.serviceDescriptor])
  380. ),
  381. .apply(.requestCounter(allServicesCounter), to: .all),
  382. .apply(
  383. .requestCounter(onlyHelloWorldCounter),
  384. to: .services([HelloWorld.serviceDescriptor])
  385. ),
  386. .apply(
  387. .requestCounter(bothServicesCounter),
  388. to: .services([BinaryEcho.serviceDescriptor, HelloWorld.serviceDescriptor])
  389. ),
  390. ]
  391. ) { client, _ in
  392. // Make a request to the `BinaryEcho` service and assert that only
  393. // the counters associated to interceptors that apply to it are incremented.
  394. try await client.unary(
  395. request: .init(message: Array("hello".utf8)),
  396. descriptor: BinaryEcho.Methods.get,
  397. serializer: IdentitySerializer(),
  398. deserializer: IdentityDeserializer(),
  399. options: .defaults
  400. ) { response in
  401. let message = try response.message
  402. #expect(message == Array("hello".utf8))
  403. }
  404. #expect(onlyBinaryEchoCounter.value == 1)
  405. #expect(allServicesCounter.value == 1)
  406. #expect(onlyHelloWorldCounter.value == 0)
  407. #expect(bothServicesCounter.value == 1)
  408. // Now, make a request to the `HelloWorld` service and assert that only
  409. // the counters associated to interceptors that apply to it are incremented.
  410. try await client.unary(
  411. request: .init(message: Array("Swift".utf8)),
  412. descriptor: HelloWorld.Methods.sayHello,
  413. serializer: IdentitySerializer(),
  414. deserializer: IdentityDeserializer(),
  415. options: .defaults
  416. ) { response in
  417. let message = try response.message
  418. #expect(message == Array("Hello, Swift!".utf8))
  419. }
  420. #expect(onlyBinaryEchoCounter.value == 1)
  421. #expect(allServicesCounter.value == 2)
  422. #expect(onlyHelloWorldCounter.value == 1)
  423. #expect(bothServicesCounter.value == 2)
  424. }
  425. }
  426. @Test("Interceptors are applied only to specified methods")
  427. @available(gRPCSwift 2.0, *)
  428. func testInterceptorsAreAppliedToSpecifiedMethods() async throws {
  429. let onlyBinaryEchoGetCounter = AtomicCounter()
  430. let onlyBinaryEchoCollectCounter = AtomicCounter()
  431. let bothBinaryEchoMethodsCounter = AtomicCounter()
  432. let allMethodsCounter = AtomicCounter()
  433. try await self.withInProcessConnectedClient(
  434. services: [BinaryEcho()],
  435. interceptorPipeline: [
  436. .apply(
  437. .requestCounter(onlyBinaryEchoGetCounter),
  438. to: .methods([BinaryEcho.Methods.get])
  439. ),
  440. .apply(.requestCounter(allMethodsCounter), to: .all),
  441. .apply(
  442. .requestCounter(onlyBinaryEchoCollectCounter),
  443. to: .methods([BinaryEcho.Methods.collect])
  444. ),
  445. .apply(
  446. .requestCounter(bothBinaryEchoMethodsCounter),
  447. to: .methods([BinaryEcho.Methods.get, BinaryEcho.Methods.collect])
  448. ),
  449. ]
  450. ) { client, _ in
  451. // Make a request to the `BinaryEcho/get` method and assert that only
  452. // the counters associated to interceptors that apply to it are incremented.
  453. try await client.unary(
  454. request: .init(message: Array("hello".utf8)),
  455. descriptor: BinaryEcho.Methods.get,
  456. serializer: IdentitySerializer(),
  457. deserializer: IdentityDeserializer(),
  458. options: .defaults
  459. ) { response in
  460. let message = try response.message
  461. #expect(message == Array("hello".utf8))
  462. }
  463. #expect(onlyBinaryEchoGetCounter.value == 1)
  464. #expect(allMethodsCounter.value == 1)
  465. #expect(onlyBinaryEchoCollectCounter.value == 0)
  466. #expect(bothBinaryEchoMethodsCounter.value == 1)
  467. // Now, make a request to the `BinaryEcho/collect` method and assert that only
  468. // the counters associated to interceptors that apply to it are incremented.
  469. try await client.unary(
  470. request: .init(message: Array("hello".utf8)),
  471. descriptor: BinaryEcho.Methods.collect,
  472. serializer: IdentitySerializer(),
  473. deserializer: IdentityDeserializer(),
  474. options: .defaults
  475. ) { response in
  476. let message = try response.message
  477. #expect(message == Array("hello".utf8))
  478. }
  479. #expect(onlyBinaryEchoGetCounter.value == 1)
  480. #expect(allMethodsCounter.value == 2)
  481. #expect(onlyBinaryEchoCollectCounter.value == 1)
  482. #expect(bothBinaryEchoMethodsCounter.value == 2)
  483. }
  484. }
  485. @available(gRPCSwift 2.0, *)
  486. func withInProcessConnectedClient(
  487. services: [any RegistrableRPCService],
  488. interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>] = [],
  489. _ body: (
  490. GRPCClient<InProcessTransport.Client>,
  491. GRPCServer<InProcessTransport.Server>
  492. ) async throws -> Void
  493. ) async throws {
  494. let inProcess = InProcessTransport()
  495. let client = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline)
  496. let server = GRPCServer(transport: inProcess.server, services: services)
  497. try await withThrowingTaskGroup(of: Void.self) { group in
  498. group.addTask {
  499. try await server.serve()
  500. }
  501. group.addTask {
  502. try await client.runConnections()
  503. }
  504. // Make sure both server and client are running
  505. try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
  506. try await body(client, server)
  507. client.beginGracefulShutdown()
  508. server.beginGracefulShutdown()
  509. }
  510. }
  511. }