LoadBalancerTest.swift 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @_spi(Package) @testable import GRPCHTTP2Core
  17. import XCTest
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  enum LoadBalancerTest {
    /// Values handed to the event handler alongside each load balancer event.
    struct Context {
      /// The test servers started for this run, each paired with the address
      /// returned from its `bind()` call.
      let servers: [(server: TestServer, address: GRPCHTTP2Core.SocketAddress)]
      /// The load balancer being exercised.
      let loadBalancer: LoadBalancer
    }

    /// Runs a test against a freshly created `RoundRobinLoadBalancer`.
    ///
    /// The balancer is created with compression set to `.none` for both the default
    /// and the enabled algorithms.
    ///
    /// - Parameters:
    ///   - serverCount: The number of test servers to start.
    ///   - connector: The connector given to the load balancer.
    ///   - backoff: The connection backoff given to the load balancer.
    ///   - timeout: How long to wait before the test is failed as timed out.
    ///   - function: The name used in the timeout failure message.
    ///   - handleEvent: Called for each event emitted by the load balancer.
    ///   - verifyEvents: Called with every observed event once the event stream ends.
    /// - Throws: Any error thrown while setting up the servers or by `handleEvent`.
    static func roundRobin(
      servers serverCount: Int,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff = .defaults,
      timeout: Duration = .seconds(10),
      function: String = #function,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void = { _ in }
    ) async throws {
      try await Self.run(
        servers: serverCount,
        timeout: timeout,
        function: function,
        handleEvent: handleEvent,
        verifyEvents: verifyEvents,
        makeLoadBalancer: {
          .roundRobin(
            RoundRobinLoadBalancer(
              connector: connector,
              backoff: backoff,
              defaultCompression: .none,
              enabledCompression: .none
            )
          )
        }
      )
    }

    /// Races the test body against a timeout.
    ///
    /// Whichever child task finishes first decides the outcome; the other one is
    /// cancelled. A timeout is reported with `XCTFail` and does not throw, whereas an
    /// error from the test body is rethrown to the caller.
    ///
    /// - Parameters:
    ///   - serverCount: The number of test servers to start.
    ///   - timeout: How long to wait before the test is failed as timed out.
    ///   - function: The name used in the timeout failure message.
    ///   - handleEvent: Called for each event emitted by the load balancer.
    ///   - verifyEvents: Called with every observed event once the event stream ends.
    ///   - makeLoadBalancer: Creates the load balancer to test.
    private static func run(
      servers serverCount: Int,
      timeout: Duration,
      function: String,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void = { _ in },
      makeLoadBalancer: @escaping @Sendable () -> LoadBalancer
    ) async throws {
      enum Outcome {
        case timedOut
        case completed(Result<Void, Error>)
      }

      try await withThrowingTaskGroup(of: Outcome.self) { group in
        // Watchdog: a cancelled sleep is deliberately ignored so this task always
        // yields `.timedOut` rather than throwing.
        group.addTask {
          try? await Task.sleep(for: timeout)
          return .timedOut
        }

        // The test body: errors are captured in a `Result` so that they travel
        // through the group as a value instead of being thrown from the child task.
        group.addTask {
          let testResult: Result<Void, Error>
          do {
            try await Self._run(
              servers: serverCount,
              handleEvent: handleEvent,
              verifyEvents: verifyEvents,
              makeLoadBalancer: makeLoadBalancer
            )
            testResult = .success(())
          } catch {
            testResult = .failure(error)
          }
          return .completed(testResult)
        }

        // Two child tasks were added above, so `next()` cannot return `nil` here.
        let firstOutcome = try await group.next()!
        group.cancelAll()

        switch firstOutcome {
        case .completed(let testResult):
          try testResult.get()
        case .timedOut:
          XCTFail("'\(function)' timed out after \(timeout)")
        }
      }
    }

    /// Starts the test servers and the load balancer, then forwards every load
    /// balancer event to `handleEvent` until the event stream ends.
    ///
    /// - Parameters:
    ///   - serverCount: The number of test servers to start.
    ///   - handleEvent: Called for each event emitted by the load balancer.
    ///   - verifyEvents: Called with every observed event once the event stream ends.
    ///   - makeLoadBalancer: Creates the load balancer to test.
    /// - Throws: Any error thrown when binding a server or by `handleEvent`.
    private static func _run(
      servers serverCount: Int,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void,
      makeLoadBalancer: @escaping @Sendable () -> LoadBalancer
    ) async throws {
      try await withThrowingTaskGroup(of: Void.self) { group in
        // Bind each test server and run it in a child task. The handler passed to
        // `run` records a test failure whenever it is invoked.
        var boundServers: [(server: TestServer, address: GRPCHTTP2Core.SocketAddress)] = []
        for _ in 0 ..< serverCount {
          let testServer = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
          let boundAddress = try await testServer.bind()
          boundServers.append((server: testServer, address: boundAddress))
          group.addTask {
            try await testServer.run { _, _ in
              XCTFail("Unexpected stream")
            }
          }
        }

        // Run the load balancer in a child task of its own.
        let balancer = makeLoadBalancer()
        group.addTask {
          await balancer.run()
        }

        // Record each event before handing it to the caller's handler.
        let context = Context(servers: boundServers, loadBalancer: balancer)
        var observedEvents: [LoadBalancerEvent] = []
        for await event in balancer.events {
          observedEvents.append(event)
          try await handleEvent(context, event)
        }

        // The event stream has ended: check what was seen, then stop the servers
        // and the load balancer tasks.
        verifyEvents(observedEvents)
        group.cancelAll()
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension LoadBalancerTest.Context {
    /// The underlying `RoundRobinLoadBalancer` when the load balancer under test is
    /// the `.roundRobin` case.
    var roundRobin: RoundRobinLoadBalancer? {
      // Kept as an exhaustive `switch` so that a new `LoadBalancer` case is a
      // compile error here rather than a silent `nil`.
      switch loadBalancer {
      case let .roundRobin(balancer):
        return balancer
      }
    }
  }