SubchannelTests.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. @_spi(Package) @testable import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOHTTP2
  20. import NIOPosix
  21. import XCTest
  22. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  23. final class SubchannelTests: XCTestCase {
  /// Checks that asking for a stream on a subchannel which was never told to connect (and is
  /// never run) fails with an `RPCError` whose code is `.unavailable`.
  func testMakeStreamOnIdleSubchannel() async throws {
    // The socket path is a placeholder: this test never calls `connect()` or `run()`.
    let idleSubchannel = self.makeSubchannel(
      address: .unixDomainSocket(path: "ignored"),
      connector: .never
    )

    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
      try await idleSubchannel.makeStream(descriptor: .echoGet, options: .defaults)
    } errorHandler: { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
    }

    idleSubchannel.shutDown()
  }
  /// Checks that a subchannel which has been shut down (and whose `run()` has returned) rejects
  /// new streams with an `RPCError` whose code is `.unavailable`.
  func testMakeStreamOnShutdownSubchannel() async throws {
    #if compiler(<5.9)
    throw XCTSkip("Occasionally crashes due to a Swift 5.8 concurrency runtime bug")
    #endif

    let closedSubchannel = self.makeSubchannel(
      address: .unixDomainSocket(path: "ignored"),
      connector: .never
    )

    // Shut down before running, then wait for `run()` to return before making the stream.
    closedSubchannel.shutDown()
    await closedSubchannel.run()

    await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
      try await closedSubchannel.makeStream(descriptor: .echoGet, options: .defaults)
    } errorHandler: { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
    }
  }
  /// Runs an echoing server, connects the subchannel and, once it reports `.ready`, performs one
  /// RPC over a new stream, checking any echoed message and the final status before shutting the
  /// subchannel down.
  func testMakeStreamOnReadySubchannel() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let boundAddress = try await server.bind()
    let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

    try await withThrowingTaskGroup(of: Void.self) { tasks in
      // Server side: answer metadata with empty metadata, echo every message back and finish
      // each stream with an 'ok' status once the request stream ends.
      tasks.addTask {
        try await server.run { requestParts, responseWriter in
          for try await requestPart in requestParts {
            switch requestPart {
            case .metadata:
              try await responseWriter.write(.metadata([:]))
            case .message(let payload):
              try await responseWriter.write(.message(payload))
            }
          }
          try await responseWriter.write(.status(Status(code: .ok, message: ""), [:]))
        }
      }

      tasks.addTask {
        await subchannel.run()
      }

      subchannel.connect()

      for await event in subchannel.events {
        // Only the transition to 'ready' matters here; every other event is skipped.
        guard case .connectivityStateChanged(.ready) = event else { continue }

        let stream = try await subchannel.makeStream(descriptor: .echoGet, options: .defaults)
        try await stream.execute { responseParts, requestWriter in
          try await requestWriter.write(.metadata([:]))
          try await requestWriter.write(.message([0, 1, 2]))
          requestWriter.finish()

          for try await responsePart in responseParts {
            switch responsePart {
            case .metadata:
              ()  // Don't validate, contains http/2 specific metadata too.
            case .message(let echoed):
              XCTAssertEqual(echoed, [0, 1, 2])
            case .status(let finalStatus, _):
              XCTAssertEqual(finalStatus.code, .ok)
              XCTAssertEqual(finalStatus.message, "")
            }
          }
        }

        // The RPC has completed; shut the subchannel down.
        subchannel.shutDown()
      }

      tasks.cancelAll()
    }
  }
  /// Starts connecting before any server is listening, brings a server up after the first
  /// transient failure, and checks that the subchannel eventually becomes ready and then shuts
  /// down.
  func testConnectEventuallySucceeds() async throws {
    let socketPath = "test-connect-eventually-succeeds"
    let subchannel = self.makeSubchannel(
      address: .unixDomainSocket(path: socketPath),
      connector: .posix(),
      backoff: .fixed(at: .milliseconds(10))
    )

    await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask { await subchannel.run() }

      var serverStarted = false
      var observed: [Subchannel.Event] = []

      for await event in subchannel.events {
        observed.append(event)

        switch event {
        case .connectivityStateChanged(.idle):
          subchannel.connect()

        case .connectivityStateChanged(.transientFailure):
          // Don't start more than one server.
          guard !serverStarted else { continue }
          serverStarted = true

          tasks.addTask {
            let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
            _ = try await server.bind(to: .uds(socketPath))
            try await server.run { _, _ in
              XCTFail("Unexpected stream")
            }
          }

        case .connectivityStateChanged(.ready):
          subchannel.shutDown()

        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()

        default:
          break
        }
      }

      // First four events are known:
      let expectedPrefix: [Subchannel.Event] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.transientFailure),
        .connectivityStateChanged(.connecting),
      ]
      XCTAssertEqual(Array(observed.prefix(4)), expectedPrefix)

      // Because there is backoff timing involved, the subchannel may flip from transient failure
      // to connecting multiple times. Just check that it eventually becomes ready and is then
      // shutdown.
      let expectedSuffix: [Subchannel.Event] = [
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(Array(observed.suffix(2)), expectedSuffix)
    }
  }
  /// Gives the subchannel two addresses nobody is listening on followed by the bound server
  /// address, and drives it until it becomes ready and is then shut down.
  func testConnectIteratesThroughAddresses() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let listeningAddress = try await server.bind()

    // Only the last address has a server behind it.
    let subchannel = self.makeSubchannel(
      addresses: [
        .unixDomainSocket(path: "not-listening-1"),
        .unixDomainSocket(path: "not-listening-2"),
        listeningAddress,
      ],
      connector: .posix()
    )

    await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        try await server.run { _, _ in
          XCTFail("Unexpected stream")
        }
      }

      tasks.addTask {
        await subchannel.run()
      }

      // Drive the subchannel: connect when idle, shut down when ready, stop everything on
      // shutdown.
      for await event in subchannel.events {
        switch event {
        case .connectivityStateChanged(.idle):
          subchannel.connect()
        case .connectivityStateChanged(.ready):
          subchannel.shutDown()
        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()
        default:
          break
        }
      }
    }
  }
  /// Lets the subchannel fail against every address once, then binds a server to the last
  /// address so that a later attempt succeeds, and shuts down once the subchannel is ready.
  func testConnectIteratesThroughAddressesWithBackoff() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let udsPath = "test-wrap-around-addrs"

    let subchannel = self.makeSubchannel(
      addresses: [
        .unixDomainSocket(path: "not-listening-1"),
        .unixDomainSocket(path: "not-listening-2"),
        .unixDomainSocket(path: udsPath),
      ],
      connector: .posix(),
      backoff: .fixed(at: .zero)  // Skip the backoff period
    )

    try await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        await subchannel.run()
      }

      var serverStarted = false

      for await event in subchannel.events {
        switch event {
        case .connectivityStateChanged(.idle):
          subchannel.connect()

        case .connectivityStateChanged(.transientFailure):
          // The subchannel enters the transient failure state when all addresses have been
          // tried. Bind the server now (once only) so that the next attempt succeeds.
          if !serverStarted {
            serverStarted = true

            let boundAddress = try await server.bind(to: .uds(udsPath))
            XCTAssertEqual(boundAddress, .unixDomainSocket(path: udsPath))

            tasks.addTask {
              try await server.run { _, _ in
                XCTFail("Unexpected stream")
              }
            }
          }

        case .connectivityStateChanged(.ready):
          subchannel.shutDown()

        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()

        default:
          break
        }
      }
    }
  }
  /// Connects with a one millisecond maximum idle time and checks the exact event sequence:
  /// idle, connecting, ready, idle again, then shutdown.
  func testIdleTimeout() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let boundAddress = try await server.bind()
    let subchannel = self.makeSubchannel(
      address: boundAddress,
      connector: .posix(maxIdleTime: .milliseconds(1))  // Aggressively idle
    )

    await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        await subchannel.run()
      }

      tasks.addTask {
        try await server.run { _, _ in
          XCTFail("Unexpected stream")
        }
      }

      var timesIdle = 0
      var observed: [Subchannel.Event] = []

      for await event in subchannel.events {
        observed.append(event)

        switch event {
        case .connectivityStateChanged(.idle):
          timesIdle += 1
          // The first idle kicks off the connect attempt; any later idle ends the test.
          switch timesIdle {
          case 1:
            subchannel.connect()
          default:
            subchannel.shutDown()
          }

        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()

        default:
          break
        }
      }

      let expected: [Subchannel.Event] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, expected)
    }
  }
  /// Closes the server side of a ready connection that has no open streams and checks that the
  /// subchannel reports idle (not transient failure) before it is shut down.
  func testConnectionDropWhenIdle() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let boundAddress = try await server.bind()
    let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

    await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        await subchannel.run()
      }

      tasks.addTask {
        try await server.run { _, _ in
          XCTFail("Unexpected RPC")
        }
      }

      var observed: [Subchannel.Event] = []
      var timesIdle = 0

      for await event in subchannel.events {
        observed.append(event)

        switch event {
        case .connectivityStateChanged(.idle):
          timesIdle += 1
          // First idle: start connecting. Second idle: the connection dropped, so shut down.
          if timesIdle == 1 {
            subchannel.connect()
          } else if timesIdle == 2 {
            subchannel.shutDown()
          } else {
            XCTFail("Unexpected idle")
          }

        case .connectivityStateChanged(.ready):
          // Close the connection without a GOAWAY.
          server.clients.first?.close(mode: .all, promise: nil)

        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()

        default:
          break
        }
      }

      let expected: [Subchannel.Event] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, expected)
    }
  }
  /// Closes the server side of a connection while a stream is open and checks that the
  /// subchannel reports transient failure, requests name resolution, reconnects, and is then
  /// shut down.
  func testConnectionDropWithOpenStreams() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let boundAddress = try await server.bind()
    let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

    try await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        await subchannel.run()
      }

      tasks.addTask {
        try await server.run(.echo)
      }

      var observed: [Subchannel.Event] = []
      var timesReady = 0

      for await event in subchannel.events {
        observed.append(event)

        switch event {
        case .connectivityStateChanged(.idle):
          subchannel.connect()

        case .connectivityStateChanged(.ready):
          timesReady += 1

          // When the connection becomes ready the first time, open a stream and forcibly close
          // the channel. This will result in an automatic reconnect. Close the subchannel when
          // that happens.
          switch timesReady {
          case 1:
            let stream = try await subchannel.makeStream(descriptor: .echoGet, options: .defaults)
            try await stream.execute { responseParts, requestWriter in
              try await requestWriter.write(.metadata([:]))

              // Wait for the metadata to be echo'd back.
              var responseIterator = responseParts.makeAsyncIterator()
              _ = try await responseIterator.next()

              // Stream is definitely open. Bork the connection.
              server.clients.first?.close(mode: .all, promise: nil)

              // Wait for the next message which won't arrive, client won't send a message. The
              // stream should fail
              _ = try await responseIterator.next()
            }

          case 2:
            subchannel.shutDown()

          default:
            break
          }

        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()

        default:
          break
        }
      }

      let expected: [Subchannel.Event] = [
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.transientFailure),
        .requiresNameResolution,
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(observed, expected)
    }
  }
  /// Sends a GOAWAY frame from the server to a ready subchannel and checks the resulting event
  /// sequence: going away, a name resolution request, idle, and finally shutdown.
  func testConnectedReceivesGoAway() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let boundAddress = try await server.bind()
    let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

    try await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        try await server.run { _, _ in
          XCTFail("Unexpected stream")
        }
      }

      tasks.addTask {
        await subchannel.run()
      }

      var events: [Subchannel.Event] = []
      var timesIdle = 0

      for await event in subchannel.events {
        events.append(event)

        switch event {
        case .connectivityStateChanged(.idle):
          timesIdle += 1
          // Connect on the first idle; shut down on the second (the one following the GOAWAY).
          switch timesIdle {
          case 1:
            subchannel.connect()
          case 2:
            subchannel.shutDown()
          default:
            break
          }

        case .connectivityStateChanged(.ready):
          // Now the subchannel is ready, send a GOAWAY from the server.
          let clientChannel = try XCTUnwrap(server.clients.first)
          let goAwayFrame = HTTP2Frame(
            streamID: .rootStream,
            payload: .goAway(lastStreamID: 0, errorCode: .cancel, opaqueData: nil)
          )
          try await clientChannel.writeAndFlush(goAwayFrame)

        case .connectivityStateChanged(.shutdown):
          tasks.cancelAll()

        default:
          break
        }
      }

      let expectedEvents: [Subchannel.Event] = [
        // Normal connect flow.
        .connectivityStateChanged(.idle),
        .connectivityStateChanged(.connecting),
        .connectivityStateChanged(.ready),
        // GOAWAY triggers name resolution and idling.
        .goingAway,
        .requiresNameResolution,
        .connectivityStateChanged(.idle),
        // The second idle triggers a close.
        .connectivityStateChanged(.shutdown),
      ]
      XCTAssertEqual(expectedEvents, events)
    }
  }
  /// Cancels the task group as soon as the subchannel reports `.ready`, then keeps consuming
  /// events until the event stream finishes. There are no assertions beyond the server never
  /// seeing a stream.
  func testCancelReadySubchannel() async throws {
    let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
    let boundAddress = try await server.bind()
    let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

    await withThrowingTaskGroup(of: Void.self) { tasks in
      tasks.addTask {
        try await server.run { _, _ in
          XCTFail("Unexpected stream")
        }
      }

      // Request the connection from inside the task, just before running the subchannel.
      tasks.addTask {
        subchannel.connect()
        await subchannel.run()
      }

      for await event in subchannel.events {
        if case .connectivityStateChanged(.ready) = event {
          tasks.cancelAll()
        }
      }
    }
  }
  /// Builds a `Subchannel` for an endpoint made of the given addresses.
  ///
  /// - Parameters:
  ///   - addresses: The addresses which make up the subchannel's endpoint.
  ///   - connector: The connector handed to the subchannel.
  ///   - backoff: The connection backoff to use; `ConnectionBackoff.defaults` is used when `nil`.
  /// - Returns: A new subchannel with a fresh `SubchannelID` and compression set to `.none`.
  private func makeSubchannel(
    addresses: [GRPCHTTP2Core.SocketAddress],
    connector: any HTTP2Connector,
    backoff: ConnectionBackoff? = nil
  ) -> Subchannel {
    let endpoint = Endpoint(addresses: addresses)
    let subchannelID = SubchannelID()
    let effectiveBackoff: ConnectionBackoff = backoff ?? .defaults

    return Subchannel(
      endpoint: endpoint,
      id: subchannelID,
      connector: connector,
      backoff: effectiveBackoff,
      defaultCompression: .none,
      enabledCompression: .none
    )
  }
  /// Convenience for building a `Subchannel` with exactly one address; forwards to
  /// `makeSubchannel(addresses:connector:backoff:)` with a single-element array.
  ///
  /// - Parameters:
  ///   - address: The only address in the subchannel's endpoint.
  ///   - connector: The connector handed to the subchannel.
  ///   - backoff: The connection backoff, passed through unchanged (may be `nil`).
  /// - Returns: The newly built subchannel.
  private func makeSubchannel(
    address: GRPCHTTP2Core.SocketAddress,
    connector: any HTTP2Connector,
    backoff: ConnectionBackoff? = nil
  ) -> Subchannel {
    let singleAddress = [address]
    return self.makeSubchannel(addresses: singleAddress, connector: connector, backoff: backoff)
  }
  484. }
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension ConnectionBackoff {
    /// Makes a backoff whose initial and maximum values are both `interval` and whose multiplier
    /// is `1.0`.
    ///
    /// - Parameters:
    ///   - interval: Used for both the `initial` and `max` values.
    ///   - jitter: The jitter passed to the initialiser; defaults to `0.0`.
    static func fixed(at interval: Duration, jitter: Double = 0.0) -> Self {
      Self(initial: interval, max: interval, multiplier: 1.0, jitter: jitter)
    }

    /// The backoff used by these tests when none is given: ten seconds initially, capped at 120
    /// seconds, with a multiplier of 1.6 and a jitter of 1.2.
    static var defaults: Self {
      return Self(initial: .seconds(10), max: .seconds(120), multiplier: 1.6, jitter: 1.2)
    }
  }
  493. }