| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import GRPCHealth
- import GRPCInProcessTransport
- import XCTest
- @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
- final class HealthTests: XCTestCase {
- private func withHealthClient(
- _ body: @Sendable (Grpc_Health_V1_HealthClient, Health.Provider) async throws -> Void
- ) async throws {
- let health = Health()
- let inProcess = InProcessTransport.makePair()
- let server = GRPCServer(transport: inProcess.server, services: [health.service])
- let client = GRPCClient(transport: inProcess.client)
- let healthClient = Grpc_Health_V1_HealthClient(wrapping: client)
- try await withThrowingDiscardingTaskGroup { group in
- group.addTask {
- try await server.serve()
- }
- group.addTask {
- try await client.run()
- }
- do {
- try await body(healthClient, health.provider)
- } catch {
- XCTFail("Unexpected error: \(error)")
- }
- group.cancelAll()
- }
- }
- func testCheckOnKnownService() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let testServiceDescriptor = ServiceDescriptor.testService
- healthProvider.updateStatus(.serving, forService: testServiceDescriptor)
- let message = Grpc_Health_V1_HealthCheckRequest.with {
- $0.service = testServiceDescriptor.fullyQualifiedService
- }
- try await healthClient.check(request: ClientRequest.Single(message: message)) { response in
- try XCTAssertEqual(response.message.status, .serving)
- }
- }
- }
- func testCheckOnUnknownService() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let message = Grpc_Health_V1_HealthCheckRequest.with {
- $0.service = "does.not.Exist"
- }
- try await healthClient.check(request: ClientRequest.Single(message: message)) { response in
- try XCTAssertThrowsError(ofType: RPCError.self, response.message) { error in
- XCTAssertEqual(error.code, .notFound)
- }
- }
- }
- }
- func testCheckOnServer() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- // An unspecified service refers to the server.
- healthProvider.updateStatus(.notServing, forService: "")
- let message = Grpc_Health_V1_HealthCheckRequest()
- try await healthClient.check(request: ClientRequest.Single(message: message)) { response in
- try XCTAssertEqual(response.message.status, .notServing)
- }
- }
- }
- func testWatchOnKnownService() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let testServiceDescriptor = ServiceDescriptor.testService
- let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving]
- // Before watching the service, make the status of the service known to the Health service.
- healthProvider.updateStatus(statusesToBeSent[0], forService: testServiceDescriptor)
- let message = Grpc_Health_V1_HealthCheckRequest.with {
- $0.service = testServiceDescriptor.fullyQualifiedService
- }
- try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in
- var responseStreamIterator = response.messages.makeAsyncIterator()
- for i in 0 ..< statusesToBeSent.count {
- let next = try await responseStreamIterator.next()
- let message = try XCTUnwrap(next)
- let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i])
- XCTAssertEqual(message.status, expectedStatus)
- if i < statusesToBeSent.count - 1 {
- healthProvider.updateStatus(statusesToBeSent[i + 1], forService: testServiceDescriptor)
- }
- }
- }
- }
- }
- func testWatchOnUnknownServiceDoesNotTerminateTheRPC() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let testServiceDescriptor = ServiceDescriptor.testService
- let message = Grpc_Health_V1_HealthCheckRequest.with {
- $0.service = testServiceDescriptor.fullyQualifiedService
- }
- try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in
- var responseStreamIterator = response.messages.makeAsyncIterator()
- var next = try await responseStreamIterator.next()
- var message = try XCTUnwrap(next)
- // As the service was watched before being updated, the first status received should be
- // .serviceUnknown.
- XCTAssertEqual(message.status, .serviceUnknown)
- healthProvider.updateStatus(.notServing, forService: testServiceDescriptor)
- next = try await responseStreamIterator.next()
- message = try XCTUnwrap(next)
- // The RPC was not terminated and a status update was received successfully.
- XCTAssertEqual(message.status, .notServing)
- }
- }
- }
- func testMultipleWatchOnTheSameService() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let testServiceDescriptor = ServiceDescriptor.testService
- let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving]
- try await withThrowingTaskGroup(
- of: [Grpc_Health_V1_HealthCheckResponse.ServingStatus].self
- ) { group in
- let message = Grpc_Health_V1_HealthCheckRequest.with {
- $0.service = testServiceDescriptor.fullyQualifiedService
- }
- // The continuation of this stream will be used to signal when the watch response streams
- // are up and ready.
- let signal = AsyncStream.makeStream(of: Void.self)
- let numberOfWatches = 2
- for _ in 0 ..< numberOfWatches {
- group.addTask {
- return try await healthClient.watch(
- request: ClientRequest.Single(message: message)
- ) { response in
- signal.continuation.yield() // Make signal
- var statuses = [Grpc_Health_V1_HealthCheckResponse.ServingStatus]()
- var responseStreamIterator = response.messages.makeAsyncIterator()
- // Since responseStreamIterator.next() will never be nil (ideally, as the response
- // stream is always open), the iteration cannot be based on when
- // responseStreamIterator.next() is nil. Else, the iteration infinitely awaits and the
- // test never finishes. Hence, it is based on the expected number of statuses to be
- // received.
- for _ in 0 ..< statusesToBeSent.count + 1 {
- // As the service will be watched before being updated, the first status received
- // should be .serviceUnknown. Hence, the range of this iteration is increased by 1.
- let next = try await responseStreamIterator.next()
- let message = try XCTUnwrap(next)
- statuses.append(message.status)
- }
- return statuses
- }
- }
- }
- // Wait until all the watch streams are up and ready.
- for await _ in signal.stream.prefix(numberOfWatches) {}
- for status in statusesToBeSent {
- healthProvider.updateStatus(status, forService: testServiceDescriptor)
- }
- for try await receivedStatuses in group {
- XCTAssertEqual(receivedStatuses[0], .serviceUnknown)
- for i in 0 ..< statusesToBeSent.count {
- let sentStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i])
- XCTAssertEqual(sentStatus, receivedStatuses[i + 1])
- }
- }
- }
- }
- }
- func testWatchWithUnchangingStatusUpdates() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let testServiceDescriptor = ServiceDescriptor.testService
- let statusesToBeSent: [ServingStatus] = [.notServing, .notServing, .notServing, .serving]
- // The repeated .notServing updates should be received only once. Also, as the service will
- // be watched before being updated, the first status received should be .serviceUnknown.
- let expectedStatuses: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [
- .serviceUnknown,
- .notServing,
- .serving,
- ]
- let message = Grpc_Health_V1_HealthCheckRequest.with {
- $0.service = testServiceDescriptor.fullyQualifiedService
- }
- try await healthClient.watch(
- request: ClientRequest.Single(message: message)
- ) { response in
- // Send all status updates.
- for status in statusesToBeSent {
- healthProvider.updateStatus(status, forService: testServiceDescriptor)
- }
- var responseStreamIterator = response.messages.makeAsyncIterator()
- for i in 0 ..< expectedStatuses.count {
- let next = try await responseStreamIterator.next()
- let message = try XCTUnwrap(next)
- XCTAssertEqual(message.status, expectedStatuses[i])
- }
- }
- }
- }
- func testWatchOnServer() async throws {
- try await withHealthClient { (healthClient, healthProvider) in
- let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving]
- // An unspecified service refers to the server.
- healthProvider.updateStatus(statusesToBeSent[0], forService: "")
- let message = Grpc_Health_V1_HealthCheckRequest()
- try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in
- var responseStreamIterator = response.messages.makeAsyncIterator()
- for i in 0 ..< statusesToBeSent.count {
- let next = try await responseStreamIterator.next()
- let message = try XCTUnwrap(next)
- let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i])
- XCTAssertEqual(message.status, expectedStatus)
- if i < statusesToBeSent.count - 1 {
- healthProvider.updateStatus(statusesToBeSent[i + 1], forService: "")
- }
- }
- }
- }
- }
- }
- extension ServiceDescriptor {
- fileprivate static let testService = ServiceDescriptor(package: "test", service: "Service")
- }
|