2
0

GRPCClientTests.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCInProcessTransport
  18. import Testing
  19. import XCTest
  20. final class GRPCClientTests: XCTestCase {
  /// Runs `body` with a client and server that share a fresh in-process transport.
  ///
  /// The server is obtained from `withGRPCServer` and the client from `withGRPCClient`;
  /// `body` is invoked inside both scopes after a short start-up delay.
  ///
  /// - Parameters:
  ///   - services: Services passed to the server.
  ///   - interceptorPipeline: Client interceptors passed to the client. Defaults to none.
  ///   - body: The test body; receives the scoped client and server.
  /// - Throws: Any error thrown by the scoped helpers, the start-up delay, or `body`.
  func withInProcessConnectedClient(
    services: [any RegistrableRPCService],
    interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>] = [],
    _ body: (
      GRPCClient<InProcessTransport.Client>,
      GRPCServer<InProcessTransport.Server>
    ) async throws -> Void
  ) async throws {
    // The client and server handed to `body` come from the scoped helpers below, so no
    // separate instances are constructed here.
    let inProcess = InProcessTransport()

    try await withGRPCServer(
      transport: inProcess.server,
      services: services
    ) { server in
      try await withGRPCClient(
        transport: inProcess.client,
        interceptorPipeline: interceptorPipeline
      ) { client in
        // Give the client and server a moment to start before running the test body.
        try await Task.sleep(for: .milliseconds(100))
        try await body(client, server)
      }
    }
  }
  /// A unary RPC to `BinaryEcho.Methods.collect` should respond with a message equal to
  /// the request payload.
  func testUnary() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.unary(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let echoed = try response.message
        XCTAssertEqual(echoed, payload)
      }
    }
  }
  /// Writes the payload one byte per message to `BinaryEcho.Methods.collect` and expects
  /// the single response message to equal the whole payload.
  func testClientStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.clientStreaming(
        request: .init(producer: { writer in
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let collected = try response.message
        XCTAssertEqual(collected, payload)
      }
    }
  }
  /// Sends the payload in one request to `BinaryEcho.Methods.expand` and expects the
  /// first five response messages to be the payload's bytes, one byte per message.
  func testServerStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.serverStreaming(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.expand,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        var iterator = response.messages.makeAsyncIterator()
        for expected in payload.map({ [$0] }) {
          let received = try await iterator.next()
          XCTAssertEqual(received, expected)
        }
      }
    }
  }
  /// Writes the payload one byte per message to `BinaryEcho.Methods.update` and expects
  /// the first five response messages to be the same single-byte messages, in order.
  func testBidirectionalStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.bidirectionalStreaming(
        request: .init(producer: { writer in
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: BinaryEcho.Methods.update,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        var iterator = response.messages.makeAsyncIterator()
        for expected in payload.map({ [$0] }) {
          let received = try await iterator.next()
          XCTAssertEqual(received, expected)
        }
      }
    }
  }
  /// A unary RPC to a method descriptor not offered by the registered service should
  /// surface an RPC error with code `.unimplemented` from `response.accepted`.
  func testUnimplementedMethod_Unary() async throws {
    let unknownMethod = MethodDescriptor(fullyQualifiedService: "not", method: "implemented")

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.unary(
        request: .init(message: [3, 1, 4, 1, 5]),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        XCTAssertThrowsRPCError(try response.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// A client-streaming RPC to a method descriptor not offered by the registered service
  /// should surface an RPC error with code `.unimplemented` from `response.accepted`.
  func testUnimplementedMethod_ClientStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]
    let unknownMethod = MethodDescriptor(fullyQualifiedService: "not", method: "implemented")

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.clientStreaming(
        request: .init(producer: { writer in
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        XCTAssertThrowsRPCError(try response.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// A server-streaming RPC to a method descriptor not offered by the registered service
  /// should surface an RPC error with code `.unimplemented` from `response.accepted`.
  func testUnimplementedMethod_ServerStreaming() async throws {
    let unknownMethod = MethodDescriptor(fullyQualifiedService: "not", method: "implemented")

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.serverStreaming(
        request: .init(message: [3, 1, 4, 1, 5]),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        XCTAssertThrowsRPCError(try response.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// A bidirectional-streaming RPC to a method descriptor not offered by the registered
  /// service should surface an RPC error with code `.unimplemented` from `response.accepted`.
  func testUnimplementedMethod_BidirectionalStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]
    let unknownMethod = MethodDescriptor(fullyQualifiedService: "not", method: "implemented")

    try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.bidirectionalStreaming(
        request: .init(producer: { writer in
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        XCTAssertThrowsRPCError(try response.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// Issues one unary RPC per value in `UInt8.min ..< UInt8.max` concurrently against
  /// `BinaryEcho.Methods.collect` and expects each response to echo its one-byte request.
  ///
  /// - Throws: The first error thrown by any of the concurrent RPCs, or by the helper.
  func testMultipleConcurrentRequests() async throws {
    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await withThrowingTaskGroup(of: Void.self) { group in
        for i in UInt8.min ..< UInt8.max {
          group.addTask {
            try await client.unary(
              request: .init(message: [i]),
              descriptor: BinaryEcho.Methods.collect,
              serializer: IdentitySerializer(),
              deserializer: IdentityDeserializer(),
              options: .defaults
            ) { response in
              let message = try response.message
              XCTAssertEqual(message, [i])
            }
          }
        }

        // Errors thrown by child tasks only surface when their results are consumed.
        // Without this, a failed RPC would be dropped silently when the group scope
        // ends and the test would pass regardless.
        try await group.waitForAll()
      }
    }
  }
  /// Configures a request counter, a reject-all interceptor and a second request counter
  /// (in that order), issues one unary RPC, and expects the RPC to be rejected with
  /// `.unavailable`, the first counter to read 1 and the second to read 0.
  func testInterceptorsAreAppliedInOrder() async throws {
    let beforeRejection = AtomicCounter()
    let afterRejection = AtomicCounter()

    let pipeline: [ConditionalInterceptor<any ClientInterceptor>] = [
      .apply(.requestCounter(beforeRejection), to: .all),
      .apply(.rejectAll(with: RPCError(code: .unavailable, message: "")), to: .all),
      .apply(.requestCounter(afterRejection), to: .all),
    ]

    try await withInProcessConnectedClient(
      services: [BinaryEcho()],
      interceptorPipeline: pipeline
    ) { client, _ in
      try await client.unary(
        request: .init(message: [3, 1, 4, 1, 5]),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        XCTAssertRejected(response) { rpcError in
          XCTAssertEqual(rpcError.code, .unavailable)
        }
      }
    }

    XCTAssertEqual(beforeRejection.value, 1)
    XCTAssertEqual(afterRejection.value, 0)
  }
  /// After `beginGracefulShutdown()` is called on the client, starting a new RPC should
  /// throw a `RuntimeError` with code `.clientIsStopped`.
  func testNoNewRPCsAfterClientClose() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      // A successful RPC first, so we know the client is working before shutting it down.
      try await client.unary(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let echoed = try response.message
        XCTAssertEqual(echoed, payload)
      }

      // From this point on, new RPCs should fail immediately.
      client.beginGracefulShutdown()

      // The same RPC should now be refused.
      await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
        try await client.unary(
          request: .init(message: payload),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer(),
          options: .defaults
        ) { _ in }
      } errorHandler: { runtimeError in
        XCTAssertEqual(runtimeError.code, .clientIsStopped)
      }
    }
  }
  /// Begins graceful shutdown from inside the request producer of an already started
  /// client-streaming RPC, checks that a new RPC is refused with `.clientIsStopped`, and
  /// then expects the started RPC to still complete with the full payload.
  func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.clientStreaming(
        request: .init(producer: { writer in
          // Close the client once this RPC has been started.
          client.beginGracefulShutdown()

          // Starting another RPC at this point should fail.
          await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
            try await client.unary(
              request: .init(message: payload),
              descriptor: BinaryEcho.Methods.collect,
              serializer: IdentitySerializer(),
              deserializer: IdentityDeserializer(),
              options: .defaults
            ) { _ in }
          } errorHandler: { runtimeError in
            XCTAssertEqual(runtimeError.code, .clientIsStopped)
          }

          // Write to the stream that was already open to confirm that in-flight
          // streams can still run to completion.
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let collected = try response.message
        XCTAssertEqual(collected, payload)
      }
    }
  }
  /// Starts a client-streaming RPC in an unstructured task, cancels that task right away,
  /// and expects the RPC to be rejected with code `.unknown`.
  func testCancelRunningClient() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]
    let inProcess = InProcessTransport()
    let client = GRPCClient(transport: inProcess.client)

    try await withThrowingTaskGroup(of: Void.self) { group in
      group.addTask {
        try await GRPCServer(transport: inProcess.server, services: [BinaryEcho()]).serve()
      }

      group.addTask {
        try await client.runConnections()
      }

      // A successful round trip tells us the client and server are both running.
      try await client.unary(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let echoed = try response.message
        XCTAssertEqual(echoed, payload)
      }

      let streamingRPC = Task {
        try await client.clientStreaming(
          request: StreamingClientRequest { _ in
            // Keep the request stream open long enough for the cancellation to arrive.
            try await Task.sleep(for: .seconds(5))
          },
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer(),
          options: .defaults
        ) { response in
          XCTAssertRejected(response) { rpcError in
            XCTAssertEqual(rpcError.code, .unknown)
          }
        }
      }

      streamingRPC.cancel()
      try await streamingRPC.value

      // Stop the server and client tasks so the group can finish.
      group.cancelAll()
    }
  }
  /// Runs a client in a task, cancels the task and awaits it, then expects a second
  /// `runConnections()` call to throw a `RuntimeError` with code `.clientIsStopped`.
  func testRunStoppedClient() async throws {
    let client = GRPCClient(transport: InProcessTransport().client)

    // First run: start it, cancel it, and wait for it to finish.
    let firstRun = Task {
      try await client.runConnections()
    }
    firstRun.cancel()
    try await firstRun.value

    // The client is now stopped, so running it again should throw.
    await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
      try await client.runConnections()
    } errorHandler: { runtimeError in
      XCTAssertEqual(runtimeError.code, .clientIsStopped)
    }
  }
  /// Starts a client in a background task and expects a second, concurrent
  /// `runConnections()` call to throw a `RuntimeError` with code `.clientIsAlreadyRunning`.
  ///
  /// - Throws: An error from `Task.sleep` if the test itself is cancelled while waiting.
  func testRunAlreadyRunningClient() async throws {
    let inProcess = InProcessTransport()
    let client = GRPCClient(transport: inProcess.client)

    // Run the client.
    let task = Task { try await client.runConnections() }
    // Cancel the background run on every exit path, including when the sleep below throws,
    // so the task is not left running after the test ends.
    defer { task.cancel() }

    // Make sure the client is run for the first time here.
    try await Task.sleep(for: .milliseconds(10))

    // Client is already running, should throw an error.
    await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
      try await client.runConnections()
    } errorHandler: { error in
      XCTAssertEqual(error.code, .clientIsAlreadyRunning)
    }
  }
  363. }
  364. @Suite("GRPC Client Tests")
  365. struct ClientTests {
  /// Registers four request counters scoped to `BinaryEcho` only, all services,
  /// `HelloWorld` only, and both services; then checks which counters advance after one
  /// request to each service.
  @Test("Interceptors are applied only to specified services")
  func testInterceptorsAreAppliedToSpecifiedServices() async throws {
    let echoOnly = AtomicCounter()
    let everyService = AtomicCounter()
    let helloWorldOnly = AtomicCounter()
    let echoAndHelloWorld = AtomicCounter()

    let pipeline: [ConditionalInterceptor<any ClientInterceptor>] = [
      .apply(
        .requestCounter(echoOnly),
        to: .services([BinaryEcho.serviceDescriptor])
      ),
      .apply(.requestCounter(everyService), to: .all),
      .apply(
        .requestCounter(helloWorldOnly),
        to: .services([HelloWorld.serviceDescriptor])
      ),
      .apply(
        .requestCounter(echoAndHelloWorld),
        to: .services([BinaryEcho.serviceDescriptor, HelloWorld.serviceDescriptor])
      ),
    ]

    try await withInProcessConnectedClient(
      services: [BinaryEcho(), HelloWorld()],
      interceptorPipeline: pipeline
    ) { client, _ in
      // First request goes to `BinaryEcho`: only the counters whose interceptors
      // apply to that service should move.
      let echoPayload = Array("hello".utf8)
      try await client.unary(
        request: .init(message: echoPayload),
        descriptor: BinaryEcho.Methods.get,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let reply = try #require(try response.message)
        #expect(reply == Array("hello".utf8))
      }

      #expect(echoOnly.value == 1)
      #expect(everyService.value == 1)
      #expect(helloWorldOnly.value == 0)
      #expect(echoAndHelloWorld.value == 1)

      // Second request goes to `HelloWorld`: again only the counters whose
      // interceptors apply to that service should move.
      try await client.unary(
        request: .init(message: Array("Swift".utf8)),
        descriptor: HelloWorld.Methods.sayHello,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let reply = try #require(try response.message)
        #expect(reply == Array("Hello, Swift!".utf8))
      }

      #expect(echoOnly.value == 1)
      #expect(everyService.value == 2)
      #expect(helloWorldOnly.value == 1)
      #expect(echoAndHelloWorld.value == 2)
    }
  }
  /// Registers four request counters scoped to `BinaryEcho/get` only, all methods,
  /// `BinaryEcho/collect` only, and both methods; then checks which counters advance
  /// after one request to each method.
  @Test("Interceptors are applied only to specified methods")
  func testInterceptorsAreAppliedToSpecifiedMethods() async throws {
    let getOnly = AtomicCounter()
    let collectOnly = AtomicCounter()
    let getAndCollect = AtomicCounter()
    let everyMethod = AtomicCounter()

    let pipeline: [ConditionalInterceptor<any ClientInterceptor>] = [
      .apply(
        .requestCounter(getOnly),
        to: .methods([BinaryEcho.Methods.get])
      ),
      .apply(.requestCounter(everyMethod), to: .all),
      .apply(
        .requestCounter(collectOnly),
        to: .methods([BinaryEcho.Methods.collect])
      ),
      .apply(
        .requestCounter(getAndCollect),
        to: .methods([BinaryEcho.Methods.get, BinaryEcho.Methods.collect])
      ),
    ]

    try await withInProcessConnectedClient(
      services: [BinaryEcho()],
      interceptorPipeline: pipeline
    ) { client, _ in
      // First request goes to `BinaryEcho/get`: only the counters whose interceptors
      // apply to that method should move.
      try await client.unary(
        request: .init(message: Array("hello".utf8)),
        descriptor: BinaryEcho.Methods.get,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let reply = try #require(try response.message)
        #expect(reply == Array("hello".utf8))
      }

      #expect(getOnly.value == 1)
      #expect(everyMethod.value == 1)
      #expect(collectOnly.value == 0)
      #expect(getAndCollect.value == 1)

      // Second request goes to `BinaryEcho/collect`: again only the counters whose
      // interceptors apply to that method should move.
      try await client.unary(
        request: .init(message: Array("hello".utf8)),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { response in
        let reply = try #require(try response.message)
        #expect(reply == Array("hello".utf8))
      }

      #expect(getOnly.value == 1)
      #expect(everyMethod.value == 2)
      #expect(collectOnly.value == 1)
      #expect(getAndCollect.value == 2)
    }
  }
  /// Runs `body` with a client and server that share a fresh in-process transport.
  ///
  /// Uses `withGRPCServer` and `withGRPCClient` in the same way as the helper of the same
  /// name in `GRPCClientTests`, rather than managing a task group and shutdown by hand.
  /// NOTE(review): this assumes the scoped helpers run and shut down the server and client
  /// around the closure — confirm against GRPCCore.
  ///
  /// - Parameters:
  ///   - services: Services passed to the server.
  ///   - interceptorPipeline: Client interceptors passed to the client. Defaults to none.
  ///   - body: The test body; receives the scoped client and server.
  /// - Throws: Any error thrown by the scoped helpers, the start-up delay, or `body`.
  func withInProcessConnectedClient(
    services: [any RegistrableRPCService],
    interceptorPipeline: [ConditionalInterceptor<any ClientInterceptor>] = [],
    _ body: (
      GRPCClient<InProcessTransport.Client>,
      GRPCServer<InProcessTransport.Server>
    ) async throws -> Void
  ) async throws {
    let inProcess = InProcessTransport()

    try await withGRPCServer(
      transport: inProcess.server,
      services: services
    ) { server in
      try await withGRPCClient(
        transport: inProcess.client,
        interceptorPipeline: interceptorPipeline
      ) { client in
        // Make sure both server and client are running
        try await Task.sleep(for: .milliseconds(100))
        try await body(client, server)
      }
    }
  }
  507. }