HTTP2TransportRegressionTests.swift 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. /*
  2. * Copyright 2025, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import GRPCNIOTransportHTTP2
  18. import Testing
  19. struct HTTP2TransportRegressionTests {
  20. @Test
  21. @available(gRPCSwiftNIOTransport 2.2, *)
  22. func testCancelledServerDoesntWedge() async throws {
  23. // Checks that a gRPC server with an active RPC shuts down when the server task
  24. // is cancelled. The flavour of transport doesn't matter here so long as it's HTTP/2.
  25. // Yield a signal so that we know when to cancel the server task. Then sleep
  26. // so that the RPC is still running when the server task is cancelled.
  27. let signal = AsyncStream.makeStream(of: Void.self)
  28. let helloWorld = HelloWorldService { request, _ in
  29. signal.continuation.yield()
  30. try await Task.sleep(for: .seconds(60))
  31. return HelloResponse(message: "Hello, \(request.name)!")
  32. }
  33. let server = GRPCServer(
  34. transport: .http2NIOPosix(
  35. address: .ipv4(host: "127.0.0.1", port: 0),
  36. transportSecurity: .plaintext
  37. ),
  38. services: [helloWorld]
  39. )
  40. let serverTask = Task {
  41. try await server.serve()
  42. }
  43. let address = try await server.listeningAddress
  44. let port = try #require(address?.ipv4?.port)
  45. try await withGRPCClient(
  46. transport: .http2NIOPosix(
  47. target: .ipv4(address: "127.0.0.1", port: port),
  48. transportSecurity: .plaintext
  49. )
  50. ) { client in
  51. let helloWorld = HelloWorld.Client(wrapping: client)
  52. // Kick this off then wait for the signal.
  53. let clientTask = Task {
  54. try await helloWorld.sayHello(HelloRequest(name: "World"))
  55. }
  56. for await _ in signal.stream {
  57. break
  58. }
  59. // The RPC is in progress, so cancel the server.
  60. serverTask.cancel()
  61. // Now the client should complete.
  62. #if compiler(>=6.1)
  63. let error = await #expect(throws: RPCError.self) {
  64. try await clientTask.value
  65. }
  66. #expect(error?.code == .unavailable)
  67. #else
  68. await #expect(throws: RPCError.self) {
  69. try await clientTask.value
  70. }
  71. #endif
  72. }
  73. }
  74. @Test
  75. @available(gRPCSwiftNIOTransport 2.2, *)
  76. func throwingResolverDoesNotShutdownClient() async throws {
  77. // This is a test for: https://github.com/grpc/grpc-swift-2/issues/25
  78. //
  79. // The client gets wedged if a working channel re-resolves and the resolver throws an error.
  80. struct CustomResolver: NameResolverFactory {
  81. struct Target: ResolvableTarget {
  82. let stream: AsyncThrowingStream<NameResolutionResult, any Error>
  83. }
  84. func resolver(for target: Target) -> NameResolver {
  85. NameResolver(names: RPCAsyncSequence(wrapping: target.stream), updateMode: .push)
  86. }
  87. }
  88. var registry = NameResolverRegistry()
  89. registry.registerFactory(CustomResolver())
  90. try await withGRPCServer(
  91. transport: .http2NIOPosix(
  92. address: .ipv4(host: "127.0.0.1", port: 0),
  93. transportSecurity: .plaintext
  94. ),
  95. services: [HelloWorldService()]
  96. ) { server in
  97. let address = try #require(try await server.listeningAddress)
  98. let resolver = AsyncThrowingStream.makeStream(of: NameResolutionResult.self)
  99. // Send in the server address.
  100. resolver.continuation.yield(
  101. NameResolutionResult(endpoints: [Endpoint(addresses: [address])], serviceConfig: nil)
  102. )
  103. try await withGRPCClient(
  104. transport: .http2NIOPosix(
  105. target: CustomResolver.Target(stream: resolver.stream),
  106. transportSecurity: .plaintext,
  107. resolverRegistry: registry
  108. )
  109. ) { rawClient in
  110. let helloWorld = HelloWorld.Client(wrapping: rawClient)
  111. let reply1 = try await helloWorld.sayHello(HelloRequest(name: "World"))
  112. #expect(reply1.message == "Hello, World!")
  113. // Push a failure to the resolver.
  114. struct ResolutionFailure: Error {}
  115. resolver.continuation.finish(throwing: ResolutionFailure())
  116. // Wait a moment for the error to propagate.
  117. try await Task.sleep(for: .milliseconds(50))
  118. let reply2 = try await helloWorld.sayHello(HelloRequest(name: "World"))
  119. #expect(reply2.message == "Hello, World!")
  120. }
  121. }
  122. }
  123. }