GRPCClientTests.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCInProcessTransport
  19. import XCTest
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. final class GRPCClientTests: XCTestCase {
  22. func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) {
  23. let server = InProcessServerTransport()
  24. let client = InProcessClientTransport(
  25. server: server,
  26. methodConfiguration: MethodConfigurations()
  27. )
  28. return (client, server)
  29. }
  30. func withInProcessConnectedClient(
  31. services: [any RegistrableRPCService],
  32. interceptors: [any ClientInterceptor] = [],
  33. _ body: (GRPCClient, GRPCServer) async throws -> Void
  34. ) async throws {
  35. let inProcess = self.makeInProcessPair()
  36. let client = GRPCClient(transport: inProcess.client, interceptors: interceptors)
  37. let server = GRPCServer(transports: [inProcess.server], services: services)
  38. try await withThrowingTaskGroup(of: Void.self) { group in
  39. group.addTask {
  40. try await server.run()
  41. }
  42. group.addTask {
  43. try await client.run()
  44. }
  45. // Make sure both server and client are running
  46. try await Task.sleep(for: .milliseconds(100))
  47. try await body(client, server)
  48. client.close()
  49. server.stopListening()
  50. }
  51. }
  52. struct IdentitySerializer: MessageSerializer {
  53. typealias Message = [UInt8]
  54. func serialize(_ message: [UInt8]) throws -> [UInt8] {
  55. return message
  56. }
  57. }
  58. struct IdentityDeserializer: MessageDeserializer {
  59. typealias Message = [UInt8]
  60. func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] {
  61. return serializedMessageBytes
  62. }
  63. }
  64. func testUnary() async throws {
  65. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  66. try await client.unary(
  67. request: .init(message: [3, 1, 4, 1, 5]),
  68. descriptor: BinaryEcho.Methods.collect,
  69. serializer: IdentitySerializer(),
  70. deserializer: IdentityDeserializer()
  71. ) { response in
  72. let message = try response.message
  73. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  74. }
  75. }
  76. }
  77. func testClientStreaming() async throws {
  78. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  79. try await client.clientStreaming(
  80. request: .init(producer: { writer in
  81. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  82. try await writer.write([byte])
  83. }
  84. }),
  85. descriptor: BinaryEcho.Methods.collect,
  86. serializer: IdentitySerializer(),
  87. deserializer: IdentityDeserializer()
  88. ) { response in
  89. let message = try response.message
  90. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  91. }
  92. }
  93. }
  94. func testServerStreaming() async throws {
  95. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  96. try await client.serverStreaming(
  97. request: .init(message: [3, 1, 4, 1, 5]),
  98. descriptor: BinaryEcho.Methods.expand,
  99. serializer: IdentitySerializer(),
  100. deserializer: IdentityDeserializer()
  101. ) { response in
  102. var responseParts = response.messages.makeAsyncIterator()
  103. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  104. let message = try await responseParts.next()
  105. XCTAssertEqual(message, [byte])
  106. }
  107. }
  108. }
  109. }
  110. func testBidirectionalStreaming() async throws {
  111. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  112. try await client.bidirectionalStreaming(
  113. request: .init(producer: { writer in
  114. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  115. try await writer.write([byte])
  116. }
  117. }),
  118. descriptor: BinaryEcho.Methods.update,
  119. serializer: IdentitySerializer(),
  120. deserializer: IdentityDeserializer()
  121. ) { response in
  122. var responseParts = response.messages.makeAsyncIterator()
  123. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  124. let message = try await responseParts.next()
  125. XCTAssertEqual(message, [byte])
  126. }
  127. }
  128. }
  129. }
  130. func testUnimplementedMethod_Unary() async throws {
  131. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  132. try await client.unary(
  133. request: .init(message: [3, 1, 4, 1, 5]),
  134. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  135. serializer: IdentitySerializer(),
  136. deserializer: IdentityDeserializer()
  137. ) { response in
  138. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  139. XCTAssertEqual(error.code, .unimplemented)
  140. }
  141. }
  142. }
  143. }
  144. func testUnimplementedMethod_ClientStreaming() async throws {
  145. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  146. try await client.clientStreaming(
  147. request: .init(producer: { writer in
  148. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  149. try await writer.write([byte])
  150. }
  151. }),
  152. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  153. serializer: IdentitySerializer(),
  154. deserializer: IdentityDeserializer()
  155. ) { response in
  156. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  157. XCTAssertEqual(error.code, .unimplemented)
  158. }
  159. }
  160. }
  161. }
  162. func testUnimplementedMethod_ServerStreaming() async throws {
  163. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  164. try await client.serverStreaming(
  165. request: .init(message: [3, 1, 4, 1, 5]),
  166. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  167. serializer: IdentitySerializer(),
  168. deserializer: IdentityDeserializer()
  169. ) { response in
  170. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  171. XCTAssertEqual(error.code, .unimplemented)
  172. }
  173. }
  174. }
  175. }
  176. func testUnimplementedMethod_BidirectionalStreaming() async throws {
  177. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  178. try await client.bidirectionalStreaming(
  179. request: .init(producer: { writer in
  180. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  181. try await writer.write([byte])
  182. }
  183. }),
  184. descriptor: MethodDescriptor(service: "not", method: "implemented"),
  185. serializer: IdentitySerializer(),
  186. deserializer: IdentityDeserializer()
  187. ) { response in
  188. XCTAssertThrowsRPCError(try response.accepted.get()) { error in
  189. XCTAssertEqual(error.code, .unimplemented)
  190. }
  191. }
  192. }
  193. }
  194. func testMultipleConcurrentRequests() async throws {
  195. try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  196. await withThrowingTaskGroup(of: Void.self) { group in
  197. for i in UInt8.min ..< UInt8.max {
  198. group.addTask {
  199. try await client.unary(
  200. request: .init(message: [i]),
  201. descriptor: BinaryEcho.Methods.collect,
  202. serializer: IdentitySerializer(),
  203. deserializer: IdentityDeserializer()
  204. ) { response in
  205. let message = try response.message
  206. XCTAssertEqual(message, [i])
  207. }
  208. }
  209. }
  210. }
  211. }
  212. }
  213. func testInterceptorsAreAppliedInOrder() async throws {
  214. let counter1 = ManagedAtomic(0)
  215. let counter2 = ManagedAtomic(0)
  216. try await self.withInProcessConnectedClient(
  217. services: [BinaryEcho()],
  218. interceptors: [
  219. .requestCounter(counter1),
  220. .rejectAll(with: RPCError(code: .unavailable, message: "")),
  221. .requestCounter(counter2),
  222. ]
  223. ) { client, _ in
  224. try await client.unary(
  225. request: .init(message: [3, 1, 4, 1, 5]),
  226. descriptor: BinaryEcho.Methods.collect,
  227. serializer: IdentitySerializer(),
  228. deserializer: IdentityDeserializer()
  229. ) { response in
  230. XCTAssertRejected(response) { error in
  231. XCTAssertEqual(error.code, .unavailable)
  232. }
  233. }
  234. }
  235. XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1)
  236. XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0)
  237. }
  238. func testNoNewRPCsAfterClientClose() async throws {
  239. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
  240. // Run an RPC so we know the client is running properly.
  241. try await client.unary(
  242. request: .init(message: [3, 1, 4, 1, 5]),
  243. descriptor: BinaryEcho.Methods.collect,
  244. serializer: IdentitySerializer(),
  245. deserializer: IdentityDeserializer()
  246. ) { response in
  247. let message = try response.message
  248. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  249. }
  250. // New RPCs should fail immediately after this.
  251. client.close()
  252. // RPC should fail now.
  253. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  254. try await client.unary(
  255. request: .init(message: [3, 1, 4, 1, 5]),
  256. descriptor: BinaryEcho.Methods.collect,
  257. serializer: IdentitySerializer(),
  258. deserializer: IdentityDeserializer()
  259. ) { _ in }
  260. } errorHandler: { error in
  261. XCTAssertEqual(error.code, .clientIsStopped)
  262. }
  263. }
  264. }
  265. func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
  266. try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in
  267. try await client.clientStreaming(
  268. request: .init(producer: { writer in
  269. // Close the client once this RCP has been started.
  270. client.close()
  271. // Attempts to start a new RPC should fail.
  272. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  273. try await client.unary(
  274. request: .init(message: [3, 1, 4, 1, 5]),
  275. descriptor: BinaryEcho.Methods.collect,
  276. serializer: IdentitySerializer(),
  277. deserializer: IdentityDeserializer()
  278. ) { _ in }
  279. } errorHandler: { error in
  280. XCTAssertEqual(error.code, .clientIsStopped)
  281. }
  282. // Now write to the already opened stream to confirm that opened streams
  283. // can successfully run to completion.
  284. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  285. try await writer.write([byte])
  286. }
  287. }),
  288. descriptor: BinaryEcho.Methods.collect,
  289. serializer: IdentitySerializer(),
  290. deserializer: IdentityDeserializer()
  291. ) { response in
  292. let message = try response.message
  293. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  294. }
  295. }
  296. }
  297. func testCancelRunningClient() async throws {
  298. let inProcess = self.makeInProcessPair()
  299. let client = GRPCClient(transport: inProcess.client)
  300. try await withThrowingTaskGroup(of: Void.self) { group in
  301. group.addTask {
  302. let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()])
  303. try await server.run()
  304. }
  305. group.addTask {
  306. try await client.run()
  307. }
  308. // Wait for client and server to be running.
  309. try await Task.sleep(for: .milliseconds(10))
  310. let task = Task {
  311. try await client.clientStreaming(
  312. request: .init(producer: { writer in
  313. try await Task.sleep(for: .seconds(5))
  314. }),
  315. descriptor: BinaryEcho.Methods.collect,
  316. serializer: IdentitySerializer(),
  317. deserializer: IdentityDeserializer()
  318. ) { response in
  319. XCTAssertRejected(response) { error in
  320. XCTAssertEqual(error.code, .unknown)
  321. }
  322. }
  323. }
  324. // Check requests are getting through.
  325. try await client.unary(
  326. request: .init(message: [3, 1, 4, 1, 5]),
  327. descriptor: BinaryEcho.Methods.collect,
  328. serializer: IdentitySerializer(),
  329. deserializer: IdentityDeserializer()
  330. ) { response in
  331. let message = try response.message
  332. XCTAssertEqual(message, [3, 1, 4, 1, 5])
  333. }
  334. task.cancel()
  335. try await task.value
  336. group.cancelAll()
  337. }
  338. }
  339. func testRunStoppedClient() async throws {
  340. let (clientTransport, _) = self.makeInProcessPair()
  341. let client = GRPCClient(transport: clientTransport)
  342. // Run the client.
  343. let task = Task { try await client.run() }
  344. task.cancel()
  345. try await task.value
  346. // Client is stopped, should throw an error.
  347. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  348. try await client.run()
  349. } errorHandler: { error in
  350. XCTAssertEqual(error.code, .clientIsStopped)
  351. }
  352. }
  353. func testRunAlreadyRunningClient() async throws {
  354. let (clientTransport, _) = self.makeInProcessPair()
  355. let client = GRPCClient(transport: clientTransport)
  356. // Run the client.
  357. let task = Task { try await client.run() }
  358. // Make sure the client is run for the first time here.
  359. try await Task.sleep(for: .milliseconds(10))
  360. // Client is already running, should throw an error.
  361. await XCTAssertThrowsErrorAsync(ofType: ClientError.self) {
  362. try await client.run()
  363. } errorHandler: { error in
  364. XCTAssertEqual(error.code, .clientIsAlreadyRunning)
  365. }
  366. task.cancel()
  367. }
  368. }