GRPCClientTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCInProcessTransport
  18. import XCTest
  19. final class GRPCClientTests: XCTestCase {
  /// Runs `body` against a client and a server joined by an in-process transport.
  ///
  /// The server's `serve()` and the client's `run()` are started as child tasks of a
  /// throwing task group. After `body` returns, graceful shutdown is requested on the
  /// client and then on the server; the task group then waits for its child tasks
  /// before this method returns.
  ///
  /// - Parameters:
  ///   - services: Services passed to the server's initializer.
  ///   - interceptors: Interceptors passed to the client's initializer; empty by default.
  ///   - body: The test logic, given the client and the server.
  /// - Throws: Any error thrown by the start-up delay or by `body`.
  func withInProcessConnectedClient(
    services: [any RegistrableRPCService],
    interceptors: [any ClientInterceptor] = [],
    _ body: (GRPCClient, GRPCServer) async throws -> Void
  ) async throws {
    let transport = InProcessTransport()
    let client = GRPCClient(transport: transport.client, interceptors: interceptors)
    let server = GRPCServer(transport: transport.server, services: services)

    try await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask { try await server.serve() }
      tasks.addTask { try await client.run() }

      // Fixed delay so that both child tasks have had a chance to start running
      // before the test body issues any RPCs.
      try await Task.sleep(for: .milliseconds(100))

      try await body(client, server)

      client.beginGracefulShutdown()
      server.beginGracefulShutdown()
    }
  }
  /// A serializer for `[UInt8]` messages which hands the bytes back unchanged.
  struct IdentitySerializer: MessageSerializer {
    typealias Message = [UInt8]

    /// Returns `message` as-is.
    func serialize(_ message: Message) throws -> [UInt8] { message }
  }
  /// A deserializer for `[UInt8]` messages which hands the bytes back unchanged.
  struct IdentityDeserializer: MessageDeserializer {
    typealias Message = [UInt8]

    /// Returns `serializedMessageBytes` as-is.
    func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message { serializedMessageBytes }
  }
  /// Checks that a unary call to `BinaryEcho.Methods.collect` responds with a message
  /// equal to the request bytes.
  func testUnary() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.unary(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        let received = try reply.message
        XCTAssertEqual(received, payload)
      }
    }
  }
  /// Checks that a client-streaming call to `BinaryEcho.Methods.collect`, fed one byte
  /// per request message, responds with a single message holding all of the bytes.
  func testClientStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.clientStreaming(
        request: .init(producer: { writer in
          // Each byte is sent as its own single-element message.
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        let received = try reply.message
        XCTAssertEqual(received, payload)
      }
    }
  }
  /// Checks that a server-streaming call to `BinaryEcho.Methods.expand` yields one
  /// single-byte message per byte of the request, in order.
  ///
  /// Only the first five response messages are inspected.
  func testServerStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.serverStreaming(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.expand,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        var iterator = reply.messages.makeAsyncIterator()
        for expected in payload {
          let received = try await iterator.next()
          XCTAssertEqual(received, [expected])
        }
      }
    }
  }
  /// Checks that a bidirectional-streaming call to `BinaryEcho.Methods.update` yields,
  /// for each single-byte request message, a response message holding the same byte.
  ///
  /// Only the first five response messages are inspected.
  func testBidirectionalStreaming() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.bidirectionalStreaming(
        request: .init(producer: { writer in
          // Each byte is sent as its own single-element message.
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: BinaryEcho.Methods.update,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        var iterator = reply.messages.makeAsyncIterator()
        for expected in payload {
          let received = try await iterator.next()
          XCTAssertEqual(received, [expected])
        }
      }
    }
  }
  /// Checks that a unary call using the descriptor `not`/`implemented` (which the test
  /// expects the server not to provide) is not accepted and fails with `.unimplemented`.
  func testUnimplementedMethod_Unary() async throws {
    let unknownMethod = MethodDescriptor(service: "not", method: "implemented")

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.unary(
        request: .init(message: [3, 1, 4, 1, 5]),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        XCTAssertThrowsRPCError(try reply.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// Checks that a client-streaming call using the descriptor `not`/`implemented`
  /// (which the test expects the server not to provide) is not accepted and fails
  /// with `.unimplemented`.
  func testUnimplementedMethod_ClientStreaming() async throws {
    let unknownMethod = MethodDescriptor(service: "not", method: "implemented")
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.clientStreaming(
        request: .init(producer: { writer in
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        XCTAssertThrowsRPCError(try reply.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// Checks that a server-streaming call using the descriptor `not`/`implemented`
  /// (which the test expects the server not to provide) is not accepted and fails
  /// with `.unimplemented`.
  func testUnimplementedMethod_ServerStreaming() async throws {
    let unknownMethod = MethodDescriptor(service: "not", method: "implemented")

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.serverStreaming(
        request: .init(message: [3, 1, 4, 1, 5]),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        XCTAssertThrowsRPCError(try reply.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// Checks that a bidirectional-streaming call using the descriptor `not`/`implemented`
  /// (which the test expects the server not to provide) is not accepted and fails
  /// with `.unimplemented`.
  func testUnimplementedMethod_BidirectionalStreaming() async throws {
    let unknownMethod = MethodDescriptor(service: "not", method: "implemented")
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.bidirectionalStreaming(
        request: .init(producer: { writer in
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: unknownMethod,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        XCTAssertThrowsRPCError(try reply.accepted.get()) { rpcError in
          XCTAssertEqual(rpcError.code, .unimplemented)
        }
      }
    }
  }
  /// Issues many unary `BinaryEcho.Methods.collect` calls concurrently on one client and
  /// checks that each response carries the byte that was sent in its own request.
  ///
  /// Every child task is awaited via `waitForAll()` so that an error thrown by any RPC
  /// fails the test rather than being dropped when the task group goes out of scope.
  func testMultipleConcurrentRequests() async throws {
    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await withThrowingTaskGroup(of: Void.self) { group in
        // The half-open range issues one request for each value in 0...254.
        for i in UInt8.min ..< UInt8.max {
          group.addTask {
            try await client.unary(
              request: .init(message: [i]),
              descriptor: BinaryEcho.Methods.collect,
              serializer: IdentitySerializer(),
              deserializer: IdentityDeserializer(),
              options: .defaults
            ) { response in
              let message = try response.message
              XCTAssertEqual(message, [i])
            }
          }
        }

        // A throwing task group discards child-task errors unless the results are
        // consumed; surface the first failure (if any) to the test.
        try await group.waitForAll()
      }
    }
  }
  /// Checks the order in which client interceptors see a request.
  ///
  /// The client is configured with a counting interceptor, then an interceptor that
  /// rejects with `.unavailable`, then a second counting interceptor. The RPC must be
  /// rejected with `.unavailable`, the first counter must read 1 and the second 0.
  func testInterceptorsAreAppliedInOrder() async throws {
    let beforeRejection = AtomicCounter()
    let afterRejection = AtomicCounter()
    let rejection = RPCError(code: .unavailable, message: "")

    try await self.withInProcessConnectedClient(
      services: [BinaryEcho()],
      interceptors: [
        .requestCounter(beforeRejection),
        .rejectAll(with: rejection),
        .requestCounter(afterRejection),
      ]
    ) { client, _ in
      try await client.unary(
        request: .init(message: [3, 1, 4, 1, 5]),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        XCTAssertRejected(reply) { rpcError in
          XCTAssertEqual(rpcError.code, .unavailable)
        }
      }
    }

    XCTAssertEqual(beforeRejection.value, 1)
    XCTAssertEqual(afterRejection.value, 0)
  }
  /// Checks that once graceful shutdown has been requested on the client, starting a
  /// new RPC throws a `RuntimeError` with code `.clientIsStopped`.
  func testNoNewRPCsAfterClientClose() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      // A successful RPC first, so we know the client is up and working.
      try await client.unary(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        let received = try reply.message
        XCTAssertEqual(received, payload)
      }

      // From here on new RPCs are expected to fail straight away.
      client.beginGracefulShutdown()

      // The same RPC again: this time it should throw.
      await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
        try await client.unary(
          request: .init(message: payload),
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer(),
          options: .defaults
        ) { _ in }
      } errorHandler: { runtimeError in
        XCTAssertEqual(runtimeError.code, .clientIsStopped)
      }
    }
  }
  /// Checks that an RPC which was already started when graceful shutdown is requested
  /// can still run to completion, while a new RPC started after that point throws a
  /// `RuntimeError` with code `.clientIsStopped`.
  func testInFlightRPCsCanContinueAfterClientIsClosed() async throws {
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in
      try await client.clientStreaming(
        request: .init(producer: { writer in
          // Close the client once this RPC has been started.
          client.beginGracefulShutdown()

          // Starting another RPC at this point should fail.
          await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
            try await client.unary(
              request: .init(message: payload),
              descriptor: BinaryEcho.Methods.collect,
              serializer: IdentitySerializer(),
              deserializer: IdentityDeserializer(),
              options: .defaults
            ) { _ in }
          } errorHandler: { runtimeError in
            XCTAssertEqual(runtimeError.code, .clientIsStopped)
          }

          // Write to the stream that is already open to confirm that streams opened
          // before the shutdown can still run to completion.
          for chunk in payload.map({ [$0] }) {
            try await writer.write(chunk)
          }
        }),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        let received = try reply.message
        XCTAssertEqual(received, payload)
      }
    }
  }
  /// Checks what happens to an RPC whose task is cancelled while the client is running.
  ///
  /// A server and client are run in a task group, a unary RPC confirms both are up, and
  /// then a client-streaming RPC is started in an unstructured task that is cancelled
  /// immediately. The RPC's response must be rejected with code `.unknown`. Finally the
  /// group is cancelled to stop the client and server tasks.
  func testCancelRunningClient() async throws {
    let transport = InProcessTransport()
    let client = GRPCClient(transport: transport.client)
    let payload: [UInt8] = [3, 1, 4, 1, 5]

    try await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        let server = GRPCServer(transport: transport.server, services: [BinaryEcho()])
        try await server.serve()
      }
      tasks.addTask { try await client.run() }

      // A completed RPC tells us that the client and server are both running.
      try await client.unary(
        request: .init(message: payload),
        descriptor: BinaryEcho.Methods.collect,
        serializer: IdentitySerializer(),
        deserializer: IdentityDeserializer(),
        options: .defaults
      ) { reply in
        let received = try reply.message
        XCTAssertEqual(received, payload)
      }

      let cancelledRPC = Task {
        try await client.clientStreaming(
          request: StreamingClientRequest { _ in
            // Never writes; just keeps the request stream open until cancelled.
            try await Task.sleep(for: .seconds(5))
          },
          descriptor: BinaryEcho.Methods.collect,
          serializer: IdentitySerializer(),
          deserializer: IdentityDeserializer(),
          options: .defaults
        ) { reply in
          XCTAssertRejected(reply) { rpcError in
            XCTAssertEqual(rpcError.code, .unknown)
          }
        }
      }

      cancelledRPC.cancel()
      try await cancelledRPC.value

      tasks.cancelAll()
    }
  }
  /// Checks that calling `run()` on a client whose earlier `run()` task was cancelled
  /// and has finished throws a `RuntimeError` with code `.clientIsStopped`.
  func testRunStoppedClient() async throws {
    let transport = InProcessTransport()
    let client = GRPCClient(transport: transport.client)

    // Run the client in its own task, cancel that task, and wait for it to finish.
    let firstRun = Task { try await client.run() }
    firstRun.cancel()
    try await firstRun.value

    // The client is now stopped, so running it again should throw.
    await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
      try await client.run()
    } errorHandler: { runtimeError in
      XCTAssertEqual(runtimeError.code, .clientIsStopped)
    }
  }
  /// Checks that calling `run()` on a client which is already running throws a
  /// `RuntimeError` with code `.clientIsAlreadyRunning`.
  ///
  /// The background task running the client is cancelled on every exit path and is
  /// awaited before the test returns, so it does not outlive the test.
  func testRunAlreadyRunningClient() async throws {
    let inProcess = InProcessTransport()
    let client = GRPCClient(transport: inProcess.client)

    // Run the client.
    let task = Task { try await client.run() }
    // Covers early exits, e.g. the sleep below throwing.
    defer { task.cancel() }

    // Make sure the client is run for the first time here.
    try await Task.sleep(for: .milliseconds(10))

    // Client is already running, should throw an error.
    await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
      try await client.run()
    } errorHandler: { error in
      XCTAssertEqual(error.code, .clientIsAlreadyRunning)
    }

    // Stop the background run and wait for it to finish. Its outcome is not part of
    // what this test checks, so `result` is awaited and then discarded.
    task.cancel()
    _ = await task.result
  }
  372. }