HealthTests.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCHealth
  17. import GRPCInProcessTransport
  18. import XCTest
  19. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  20. final class HealthTests: XCTestCase {
  21. private func withHealthClient(
  22. _ body: @Sendable (Grpc_Health_V1_HealthClient, Health.Provider) async throws -> Void
  23. ) async throws {
  24. let health = Health()
  25. let inProcess = InProcessTransport.makePair()
  26. let server = GRPCServer(transport: inProcess.server, services: [health.service])
  27. let client = GRPCClient(transport: inProcess.client)
  28. let healthClient = Grpc_Health_V1_HealthClient(wrapping: client)
  29. try await withThrowingDiscardingTaskGroup { group in
  30. group.addTask {
  31. try await server.serve()
  32. }
  33. group.addTask {
  34. try await client.run()
  35. }
  36. do {
  37. try await body(healthClient, health.provider)
  38. } catch {
  39. XCTFail("Unexpected error: \(error)")
  40. }
  41. group.cancelAll()
  42. }
  43. }
  44. func testCheckOnKnownService() async throws {
  45. try await withHealthClient { (healthClient, healthProvider) in
  46. let testServiceDescriptor = ServiceDescriptor.testService
  47. healthProvider.updateStatus(.serving, forService: testServiceDescriptor)
  48. let message = Grpc_Health_V1_HealthCheckRequest.with {
  49. $0.service = testServiceDescriptor.fullyQualifiedService
  50. }
  51. try await healthClient.check(request: ClientRequest.Single(message: message)) { response in
  52. try XCTAssertEqual(response.message.status, .serving)
  53. }
  54. }
  55. }
  56. func testCheckOnUnknownService() async throws {
  57. try await withHealthClient { (healthClient, healthProvider) in
  58. let message = Grpc_Health_V1_HealthCheckRequest.with {
  59. $0.service = "does.not.Exist"
  60. }
  61. try await healthClient.check(request: ClientRequest.Single(message: message)) { response in
  62. try XCTAssertThrowsError(ofType: RPCError.self, response.message) { error in
  63. XCTAssertEqual(error.code, .notFound)
  64. }
  65. }
  66. }
  67. }
  68. func testCheckOnServer() async throws {
  69. try await withHealthClient { (healthClient, healthProvider) in
  70. // An unspecified service refers to the server.
  71. healthProvider.updateStatus(.notServing, forService: "")
  72. let message = Grpc_Health_V1_HealthCheckRequest()
  73. try await healthClient.check(request: ClientRequest.Single(message: message)) { response in
  74. try XCTAssertEqual(response.message.status, .notServing)
  75. }
  76. }
  77. }
  78. func testWatchOnKnownService() async throws {
  79. try await withHealthClient { (healthClient, healthProvider) in
  80. let testServiceDescriptor = ServiceDescriptor.testService
  81. let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving]
  82. // Before watching the service, make the status of the service known to the Health service.
  83. healthProvider.updateStatus(statusesToBeSent[0], forService: testServiceDescriptor)
  84. let message = Grpc_Health_V1_HealthCheckRequest.with {
  85. $0.service = testServiceDescriptor.fullyQualifiedService
  86. }
  87. try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in
  88. var responseStreamIterator = response.messages.makeAsyncIterator()
  89. for i in 0 ..< statusesToBeSent.count {
  90. let next = try await responseStreamIterator.next()
  91. let message = try XCTUnwrap(next)
  92. let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i])
  93. XCTAssertEqual(message.status, expectedStatus)
  94. if i < statusesToBeSent.count - 1 {
  95. healthProvider.updateStatus(statusesToBeSent[i + 1], forService: testServiceDescriptor)
  96. }
  97. }
  98. }
  99. }
  100. }
  101. func testWatchOnUnknownServiceDoesNotTerminateTheRPC() async throws {
  102. try await withHealthClient { (healthClient, healthProvider) in
  103. let testServiceDescriptor = ServiceDescriptor.testService
  104. let message = Grpc_Health_V1_HealthCheckRequest.with {
  105. $0.service = testServiceDescriptor.fullyQualifiedService
  106. }
  107. try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in
  108. var responseStreamIterator = response.messages.makeAsyncIterator()
  109. var next = try await responseStreamIterator.next()
  110. var message = try XCTUnwrap(next)
  111. // As the service was watched before being updated, the first status received should be
  112. // .serviceUnknown.
  113. XCTAssertEqual(message.status, .serviceUnknown)
  114. healthProvider.updateStatus(.notServing, forService: testServiceDescriptor)
  115. next = try await responseStreamIterator.next()
  116. message = try XCTUnwrap(next)
  117. // The RPC was not terminated and a status update was received successfully.
  118. XCTAssertEqual(message.status, .notServing)
  119. }
  120. }
  121. }
  122. func testMultipleWatchOnTheSameService() async throws {
  123. try await withHealthClient { (healthClient, healthProvider) in
  124. let testServiceDescriptor = ServiceDescriptor.testService
  125. let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving]
  126. try await withThrowingTaskGroup(
  127. of: [Grpc_Health_V1_HealthCheckResponse.ServingStatus].self
  128. ) { group in
  129. let message = Grpc_Health_V1_HealthCheckRequest.with {
  130. $0.service = testServiceDescriptor.fullyQualifiedService
  131. }
  132. // The continuation of this stream will be used to signal when the watch response streams
  133. // are up and ready.
  134. let signal = AsyncStream.makeStream(of: Void.self)
  135. let numberOfWatches = 2
  136. for _ in 0 ..< numberOfWatches {
  137. group.addTask {
  138. return try await healthClient.watch(
  139. request: ClientRequest.Single(message: message)
  140. ) { response in
  141. signal.continuation.yield() // Make signal
  142. var statuses = [Grpc_Health_V1_HealthCheckResponse.ServingStatus]()
  143. var responseStreamIterator = response.messages.makeAsyncIterator()
  144. // Since responseStreamIterator.next() will never be nil (ideally, as the response
  145. // stream is always open), the iteration cannot be based on when
  146. // responseStreamIterator.next() is nil. Else, the iteration infinitely awaits and the
  147. // test never finishes. Hence, it is based on the expected number of statuses to be
  148. // received.
  149. for _ in 0 ..< statusesToBeSent.count + 1 {
  150. // As the service will be watched before being updated, the first status received
  151. // should be .serviceUnknown. Hence, the range of this iteration is increased by 1.
  152. let next = try await responseStreamIterator.next()
  153. let message = try XCTUnwrap(next)
  154. statuses.append(message.status)
  155. }
  156. return statuses
  157. }
  158. }
  159. }
  160. // Wait until all the watch streams are up and ready.
  161. for await _ in signal.stream.prefix(numberOfWatches) {}
  162. for status in statusesToBeSent {
  163. healthProvider.updateStatus(status, forService: testServiceDescriptor)
  164. }
  165. for try await receivedStatuses in group {
  166. XCTAssertEqual(receivedStatuses[0], .serviceUnknown)
  167. for i in 0 ..< statusesToBeSent.count {
  168. let sentStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i])
  169. XCTAssertEqual(sentStatus, receivedStatuses[i + 1])
  170. }
  171. }
  172. }
  173. }
  174. }
  175. func testWatchWithUnchangingStatusUpdates() async throws {
  176. try await withHealthClient { (healthClient, healthProvider) in
  177. let testServiceDescriptor = ServiceDescriptor.testService
  178. let statusesToBeSent: [ServingStatus] = [.notServing, .notServing, .notServing, .serving]
  179. // The repeated .notServing updates should be received only once. Also, as the service will
  180. // be watched before being updated, the first status received should be .serviceUnknown.
  181. let expectedStatuses: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [
  182. .serviceUnknown,
  183. .notServing,
  184. .serving,
  185. ]
  186. let message = Grpc_Health_V1_HealthCheckRequest.with {
  187. $0.service = testServiceDescriptor.fullyQualifiedService
  188. }
  189. try await healthClient.watch(
  190. request: ClientRequest.Single(message: message)
  191. ) { response in
  192. // Send all status updates.
  193. for status in statusesToBeSent {
  194. healthProvider.updateStatus(status, forService: testServiceDescriptor)
  195. }
  196. var responseStreamIterator = response.messages.makeAsyncIterator()
  197. for i in 0 ..< expectedStatuses.count {
  198. let next = try await responseStreamIterator.next()
  199. let message = try XCTUnwrap(next)
  200. XCTAssertEqual(message.status, expectedStatuses[i])
  201. }
  202. }
  203. }
  204. }
  205. func testWatchOnServer() async throws {
  206. try await withHealthClient { (healthClient, healthProvider) in
  207. let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving]
  208. // An unspecified service refers to the server.
  209. healthProvider.updateStatus(statusesToBeSent[0], forService: "")
  210. let message = Grpc_Health_V1_HealthCheckRequest()
  211. try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in
  212. var responseStreamIterator = response.messages.makeAsyncIterator()
  213. for i in 0 ..< statusesToBeSent.count {
  214. let next = try await responseStreamIterator.next()
  215. let message = try XCTUnwrap(next)
  216. let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i])
  217. XCTAssertEqual(message.status, expectedStatus)
  218. if i < statusesToBeSent.count - 1 {
  219. healthProvider.updateStatus(statusesToBeSent[i + 1], forService: "")
  220. }
  221. }
  222. }
  223. }
  224. }
  225. }
  226. extension ServiceDescriptor {
  227. fileprivate static let testService = ServiceDescriptor(package: "test", service: "Service")
  228. }