GRPCClientTests.swift 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCInProcessTransport
  19. import XCTest
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. final class GRPCClientTests: XCTestCase {
  22. func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) {
  23. let server = InProcessServerTransport()
  24. let client = InProcessClientTransport(
  25. server: server,
  26. methodConfiguration: MethodConfigurations()
  27. )
  28. return (client, server)
  29. }
  30. func withInProcessConnectedClient(
  31. services: [any RegistrableRPCService],
  32. interceptors: [any ClientInterceptor] = [],
  33. _ body: (GRPCClient, GRPCServer) async throws -> Void
  34. ) async throws {
  35. let inProcess = self.makeInProcessPair()
  36. var configuration = GRPCClient.Configuration()
  37. configuration.interceptors.add(contentsOf: interceptors)
  38. let client = GRPCClient(transport: inProcess.client, configuration: configuration)
  39. let server = GRPCServer()
  40. server.transports.add(inProcess.server)
  41. for service in services {
  42. server.services.register(service)
  43. }
  44. try await withThrowingTaskGroup(of: Void.self) { group in
  45. group.addTask {
  46. try await server.run()
  47. }
  48. group.addTask {
  49. try await client.run()
  50. }
  51. // Make sure both server and client are running
  52. try await Task.sleep(for: .milliseconds(100))
  53. try await body(client, server)
  54. client.close()
  55. server.stopListening()
  56. }
  57. }
  58. struct IdentitySerializer: MessageSerializer {
  59. typealias Message = [UInt8]
  60. func serialize(_ message: [UInt8]) throws -> [UInt8] {
  61. return message
  62. }
  63. }
  64. struct IdentityDeserializer: MessageDeserializer {
  65. typealias Message = [UInt8]
  66. func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] {
  67. return serializedMessageBytes
  68. }
  69. }
  70. func testUnary() async throws {
  71. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  72. try await client.unary(
  73. request: .init(message: [3, 1, 4, 1, 5]),
  74. descriptor: BinaryEcho.Methods.collect,
  75. serializer: IdentitySerializer(),
  76. deserializer: IdentityDeserializer()
  77. ) { response in
  78. let message = try response.message
  79. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  80. }
  81. }
  82. }
  83. func testClientStreaming() async throws {
  84. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  85. try await client.clientStreaming(
  86. request: .init(producer: { writer in
  87. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  88. try await writer.write([byte])
  89. }
  90. }),
  91. descriptor: BinaryEcho.Methods.collect,
  92. serializer: IdentitySerializer(),
  93. deserializer: IdentityDeserializer()
  94. ) { response in
  95. let message = try response.message
  96. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  97. }
  98. }
  99. }
  100. func testServerStreaming() async throws {
  101. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  102. try await client.serverStreaming(
  103. request: .init(message: [3, 1, 4, 1, 5]),
  104. descriptor: BinaryEcho.Methods.expand,
  105. serializer: IdentitySerializer(),
  106. deserializer: IdentityDeserializer()
  107. ) { response in
  108. var responseParts = response.messages.makeAsyncIterator()
  109. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  110. let message = try await responseParts.next()
  111. XCTAssertEqual(message, [byte])
  112. }
  113. }
  114. }
  115. }
  116. func testBidirectionalStreaming() async throws {
  117. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  118. try await client.bidirectionalStreaming(
  119. request: .init(producer: { writer in
  120. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  121. try await writer.write([byte])
  122. }
  123. }),
  124. descriptor: BinaryEcho.Methods.update,
  125. serializer: IdentitySerializer(),
  126. deserializer: IdentityDeserializer()
  127. ) { response in
  128. var responseParts = response.messages.makeAsyncIterator()
  129. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  130. let message = try await responseParts.next()
  131. XCTAssertEqual(message, [byte])
  132. }
  133. }
  134. }
  135. }
  136. func testUnimplementedMethod_Unary() async throws {
  137. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  138. try await client.unary(
  139. request: .init(message: [3, 1, 4, 1, 5]),
  140. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  141. serializer: IdentitySerializer(),
  142. deserializer: IdentityDeserializer()
  143. ) { response in
  144. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  145. XCTAssertEqual(error.code, .unimplemented)
  146. }
  147. }
  148. }
  149. }
  150. func testUnimplementedMethod_ClientStreaming() async throws {
  151. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  152. try await client.clientStreaming(
  153. request: .init(producer: { writer in
  154. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  155. try await writer.write([byte])
  156. }
  157. }),
  158. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  159. serializer: IdentitySerializer(),
  160. deserializer: IdentityDeserializer()
  161. ) { response in
  162. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  163. XCTAssertEqual(error.code, .unimplemented)
  164. }
  165. }
  166. }
  167. }
  168. func testUnimplementedMethod_ServerStreaming() async throws {
  169. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  170. try await client.serverStreaming(
  171. request: .init(message: [3, 1, 4, 1, 5]),
  172. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  173. serializer: IdentitySerializer(),
  174. deserializer: IdentityDeserializer()
  175. ) { response in
  176. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  177. XCTAssertEqual(error.code, .unimplemented)
  178. }
  179. }
  180. }
  181. }
  182. func testUnimplementedMethod_BidirectionalStreaming() async throws {
  183. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  184. try await client.bidirectionalStreaming(
  185. request: .init(producer: { writer in
  186. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  187. try await writer.write([byte])
  188. }
  189. }),
  190. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  191. serializer: IdentitySerializer(),
  192. deserializer: IdentityDeserializer()
  193. ) { response in
  194. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  195. XCTAssertEqual(error.code, .unimplemented)
  196. }
  197. }
  198. }
  199. }
  200. func testMultipleConcurrentRequests() async throws {
  201. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  202. await withThrowingTaskGroup(of: Void.self) { group in
  203. for i in UInt8.min ..< UInt8.max {
  204. group.addTask {
  205. try await client.unary(
  206. request: .init(message: [i]),
  207. descriptor: BinaryEcho.Methods.collect,
  208. serializer: IdentitySerializer(),
  209. deserializer: IdentityDeserializer()
  210. ) { response in
  211. let message = try response.message
  212. XCTAssertEqual(message, [i])
  213. }
  214. }
  215. }
  216. }
  217. }
  218. }
  219. func testInterceptorsAreAppliedInOrder() async throws {
  220. let counter1 = ManagedAtomic(0)
  221. let counter2 = ManagedAtomic(0)
  222. try await self.withInProcessConnectedClient(
  223. services: [BinaryEcho()],
  224. interceptors: [
  225. .requestCounter(counter1),
  226. .rejectAll(with: RPCError(code: .unavailable, message: "")),
  227. .requestCounter(counter2),
  228. ]
  229. ) { client, _ in
  230. try await client.unary(
  231. request: .init(message: [3, 1, 4, 1, 5]),
  232. descriptor: BinaryEcho.Methods.collect,
  233. serializer: IdentitySerializer(),
  234. deserializer: IdentityDeserializer()
  235. ) { response in
  236. XCTAssertRejected(response) { error in
  237. XCTAssertEqual(error.code, .unavailable)
  238. }
  239. }
  240. }
  241. XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1)
  242. XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0)
  243. }
  244. func testNoNewRPCsAfterClientClose() async throws {
  245. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  246. // Run an RPC so we know the client is running properly.
  247. try await client.unary(
  248. request: .init(message: [3, 1, 4, 1, 5]),
  249. descriptor: BinaryEcho.Methods.collect,
  250. serializer: IdentitySerializer(),
  251. deserializer: IdentityDeserializer()
  252. ) { response in
  253. let message = try response.message
  254. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  255. }
  256. // New RPCs should fail immediately after this.
  257. client.close()
  258. // RPC should fail now.
  259. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  260. try await client.unary(
  261. request: .init(message: [3, 1, 4, 1, 5]),
  262. descriptor: BinaryEcho.Methods.collect,
  263. serializer: IdentitySerializer(),
  264. deserializer: IdentityDeserializer()
  265. ) { _ in }
  266. } errorHandler: { error in
  267. XCTAssertEqual(error.code, .clientIsStopped)
  268. }
  269. }
  270. }
  271. func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
  272. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in
  273. try await client.clientStreaming(
  274. request: .init(producer: { writer in
  275. // Close the client once this RCP has been started.
  276. client.close()
  277. // Attempts to start a new RPC should fail.
  278. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  279. try await client.unary(
  280. request: .init(message: [3, 1, 4, 1, 5]),
  281. descriptor: BinaryEcho.Methods.collect,
  282. serializer: IdentitySerializer(),
  283. deserializer: IdentityDeserializer()
  284. ) { _ in }
  285. } errorHandler: { error in
  286. XCTAssertEqual(error.code, .clientIsStopped)
  287. }
  288. // Now write to the already opened stream to confirm that opened streams
  289. // can successfully run to completion.
  290. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  291. try await writer.write([byte])
  292. }
  293. }),
  294. descriptor: BinaryEcho.Methods.collect,
  295. serializer: IdentitySerializer(),
  296. deserializer: IdentityDeserializer()
  297. ) { response in
  298. let message = try response.message
  299. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  300. }
  301. }
  302. }
  303. func testCancelRunningClient() async throws {
  304. let inProcess = self.makeInProcessPair()
  305. let client = GRPCClient(transport: inProcess.client)
  306. try await withThrowingTaskGroup(of: Void.self) { group in
  307. group.addTask {
  308. let server = GRPCServer()
  309. server.services.register(BinaryEcho())
  310. server.transports.add(inProcess.server)
  311. try await server.run()
  312. }
  313. group.addTask {
  314. try await client.run()
  315. }
  316. // Wait for client and server to be running.
  317. try await Task.sleep(for: .milliseconds(10))
  318. let task = Task {
  319. try await client.clientStreaming(
  320. request: .init(producer: { writer in
  321. try await Task.sleep(for: .seconds(5))
  322. }),
  323. descriptor: BinaryEcho.Methods.collect,
  324. serializer: IdentitySerializer(),
  325. deserializer: IdentityDeserializer()
  326. ) { response in
  327. XCTAssertRejected(response) { error in
  328. XCTAssertEqual(error.code, .unknown)
  329. }
  330. }
  331. }
  332. // Check requests are getting through.
  333. try await client.unary(
  334. request: .init(message: [3, 1, 4, 1, 5]),
  335. descriptor: BinaryEcho.Methods.collect,
  336. serializer: IdentitySerializer(),
  337. deserializer: IdentityDeserializer()
  338. ) { response in
  339. let message = try response.message
  340. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  341. }
  342. task.cancel()
  343. try await task.value
  344. group.cancelAll()
  345. }
  346. }
  347. func testRunStoppedClient() async throws {
  348. let (clientTransport, _) = self.makeInProcessPair()
  349. let client = GRPCClient(transport: clientTransport)
  350. // Run the client.
  351. let task = Task { try await client.run() }
  352. task.cancel()
  353. try await task.value
  354. // Client is stopped, should throw an error.
  355. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  356. try await client.run()
  357. } errorHandler: { error in
  358. XCTAssertEqual(error.code, .clientIsStopped)
  359. }
  360. }
  361. func testRunAlreadyRunningClient() async throws {
  362. let (clientTransport, _) = self.makeInProcessPair()
  363. let client = GRPCClient(transport: clientTransport)
  364. // Run the client.
  365. let task = Task { try await client.run() }
  366. // Make sure the client is run for the first time here.
  367. try await Task.sleep(for: .milliseconds(10))
  368. // Client is already running, should throw an error.
  369. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  370. try await client.run()
  371. } errorHandler: { error in
  372. XCTAssertEqual(error.code, .clientIsAlreadyRunning)
  373. }
  374. task.cancel()
  375. }
  376. func testRunClientNotRunning() async throws {
  377. let (clientTransport, _) = self.makeInProcessPair()
  378. let client = GRPCClient(transport: clientTransport)
  379. // Client is not running, should throw an error.
  380. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  381. try await client.unary(
  382. request: .init(message: [3, 1, 4, 1, 5]),
  383. descriptor: BinaryEcho.Methods.collect,
  384. serializer: IdentitySerializer(),
  385. deserializer: IdentityDeserializer()
  386. ) { response in
  387. let message = try response.message
  388. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  389. }
  390. } errorHandler: { error in
  391. XCTAssertEqual(error.code, .clientIsNotRunning)
  392. }
  393. }
  394. func testInterceptorsDescription() async throws {
  395. var config = GRPCClient.Configuration()
  396. config.interceptors.add(.rejectAll(with: .init(code: .aborted, message: "")))
  397. config.interceptors.add(.requestCounter(.init(0)))
  398. let description = String(describing: config.interceptors)
  399. let expected = #"["RejectAllClientInterceptor", "RequestCountingClientInterceptor"]"#
  400. XCTAssertEqual(description, expected)
  401. }
  402. }