LoadBalancerTest.swift 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCHTTP2Core
  17. import XCTest
  /// Harness for running a load balancer against a set of locally started test servers.
  ///
  /// Use `pickFirst(...)` or `roundRobin(...)` to choose the kind of load balancer under test.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  enum LoadBalancerTest {
    /// State handed to the `handleEvent` callback together with each load balancer event.
    struct Context {
      /// The test servers started for this run, each paired with the address its `bind()` returned.
      let servers: [(server: TestServer, address: GRPCHTTP2Core.SocketAddress)]
      /// The load balancer under test.
      let loadBalancer: LoadBalancer
    }

    /// Runs a test against a `PickFirstLoadBalancer`.
    ///
    /// The load balancer is built from `connector` and `backoff`, with both the default and the
    /// enabled compression set to `.none`.
    ///
    /// - Parameters:
    ///   - serverCount: Number of test servers to start.
    ///   - connector: Connector passed to the load balancer.
    ///   - backoff: Connection backoff passed to the load balancer.
    ///   - timeout: How long the test may run before a timeout failure is recorded.
    ///   - function: Name used in the timeout failure message; defaults to the calling function.
    ///   - handleEvent: Invoked with the context for every event the load balancer emits.
    ///   - verifyEvents: Invoked with all observed events once the event stream has finished.
    /// - Throws: Any error thrown while the test runs, for example from `handleEvent`.
    static func pickFirst(
      servers serverCount: Int,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff = .defaults,
      timeout: Duration = .seconds(10),
      function: String = #function,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void = { _ in }
    ) async throws {
      try await Self.run(
        servers: serverCount,
        timeout: timeout,
        function: function,
        handleEvent: handleEvent,
        verifyEvents: verifyEvents,
        makeLoadBalancer: {
          .pickFirst(
            PickFirstLoadBalancer(
              connector: connector,
              backoff: backoff,
              defaultCompression: .none,
              enabledCompression: .none
            )
          )
        }
      )
    }

    /// Runs a test against a `RoundRobinLoadBalancer`.
    ///
    /// The load balancer is built from `connector` and `backoff`, with both the default and the
    /// enabled compression set to `.none`.
    ///
    /// - Parameters:
    ///   - serverCount: Number of test servers to start.
    ///   - connector: Connector passed to the load balancer.
    ///   - backoff: Connection backoff passed to the load balancer.
    ///   - timeout: How long the test may run before a timeout failure is recorded.
    ///   - function: Name used in the timeout failure message; defaults to the calling function.
    ///   - handleEvent: Invoked with the context for every event the load balancer emits.
    ///   - verifyEvents: Invoked with all observed events once the event stream has finished.
    /// - Throws: Any error thrown while the test runs, for example from `handleEvent`.
    static func roundRobin(
      servers serverCount: Int,
      connector: any HTTP2Connector,
      backoff: ConnectionBackoff = .defaults,
      timeout: Duration = .seconds(10),
      function: String = #function,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void = { _ in }
    ) async throws {
      try await Self.run(
        servers: serverCount,
        timeout: timeout,
        function: function,
        handleEvent: handleEvent,
        verifyEvents: verifyEvents,
        makeLoadBalancer: {
          .roundRobin(
            RoundRobinLoadBalancer(
              connector: connector,
              backoff: backoff,
              defaultCompression: .none,
              enabledCompression: .none
            )
          )
        }
      )
    }

    /// Races the test body (`_run`) against a timeout.
    ///
    /// Whichever finishes first decides the outcome: a timeout records an `XCTFail` naming
    /// `function`, while a completed body rethrows any error it captured. The losing task is
    /// cancelled in both cases.
    private static func run(
      servers serverCount: Int,
      timeout: Duration,
      function: String,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void = { _ in },
      makeLoadBalancer: @escaping @Sendable () -> LoadBalancer
    ) async throws {
      // What a child task of the race reports back.
      enum Outcome {
        case timedOut
        case completed(Result<Void, any Error>)
      }

      try await withThrowingTaskGroup(of: Outcome.self) { tasks in
        // Timeout task. `try?` means cancellation of the sleep also ends in `.timedOut`; by then
        // the race has already been decided, so that value is never inspected.
        tasks.addTask {
          try? await Task.sleep(for: timeout)
          return .timedOut
        }

        // Test body. Errors are captured in a `Result` so that this child task never throws and
        // the error can be rethrown after the other task has been cancelled.
        tasks.addTask {
          let bodyResult: Result<Void, any Error>
          do {
            try await Self._run(
              servers: serverCount,
              handleEvent: handleEvent,
              verifyEvents: verifyEvents,
              makeLoadBalancer: makeLoadBalancer
            )
            bodyResult = .success(())
          } catch {
            bodyResult = .failure(error)
          }
          return .completed(bodyResult)
        }

        // Two tasks were added above, so the group cannot be empty here.
        let winner = try await tasks.next()!
        tasks.cancelAll()

        switch winner {
        case .completed(let bodyResult):
          try bodyResult.get()
        case .timedOut:
          XCTFail("'\(function)' timed out after \(timeout)")
        }
      }
    }

    /// The test body: starts the servers and the load balancer, then drives the callbacks.
    ///
    /// Every event from the load balancer's `events` sequence is recorded and forwarded to
    /// `handleEvent`. When the sequence finishes, `verifyEvents` receives all recorded events
    /// and the remaining child tasks are cancelled.
    private static func _run(
      servers serverCount: Int,
      handleEvent: @escaping @Sendable (Context, LoadBalancerEvent) async throws -> Void,
      verifyEvents: @escaping @Sendable ([LoadBalancerEvent]) -> Void,
      makeLoadBalancer: @escaping @Sendable () -> LoadBalancer
    ) async throws {
      try await withThrowingTaskGroup(of: Void.self) { tasks in
        // Bind each test server first, then run it as a child task. The closure given to
        // `run` records a failure whenever it is invoked.
        var boundServers: [(server: TestServer, address: GRPCHTTP2Core.SocketAddress)] = []
        for _ in 0 ..< serverCount {
          let testServer = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
          let boundAddress = try await testServer.bind()
          boundServers.append((server: testServer, address: boundAddress))
          tasks.addTask {
            try await testServer.run { _, _ in
              XCTFail("Unexpected stream")
            }
          }
        }

        // Build the load balancer under test and run it as a child task.
        let balancer = makeLoadBalancer()
        tasks.addTask {
          await balancer.run()
        }

        // Record every event and hand it to the caller until the event sequence ends.
        let context = Context(servers: boundServers, loadBalancer: balancer)
        var observed: [LoadBalancerEvent] = []
        for await event in balancer.events {
          observed.append(event)
          try await handleEvent(context, event)
        }

        verifyEvents(observed)

        // Cancel the server and load balancer tasks still running in the group.
        tasks.cancelAll()
      }
    }
  }
  /// Typed accessors for the load balancer stored in a test context.
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension LoadBalancerTest.Context {
    /// The load balancer under test if it is a round-robin load balancer, otherwise `nil`.
    var roundRobin: RoundRobinLoadBalancer? {
      guard case .roundRobin(let balancer) = self.loadBalancer else {
        return nil
      }
      return balancer
    }

    /// The load balancer under test if it is a pick-first load balancer, otherwise `nil`.
    var pickFirst: PickFirstLoadBalancer? {
      guard case .pickFirst(let balancer) = self.loadBalancer else {
        return nil
      }
      return balancer
    }
  }