  // SubchannelTests.swift
  /*
   * Copyright 2024, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import GRPCCore
  17. @_spi(Package) @testable import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOHTTP2
  20. import NIOPosix
  21. import XCTest
  /// Tests covering `Subchannel` stream creation and connectivity-state events.
  ///
  /// Tests either use the `.never` connector (no server involved) or the `.posix()` connector
  /// against an in-process `TestServer`.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  final class SubchannelTests: XCTestCase {
    // MARK: - Stream creation

    /// Making a stream on a subchannel which has neither been run nor asked to connect must fail
    /// with an `RPCError` whose code is `.unavailable`.
    func testMakeStreamOnIdleSubchannel() async throws {
      let idleSubchannel = self.makeSubchannel(
        address: .unixDomainSocket(path: "ignored"),
        connector: .never
      )

      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
        try await idleSubchannel.makeStream(descriptor: .echoGet, options: .defaults)
      } errorHandler: { rpcError in
        XCTAssertEqual(rpcError.code, .unavailable)
      }

      idleSubchannel.close()
    }

    /// Making a stream on a subchannel which was closed and then run to completion must fail
    /// with an `RPCError` whose code is `.unavailable`.
    func testMakeStreamOnShutdownSubchannel() async throws {
      #if compiler(<5.9)
      throw XCTSkip("Occasionally crashes due to a Swift 5.8 concurrency runtime bug")
      #endif

      let closedSubchannel = self.makeSubchannel(
        address: .unixDomainSocket(path: "ignored"),
        connector: .never
      )

      // Close first, then run: `run()` is awaited to completion before the stream is attempted.
      closedSubchannel.close()
      await closedSubchannel.run()

      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
        try await closedSubchannel.makeStream(descriptor: .echoGet, options: .defaults)
      } errorHandler: { rpcError in
        XCTAssertEqual(rpcError.code, .unavailable)
      }
    }

    /// Once the subchannel is ready, a stream can be created and used for a full round trip
    /// against a server which echoes request parts back and finishes with an OK status.
    func testMakeStreamOnReadySubchannel() async throws {
      let echoServer = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
      let boundAddress = try await echoServer.bind()
      let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

      try await withThrowingTaskGroup(of: Void.self) { tasks in
        // Server: reply to metadata with empty metadata, echo every message, then send OK.
        tasks.addTask {
          try await echoServer.run { requestParts, responseWriter in
            for try await requestPart in requestParts {
              switch requestPart {
              case .metadata:
                try await responseWriter.write(.metadata([:]))
              case .message(let bytes):
                try await responseWriter.write(.message(bytes))
              }
            }

            let okStatus = Status(code: .ok, message: "")
            try await responseWriter.write(.status(okStatus, [:]))
          }
        }

        tasks.addTask {
          await subchannel.run()
        }

        subchannel.connect()

        for await event in subchannel.events {
          // Only the transition to ready is interesting; skip every other event.
          guard case .connectivityStateChanged(.ready) = event else { continue }

          let stream = try await subchannel.makeStream(descriptor: .echoGet, options: .defaults)
          try await stream.execute { responseParts, requestWriter in
            try await requestWriter.write(.metadata([:]))
            try await requestWriter.write(.message([0, 1, 2]))
            requestWriter.finish()

            for try await responsePart in responseParts {
              switch responsePart {
              case .metadata:
                break  // Don't validate, contains http/2 specific metadata too.
              case .message(let bytes):
                XCTAssertEqual(bytes, [0, 1, 2])
              case .status(let status, _):
                XCTAssertEqual(status.code, .ok)
                XCTAssertEqual(status.message, "")
              }
            }
          }

          // Closing the subchannel is what lets the event loop above come to an end.
          subchannel.close()
        }

        tasks.cancelAll()
      }
    }

    // MARK: - Connecting

    /// Connecting to an address with no listener initially fails; a server is started on the first
    /// transient failure and the subchannel is expected to become ready and then shut down.
    func testConnectEventuallySucceeds() async throws {
      let socketPath = "test-connect-eventually-succeeds"
      let subchannel = self.makeSubchannel(
        address: .unixDomainSocket(path: socketPath),
        connector: .posix(),
        backoff: .fixed(at: .milliseconds(100))
      )

      await withThrowingTaskGroup(of: Void.self) { tasks in
        tasks.addTask { await subchannel.run() }

        var serverStarted = false
        var observedEvents: [Subchannel.Event] = []

        for await event in subchannel.events {
          observedEvents.append(event)

          switch event {
          case .connectivityStateChanged(.idle):
            subchannel.connect()

          case .connectivityStateChanged(.transientFailure):
            // Don't start more than one server.
            guard !serverStarted else { continue }
            serverStarted = true

            tasks.addTask {
              let lateServer = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
              _ = try await lateServer.bind(to: .uds(socketPath))
              try await lateServer.run { _, _ in
                XCTFail("Unexpected stream")
              }
            }

          case .connectivityStateChanged(.ready):
            subchannel.close()

          case .connectivityStateChanged(.shutdown):
            tasks.cancelAll()

          default:
            break
          }
        }

        // The first four events are known.
        let expectedStart: [Subchannel.Event] = [
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.transientFailure),
          .connectivityStateChanged(.connecting),
        ]
        XCTAssertEqual(Array(observedEvents.prefix(4)), expectedStart)

        // Backoff timing means the subchannel may alternate between transient failure and
        // connecting several times, so only the final two events are checked: it must end up
        // ready and then shut down.
        let expectedEnd: [Subchannel.Event] = [
          .connectivityStateChanged(.ready),
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(Array(observedEvents.suffix(2)), expectedEnd)
      }
    }

    /// With two addresses that have no listener followed by the address of a running server, the
    /// subchannel is expected to become ready; it is then closed and the group is cancelled on
    /// shutdown.
    func testConnectIteratesThroughAddresses() async throws {
      let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
      let listeningAddress = try await server.bind()
      let candidates: [GRPCHTTP2Core.SocketAddress] = [
        .unixDomainSocket(path: "not-listening-1"),
        .unixDomainSocket(path: "not-listening-2"),
        listeningAddress,
      ]
      let subchannel = self.makeSubchannel(addresses: candidates, connector: .posix())

      await withThrowingTaskGroup(of: Void.self) { tasks in
        tasks.addTask {
          try await server.run { _, _ in
            XCTFail("Unexpected stream")
          }
        }

        tasks.addTask {
          await subchannel.run()
        }

        for await event in subchannel.events {
          switch event {
          case .connectivityStateChanged(.idle):
            subchannel.connect()
          case .connectivityStateChanged(.ready):
            subchannel.close()
          case .connectivityStateChanged(.shutdown):
            tasks.cancelAll()
          default:
            break
          }
        }
      }
    }

    /// Like `testConnectIteratesThroughAddresses`, but the server for the last address is only
    /// bound after the first transient failure, so a later connect attempt has to succeed.
    func testConnectIteratesThroughAddressesWithBackoff() async throws {
      let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
      let socketPath = "test-wrap-around-addrs"
      let subchannel = self.makeSubchannel(
        addresses: [
          .unixDomainSocket(path: "not-listening-1"),
          .unixDomainSocket(path: "not-listening-2"),
          .unixDomainSocket(path: socketPath),
        ],
        connector: .posix(),
        backoff: .fixed(at: .zero)  // Skip the backoff period
      )

      try await withThrowingTaskGroup(of: Void.self) { tasks in
        tasks.addTask {
          await subchannel.run()
        }

        var serverBound = false

        for await event in subchannel.events {
          switch event {
          case .connectivityStateChanged(.idle):
            subchannel.connect()

          case .connectivityStateChanged(.transientFailure):
            // Transient failure is entered once every address has been tried. Bind the server
            // now (and only once) so that a subsequent attempt succeeds.
            guard !serverBound else { break }
            serverBound = true

            let boundAddress = try await server.bind(to: .uds(socketPath))
            XCTAssertEqual(boundAddress, .unixDomainSocket(path: socketPath))

            tasks.addTask {
              try await server.run { _, _ in
                XCTFail("Unexpected stream")
              }
            }

          case .connectivityStateChanged(.ready):
            subchannel.close()

          case .connectivityStateChanged(.shutdown):
            tasks.cancelAll()

          default:
            break
          }
        }
      }
    }

    // MARK: - Connected behaviour

    /// After the subchannel becomes ready the server sends a GOAWAY frame; the full sequence of
    /// subchannel events is then compared against the expected list.
    func testConnectedReceivesGoAway() async throws {
      let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
      let boundAddress = try await server.bind()
      let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

      try await withThrowingTaskGroup(of: Void.self) { tasks in
        tasks.addTask {
          try await server.run { _, _ in
            XCTFail("Unexpected stream")
          }
        }

        tasks.addTask {
          await subchannel.run()
        }

        var observedEvents: [Subchannel.Event] = []

        for await event in subchannel.events {
          observedEvents.append(event)

          switch event {
          case .connectivityStateChanged(.idle):
            subchannel.connect()

          case .connectivityStateChanged(.ready):
            // Now the subchannel is ready, send a GOAWAY from the server on the first
            // client channel it holds.
            let clientChannel = try XCTUnwrap(server.clients.first)
            let goAwayFrame = HTTP2Frame(
              streamID: .rootStream,
              payload: .goAway(lastStreamID: 0, errorCode: .cancel, opaqueData: nil)
            )
            try await clientChannel.writeAndFlush(goAwayFrame)

          case .connectivityStateChanged(.shutdown):
            tasks.cancelAll()

          default:
            break
          }
        }

        let expectedEvents: [Subchannel.Event] = [
          // Normal connect flow.
          .connectivityStateChanged(.idle),
          .connectivityStateChanged(.connecting),
          .connectivityStateChanged(.ready),
          // GOAWAY triggers name resolution too.
          .goingAway,
          .requiresNameResolution,
          // Finally, shutdown.
          .connectivityStateChanged(.shutdown),
        ]
        XCTAssertEqual(expectedEvents, observedEvents)
      }
    }

    /// Cancels every task in the group as soon as the subchannel reports ready; the test finishes
    /// when the subchannel's event sequence ends.
    func testCancelReadySubchannel() async throws {
      let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
      let boundAddress = try await server.bind()
      let subchannel = self.makeSubchannel(address: boundAddress, connector: .posix())

      await withThrowingTaskGroup(of: Void.self) { tasks in
        tasks.addTask {
          try await server.run { _, _ in
            XCTFail("Unexpected stream")
          }
        }

        tasks.addTask {
          subchannel.connect()
          await subchannel.run()
        }

        for await event in subchannel.events {
          if case .connectivityStateChanged(.ready) = event {
            tasks.cancelAll()
          }
        }
      }
    }

    // MARK: - Helpers

    /// Builds a `Subchannel` for an endpoint made up of `addresses`.
    ///
    /// - Parameters:
    ///   - addresses: The addresses making up the endpoint.
    ///   - connector: The connector handed to the subchannel.
    ///   - backoff: The connection backoff; `ConnectionBackoff.defaults` is used when `nil`.
    /// - Returns: A subchannel with a fresh `SubchannelID` and compression set to `.none`.
    private func makeSubchannel(
      addresses: [GRPCHTTP2Core.SocketAddress],
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff? = nil
    ) -> Subchannel {
      let endpoint = Endpoint(addresses: addresses)
      return Subchannel(
        endpoint: endpoint,
        id: SubchannelID(),
        connector: connector,
        backoff: backoff ?? .defaults,
        defaultCompression: .none,
        enabledCompression: .none
      )
    }

    /// Convenience for building a `Subchannel` whose endpoint has a single `address`.
    ///
    /// - Parameters:
    ///   - address: The only address of the endpoint.
    ///   - connector: The connector handed to the subchannel.
    ///   - backoff: The connection backoff; `ConnectionBackoff.defaults` is used when `nil`.
    /// - Returns: The subchannel built by the multi-address variant of this helper.
    private func makeSubchannel(
      address: GRPCHTTP2Core.SocketAddress,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff? = nil
    ) -> Subchannel {
      let singleAddress = [address]
      return self.makeSubchannel(addresses: singleAddress, connector: connector, backoff: backoff)
    }
  }
  /// Test-only conveniences for building `ConnectionBackoff` values.
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  extension ConnectionBackoff {
    /// A backoff whose initial and maximum values are both `interval`, with a multiplier of `1.0`.
    ///
    /// - Parameters:
    ///   - interval: Used as both the initial and the maximum backoff.
    ///   - jitter: Passed through unchanged as the jitter; `0.0` by default.
    static func fixed(at interval: Duration, jitter: Double = 0.0) -> Self {
      ConnectionBackoff(initial: interval, max: interval, multiplier: 1.0, jitter: jitter)
    }

    /// The backoff used by the tests in this file when none is supplied: initial 10 seconds,
    /// maximum 120 seconds, multiplier 1.6 and jitter 1.2.
    static var defaults: Self {
      return Self(initial: .seconds(10), max: .seconds(120), multiplier: 1.6, jitter: 1.2)
    }
  }