GRPCServerTests.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCInProcessTransport
  18. import Testing
  19. import XCTest
  20. @available(gRPCSwift 2.0, *)
  21. final class GRPCServerTests: XCTestCase {
  22. func withInProcessClientConnectedToServer(
  23. services: [any RegistrableRPCService],
  24. interceptorPipeline: [ConditionalInterceptor<any ServerInterceptor>] = [],
  25. _ body: (InProcessTransport.Client, GRPCServer<InProcessTransport.Server>) async throws -> Void
  26. ) async throws {
  27. let inProcess = InProcessTransport()
  28. try await withGRPCServer(
  29. transport: inProcess.server,
  30. services: services,
  31. interceptorPipeline: interceptorPipeline
  32. ) { server in
  33. try await withThrowingTaskGroup(of: Void.self) { group in
  34. group.addTask {
  35. try await inProcess.client.connect()
  36. }
  37. try await body(inProcess.client, server)
  38. inProcess.client.beginGracefulShutdown()
  39. }
  40. }
  41. }
  42. func testServerHandlesUnary() async throws {
  43. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  44. try await client.withStream(
  45. descriptor: BinaryEcho.Methods.get,
  46. options: .defaults
  47. ) { stream, _ in
  48. try await stream.outbound.write(.metadata([:]))
  49. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  50. await stream.outbound.finish()
  51. var responseParts = stream.inbound.makeAsyncIterator()
  52. let metadata = try await responseParts.next()
  53. XCTAssertMetadata(metadata)
  54. let message = try await responseParts.next()
  55. XCTAssertMessage(message) {
  56. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  57. }
  58. let status = try await responseParts.next()
  59. XCTAssertStatus(status) { status, _ in
  60. XCTAssertEqual(status.code, .ok)
  61. }
  62. }
  63. }
  64. }
  65. func testServerHandlesClientStreaming() async throws {
  66. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  67. try await client.withStream(
  68. descriptor: BinaryEcho.Methods.collect,
  69. options: .defaults
  70. ) { stream, _ in
  71. try await stream.outbound.write(.metadata([:]))
  72. try await stream.outbound.write(.message([3]))
  73. try await stream.outbound.write(.message([1]))
  74. try await stream.outbound.write(.message([4]))
  75. try await stream.outbound.write(.message([1]))
  76. try await stream.outbound.write(.message([5]))
  77. await stream.outbound.finish()
  78. var responseParts = stream.inbound.makeAsyncIterator()
  79. let metadata = try await responseParts.next()
  80. XCTAssertMetadata(metadata)
  81. let message = try await responseParts.next()
  82. XCTAssertMessage(message) {
  83. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  84. }
  85. let status = try await responseParts.next()
  86. XCTAssertStatus(status) { status, _ in
  87. XCTAssertEqual(status.code, .ok)
  88. }
  89. }
  90. }
  91. }
  92. func testServerHandlesServerStreaming() async throws {
  93. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  94. try await client.withStream(
  95. descriptor: BinaryEcho.Methods.expand,
  96. options: .defaults
  97. ) { stream, _ in
  98. try await stream.outbound.write(.metadata([:]))
  99. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  100. await stream.outbound.finish()
  101. var responseParts = stream.inbound.makeAsyncIterator()
  102. let metadata = try await responseParts.next()
  103. XCTAssertMetadata(metadata)
  104. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  105. let message = try await responseParts.next()
  106. XCTAssertMessage(message) {
  107. XCTAssertEqual($0, [byte])
  108. }
  109. }
  110. let status = try await responseParts.next()
  111. XCTAssertStatus(status) { status, _ in
  112. XCTAssertEqual(status.code, .ok)
  113. }
  114. }
  115. }
  116. }
  117. func testServerHandlesBidirectionalStreaming() async throws {
  118. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  119. try await client.withStream(
  120. descriptor: BinaryEcho.Methods.update,
  121. options: .defaults
  122. ) { stream, _ in
  123. try await stream.outbound.write(.metadata([:]))
  124. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  125. try await stream.outbound.write(.message([byte]))
  126. }
  127. await stream.outbound.finish()
  128. var responseParts = stream.inbound.makeAsyncIterator()
  129. let metadata = try await responseParts.next()
  130. XCTAssertMetadata(metadata)
  131. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  132. let message = try await responseParts.next()
  133. XCTAssertMessage(message) {
  134. XCTAssertEqual($0, [byte])
  135. }
  136. }
  137. let status = try await responseParts.next()
  138. XCTAssertStatus(status) { status, _ in
  139. XCTAssertEqual(status.code, .ok)
  140. }
  141. }
  142. }
  143. }
  144. func testUnimplementedMethod() async throws {
  145. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  146. try await client.withStream(
  147. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  148. options: .defaults
  149. ) { stream, _ in
  150. try await stream.outbound.write(.metadata([:]))
  151. await stream.outbound.finish()
  152. var responseParts = stream.inbound.makeAsyncIterator()
  153. let status = try await responseParts.next()
  154. XCTAssertStatus(status) { status, _ in
  155. XCTAssertEqual(status.code, .unimplemented)
  156. }
  157. }
  158. }
  159. }
  160. func testMultipleConcurrentRequests() async throws {
  161. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  162. await withThrowingTaskGroup(of: Void.self) { group in
  163. for i in UInt8.min ..< UInt8.max {
  164. group.addTask {
  165. try await client.withStream(
  166. descriptor: BinaryEcho.Methods.get,
  167. options: .defaults
  168. ) { stream, _ in
  169. try await stream.outbound.write(.metadata([:]))
  170. try await stream.outbound.write(.message([i]))
  171. await stream.outbound.finish()
  172. var responseParts = stream.inbound.makeAsyncIterator()
  173. let metadata = try await responseParts.next()
  174. XCTAssertMetadata(metadata)
  175. let message = try await responseParts.next()
  176. XCTAssertMessage(message) { XCTAssertEqual($0, [i]) }
  177. let status = try await responseParts.next()
  178. XCTAssertStatus(status) { status, _ in
  179. XCTAssertEqual(status.code, .ok)
  180. }
  181. }
  182. }
  183. }
  184. }
  185. }
  186. }
  187. func testInterceptorsAreAppliedInOrder() async throws {
  188. let counter1 = AtomicCounter()
  189. let counter2 = AtomicCounter()
  190. try await self.withInProcessClientConnectedToServer(
  191. services: [BinaryEcho()],
  192. interceptorPipeline: [
  193. .apply(.requestCounter(counter1), to: .all),
  194. .apply(.rejectAll(with: RPCError(code: .unavailable, message: "")), to: .all),
  195. .apply(.requestCounter(counter2), to: .all),
  196. ]
  197. ) { client, _ in
  198. try await client.withStream(
  199. descriptor: BinaryEcho.Methods.get,
  200. options: .defaults
  201. ) { stream, _ in
  202. try await stream.outbound.write(.metadata([:]))
  203. await stream.outbound.finish()
  204. let parts = try await stream.inbound.collect()
  205. XCTAssertStatus(parts.first) { status, _ in
  206. XCTAssertEqual(status.code, .unavailable)
  207. }
  208. }
  209. }
  210. XCTAssertEqual(counter1.value, 1)
  211. XCTAssertEqual(counter2.value, 0)
  212. }
  213. func testInterceptorsAreNotAppliedToUnimplementedMethods() async throws {
  214. let counter = AtomicCounter()
  215. try await self.withInProcessClientConnectedToServer(
  216. services: [BinaryEcho()],
  217. interceptorPipeline: [.apply(.requestCounter(counter), to: .all)]
  218. ) { client, _ in
  219. try await client.withStream(
  220. descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
  221. options: .defaults
  222. ) { stream, _ in
  223. try await stream.outbound.write(.metadata([:]))
  224. await stream.outbound.finish()
  225. let parts = try await stream.inbound.collect()
  226. XCTAssertStatus(parts.first) { status, _ in
  227. XCTAssertEqual(status.code, .unimplemented)
  228. }
  229. }
  230. }
  231. XCTAssertEqual(counter.value, 0)
  232. }
  233. func testNoNewRPCsAfterServerStopListening() async throws {
  234. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  235. // Run an RPC so we know the server is up.
  236. try await self.doEchoGet(using: client)
  237. // New streams should fail immediately after this.
  238. server.beginGracefulShutdown()
  239. // RPC should fail now.
  240. await XCTAssertThrowsRPCErrorAsync {
  241. try await client.withStream(
  242. descriptor: BinaryEcho.Methods.get,
  243. options: .defaults
  244. ) { stream, _ in
  245. XCTFail("Stream shouldn't be opened")
  246. }
  247. } errorHandler: { error in
  248. XCTAssertEqual(error.code, .failedPrecondition)
  249. }
  250. }
  251. }
  252. func testInFlightRPCsCanContinueAfterServerStopListening() async throws {
  253. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  254. try await client.withStream(
  255. descriptor: BinaryEcho.Methods.update,
  256. options: .defaults
  257. ) { stream, _ in
  258. try await stream.outbound.write(.metadata([:]))
  259. var iterator = stream.inbound.makeAsyncIterator()
  260. // Don't need to validate the response, just that the server is running.
  261. let metadata = try await iterator.next()
  262. XCTAssertMetadata(metadata)
  263. // New streams should fail immediately after this.
  264. server.beginGracefulShutdown()
  265. try await stream.outbound.write(.message([0]))
  266. await stream.outbound.finish()
  267. let message = try await iterator.next()
  268. XCTAssertMessage(message) { XCTAssertEqual($0, [0]) }
  269. let status = try await iterator.next()
  270. XCTAssertStatus(status)
  271. }
  272. }
  273. }
  274. func testCancelRunningServer() async throws {
  275. let inProcess = InProcessTransport()
  276. let task = Task {
  277. let server = GRPCServer(transport: inProcess.server, services: [BinaryEcho()])
  278. try await server.serve()
  279. }
  280. try await withThrowingTaskGroup(of: Void.self) { group in
  281. group.addTask {
  282. try? await inProcess.client.connect()
  283. }
  284. try await self.doEchoGet(using: inProcess.client)
  285. // The server must be running at this point as an RPC has completed.
  286. task.cancel()
  287. try await task.value
  288. group.cancelAll()
  289. }
  290. }
  291. func testTestRunStoppedServer() async throws {
  292. let server = GRPCServer(
  293. transport: InProcessTransport.Server(peer: "in-process:1234"),
  294. services: []
  295. )
  296. // Run the server.
  297. let task = Task { try await server.serve() }
  298. task.cancel()
  299. try await task.value
  300. // Server is stopped, should throw an error.
  301. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  302. try await server.serve()
  303. } errorHandler: { error in
  304. XCTAssertEqual(error.code, .serverIsStopped)
  305. }
  306. }
  307. func testRunServerWhenTransportThrows() async throws {
  308. let server = GRPCServer(transport: ThrowOnRunServerTransport(), services: [])
  309. await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
  310. try await server.serve()
  311. } errorHandler: { error in
  312. XCTAssertEqual(error.code, .transportError)
  313. }
  314. }
  315. private func doEchoGet(using transport: some ClientTransport<[UInt8]>) async throws {
  316. try await transport.withStream(
  317. descriptor: BinaryEcho.Methods.get,
  318. options: .defaults
  319. ) { stream, _ in
  320. try await stream.outbound.write(.metadata([:]))
  321. try await stream.outbound.write(.message([0]))
  322. await stream.outbound.finish()
  323. // Don't need to validate the response, just that the server is running.
  324. let parts = try await stream.inbound.collect()
  325. XCTAssertEqual(parts.count, 3)
  326. }
  327. }
  328. }
  329. @Suite("GRPC Server Tests")
  330. struct ServerTests {
  331. @Test("Interceptors are applied only to specified services")
  332. @available(gRPCSwift 2.0, *)
  333. func testInterceptorsAreAppliedToSpecifiedServices() async throws {
  334. let onlyBinaryEchoCounter = AtomicCounter()
  335. let allServicesCounter = AtomicCounter()
  336. let onlyHelloWorldCounter = AtomicCounter()
  337. let bothServicesCounter = AtomicCounter()
  338. try await self.withInProcessClientConnectedToServer(
  339. services: [BinaryEcho(), HelloWorld()],
  340. interceptorPipeline: [
  341. .apply(
  342. .requestCounter(onlyBinaryEchoCounter),
  343. to: .services([BinaryEcho.serviceDescriptor])
  344. ),
  345. .apply(.requestCounter(allServicesCounter), to: .all),
  346. .apply(
  347. .requestCounter(onlyHelloWorldCounter),
  348. to: .services([HelloWorld.serviceDescriptor])
  349. ),
  350. .apply(
  351. .requestCounter(bothServicesCounter),
  352. to: .services([BinaryEcho.serviceDescriptor, HelloWorld.serviceDescriptor])
  353. ),
  354. ]
  355. ) { client, _ in
  356. // Make a request to the `BinaryEcho` service and assert that only
  357. // the counters associated to interceptors that apply to it are incremented.
  358. try await client.withStream(
  359. descriptor: BinaryEcho.Methods.get,
  360. options: .defaults
  361. ) { stream, _ in
  362. try await stream.outbound.write(.metadata([:]))
  363. try await stream.outbound.write(.message(Array("hello".utf8)))
  364. await stream.outbound.finish()
  365. var responseParts = stream.inbound.makeAsyncIterator()
  366. let metadata = try await responseParts.next()
  367. self.assertMetadata(metadata)
  368. let message = try await responseParts.next()
  369. self.assertMessage(message) {
  370. #expect($0 == Array("hello".utf8))
  371. }
  372. let status = try await responseParts.next()
  373. self.assertStatus(status) { status, _ in
  374. #expect(status.code == .ok, Comment(rawValue: status.description))
  375. }
  376. }
  377. #expect(onlyBinaryEchoCounter.value == 1)
  378. #expect(allServicesCounter.value == 1)
  379. #expect(onlyHelloWorldCounter.value == 0)
  380. #expect(bothServicesCounter.value == 1)
  381. // Now, make a request to the `HelloWorld` service and assert that only
  382. // the counters associated to interceptors that apply to it are incremented.
  383. try await client.withStream(
  384. descriptor: HelloWorld.Methods.sayHello,
  385. options: .defaults
  386. ) { stream, _ in
  387. try await stream.outbound.write(.metadata([:]))
  388. try await stream.outbound.write(.message(Array("Swift".utf8)))
  389. await stream.outbound.finish()
  390. var responseParts = stream.inbound.makeAsyncIterator()
  391. let metadata = try await responseParts.next()
  392. self.assertMetadata(metadata)
  393. let message = try await responseParts.next()
  394. self.assertMessage(message) {
  395. #expect($0 == Array("Hello, Swift!".utf8))
  396. }
  397. let status = try await responseParts.next()
  398. self.assertStatus(status) { status, _ in
  399. #expect(status.code == .ok, Comment(rawValue: status.description))
  400. }
  401. }
  402. #expect(onlyBinaryEchoCounter.value == 1)
  403. #expect(allServicesCounter.value == 2)
  404. #expect(onlyHelloWorldCounter.value == 1)
  405. #expect(bothServicesCounter.value == 2)
  406. }
  407. }
  408. @Test("Interceptors are applied only to specified methods")
  409. @available(gRPCSwift 2.0, *)
  410. func testInterceptorsAreAppliedToSpecifiedMethods() async throws {
  411. let onlyBinaryEchoGetCounter = AtomicCounter()
  412. let onlyBinaryEchoCollectCounter = AtomicCounter()
  413. let bothBinaryEchoMethodsCounter = AtomicCounter()
  414. let allMethodsCounter = AtomicCounter()
  415. try await self.withInProcessClientConnectedToServer(
  416. services: [BinaryEcho()],
  417. interceptorPipeline: [
  418. .apply(
  419. .requestCounter(onlyBinaryEchoGetCounter),
  420. to: .methods([BinaryEcho.Methods.get])
  421. ),
  422. .apply(.requestCounter(allMethodsCounter), to: .all),
  423. .apply(
  424. .requestCounter(onlyBinaryEchoCollectCounter),
  425. to: .methods([BinaryEcho.Methods.collect])
  426. ),
  427. .apply(
  428. .requestCounter(bothBinaryEchoMethodsCounter),
  429. to: .methods([BinaryEcho.Methods.get, BinaryEcho.Methods.collect])
  430. ),
  431. ]
  432. ) { client, _ in
  433. // Make a request to the `BinaryEcho/get` method and assert that only
  434. // the counters associated to interceptors that apply to it are incremented.
  435. try await client.withStream(
  436. descriptor: BinaryEcho.Methods.get,
  437. options: .defaults
  438. ) { stream, _ in
  439. try await stream.outbound.write(.metadata([:]))
  440. try await stream.outbound.write(.message(Array("hello".utf8)))
  441. await stream.outbound.finish()
  442. var responseParts = stream.inbound.makeAsyncIterator()
  443. let metadata = try await responseParts.next()
  444. self.assertMetadata(metadata)
  445. let message = try await responseParts.next()
  446. self.assertMessage(message) {
  447. #expect($0 == Array("hello".utf8))
  448. }
  449. let status = try await responseParts.next()
  450. self.assertStatus(status) { status, _ in
  451. #expect(status.code == .ok, Comment(rawValue: status.description))
  452. }
  453. }
  454. #expect(onlyBinaryEchoGetCounter.value == 1)
  455. #expect(allMethodsCounter.value == 1)
  456. #expect(onlyBinaryEchoCollectCounter.value == 0)
  457. #expect(bothBinaryEchoMethodsCounter.value == 1)
  458. // Now, make a request to the `BinaryEcho/collect` method and assert that only
  459. // the counters associated to interceptors that apply to it are incremented.
  460. try await client.withStream(
  461. descriptor: BinaryEcho.Methods.collect,
  462. options: .defaults
  463. ) { stream, _ in
  464. try await stream.outbound.write(.metadata([:]))
  465. try await stream.outbound.write(.message(Array("hello".utf8)))
  466. await stream.outbound.finish()
  467. var responseParts = stream.inbound.makeAsyncIterator()
  468. let metadata = try await responseParts.next()
  469. self.assertMetadata(metadata)
  470. let message = try await responseParts.next()
  471. self.assertMessage(message) {
  472. #expect($0 == Array("hello".utf8))
  473. }
  474. let status = try await responseParts.next()
  475. self.assertStatus(status) { status, _ in
  476. #expect(status.code == .ok, Comment(rawValue: status.description))
  477. }
  478. }
  479. #expect(onlyBinaryEchoGetCounter.value == 1)
  480. #expect(allMethodsCounter.value == 2)
  481. #expect(onlyBinaryEchoCollectCounter.value == 1)
  482. #expect(bothBinaryEchoMethodsCounter.value == 2)
  483. }
  484. }
  485. @available(gRPCSwift 2.0, *)
  486. func withInProcessClientConnectedToServer(
  487. services: [any RegistrableRPCService],
  488. interceptorPipeline: [ConditionalInterceptor<any ServerInterceptor>] = [],
  489. _ body: (InProcessTransport.Client, GRPCServer<InProcessTransport.Server>) async throws -> Void
  490. ) async throws {
  491. let inProcess = InProcessTransport()
  492. let server = GRPCServer(
  493. transport: inProcess.server,
  494. services: services,
  495. interceptorPipeline: interceptorPipeline
  496. )
  497. try await withThrowingTaskGroup(of: Void.self) { group in
  498. group.addTask {
  499. try await server.serve()
  500. }
  501. group.addTask {
  502. try await inProcess.client.connect()
  503. }
  504. try await body(inProcess.client, server)
  505. inProcess.client.beginGracefulShutdown()
  506. server.beginGracefulShutdown()
  507. }
  508. }
  509. @available(gRPCSwift 2.0, *)
  510. func assertMetadata<Bytes: GRPCContiguousBytes>(
  511. _ part: RPCResponsePart<Bytes>?,
  512. metadataHandler: (Metadata) -> Void = { _ in }
  513. ) {
  514. switch part {
  515. case .some(.metadata(let metadata)):
  516. metadataHandler(metadata)
  517. default:
  518. Issue.record("Expected '.metadata' but found '\(String(describing: part))'")
  519. }
  520. }
  521. @available(gRPCSwift 2.0, *)
  522. func assertMessage<Bytes: GRPCContiguousBytes>(
  523. _ part: RPCResponsePart<Bytes>?,
  524. messageHandler: (Bytes) -> Void = { _ in }
  525. ) {
  526. switch part {
  527. case .some(.message(let message)):
  528. messageHandler(message)
  529. default:
  530. Issue.record("Expected '.message' but found '\(String(describing: part))'")
  531. }
  532. }
  533. @available(gRPCSwift 2.0, *)
  534. func assertStatus<Bytes: GRPCContiguousBytes>(
  535. _ part: RPCResponsePart<Bytes>?,
  536. statusHandler: (Status, Metadata) -> Void = { _, _ in }
  537. ) {
  538. switch part {
  539. case .some(.status(let status, let metadata)):
  540. statusHandler(status, metadata)
  541. default:
  542. Issue.record("Expected '.status' but found '\(String(describing: part))'")
  543. }
  544. }
  545. }