Explorar o código

Add missing v2 interop tests (#1928)

* Add missing v2 interop tests

* PR changes
Gustavo Cairo hai 1 ano
pai
achega
ae8207457d

+ 25 - 2
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -192,6 +192,24 @@ private enum GRPCStreamStateMachineState {
 
 
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
     var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
 
 
+    /// This transition should only happen on the client-side.
+    /// It can happen if the request times out before the client outbound can be opened, or if the stream is
+    /// unexpectedly closed for some other reason on the client before it can transition to open.
+    init(previousState: ClientIdleServerIdleState) {
+      self.maximumPayloadSize = previousState.maximumPayloadSize
+      // We don't need a compressor since we won't be sending any messages.
+      self.framer = GRPCMessageFramer()
+      self.compressor = nil
+      self.outboundCompression = .none
+
+      // We haven't received anything from the server.
+      self.deframer = nil
+      self.decompressor = nil
+
+      self.inboundMessageBuffer = .init()
+    }
+
+    /// This transition should only happen on the server-side.
     /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
     /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
     /// initial metadata). We don't need to know a decompression algorithm, since we won't receive
     /// initial metadata). We don't need to know a decompression algorithm, since we won't receive
     /// any more messages from the client anyways, as it's closed.
     /// any more messages from the client anyways, as it's closed.
@@ -625,8 +643,8 @@ extension GRPCStreamStateMachine {
 
 
   private mutating func clientCloseOutbound() throws {
   private mutating func clientCloseOutbound() throws {
     switch self.state {
     switch self.state {
-    case .clientIdleServerIdle:
-      try self.invalidState("Client not yet open.")
+    case .clientIdleServerIdle(let state):
+      self.state = .clientClosedServerIdle(.init(previousState: state))
     case .clientOpenServerIdle(let state):
     case .clientOpenServerIdle(let state):
       self.state = .clientClosedServerIdle(.init(previousState: state))
       self.state = .clientClosedServerIdle(.init(previousState: state))
     case .clientOpenServerOpen(let state):
     case .clientOpenServerOpen(let state):
@@ -645,16 +663,19 @@ extension GRPCStreamStateMachine {
     switch self.state {
     switch self.state {
     case .clientIdleServerIdle:
     case .clientIdleServerIdle:
       try self.invalidState("Client is not open yet.")
       try self.invalidState("Client is not open yet.")
+
     case .clientOpenServerIdle(var state):
     case .clientOpenServerIdle(var state):
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerIdle(state)
       self.state = .clientOpenServerIdle(state)
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
         ?? .awaitMoreMessages
         ?? .awaitMoreMessages
+
     case .clientOpenServerOpen(var state):
     case .clientOpenServerOpen(var state):
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerOpen(state)
       self.state = .clientOpenServerOpen(state)
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
       return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
         ?? .awaitMoreMessages
         ?? .awaitMoreMessages
+
     case .clientClosedServerIdle(var state):
     case .clientClosedServerIdle(var state):
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerIdle(state)
       self.state = .clientClosedServerIdle(state)
@@ -663,6 +684,7 @@ extension GRPCStreamStateMachine {
       } else {
       } else {
         return .noMoreMessages
         return .noMoreMessages
       }
       }
+
     case .clientClosedServerOpen(var state):
     case .clientClosedServerOpen(var state):
       let request = try state.framer.next(compressor: state.compressor)
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerOpen(state)
       self.state = .clientClosedServerOpen(state)
@@ -671,6 +693,7 @@ extension GRPCStreamStateMachine {
       } else {
       } else {
         return .noMoreMessages
         return .noMoreMessages
       }
       }
+
     case .clientOpenServerClosed, .clientClosedServerClosed:
     case .clientOpenServerClosed, .clientClosedServerClosed:
       // No point in sending any more requests if the server is closed.
       // No point in sending any more requests if the server is closed.
       return .noMoreMessages
       return .noMoreMessages

+ 18 - 9
Sources/InteroperabilityTests/InteroperabilityTestCase.swift

@@ -27,23 +27,23 @@ public protocol InteroperabilityTest {
   func run(client: GRPCClient) async throws
   func run(client: GRPCClient) async throws
 }
 }
 
 
-/// Test cases as listed by the [gRPC interoperability test description
-/// specification](https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md).
+/// Test cases as listed by the [gRPC interoperability test description specification]
+/// (https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md).
 ///
 ///
 /// This is not a complete list, the following tests have not been implemented:
 /// This is not a complete list, the following tests have not been implemented:
-/// - cacheable_unary
-/// - client-compressed-unary
-/// - server-compressed-unary
-/// - client_compressed_streaming
-/// - server_compressed_streaming
+/// - cacheable_unary (caching not supported)
+/// - cancel_after_begin (if the client cancels the task running the request, there's no response to be
+/// received, so we can't check we got back a Cancelled status code)
+/// - cancel_after_first_response (same reason as above)
+/// - client_compressed_streaming (we don't support per-message compression, so we can't implement this)
 /// - compute_engine_creds
 /// - compute_engine_creds
 /// - jwt_token_creds
 /// - jwt_token_creds
 /// - oauth2_auth_token
 /// - oauth2_auth_token
 /// - per_rpc_creds
 /// - per_rpc_creds
 /// - google_default_credentials
 /// - google_default_credentials
 /// - compute_engine_channel_credentials
 /// - compute_engine_channel_credentials
-/// - cancel_after_begin
-/// - cancel_after_first_response
+/// - timeout_on_sleeping_server (timeouts end up being surfaced as `CancellationError`s, so we
+/// can't really implement this test)
 ///
 ///
 /// Note: Tests for compression have not been implemented yet as compression is
 /// Note: Tests for compression have not been implemented yet as compression is
 /// not supported. Once the API which allows for compression will be implemented
 /// not supported. Once the API which allows for compression will be implemented
@@ -51,8 +51,11 @@ public protocol InteroperabilityTest {
 public enum InteroperabilityTestCase: String, CaseIterable {
 public enum InteroperabilityTestCase: String, CaseIterable {
   case emptyUnary = "empty_unary"
   case emptyUnary = "empty_unary"
   case largeUnary = "large_unary"
   case largeUnary = "large_unary"
+  case clientCompressedUnary = "client_compressed_unary"
+  case serverCompressedUnary = "server_compressed_unary"
   case clientStreaming = "client_streaming"
   case clientStreaming = "client_streaming"
   case serverStreaming = "server_streaming"
   case serverStreaming = "server_streaming"
+  case serverCompressedStreaming = "server_compressed_streaming"
   case pingPong = "ping_pong"
   case pingPong = "ping_pong"
   case emptyStream = "empty_stream"
   case emptyStream = "empty_stream"
   case customMetadata = "custom_metadata"
   case customMetadata = "custom_metadata"
@@ -75,10 +78,16 @@ extension InteroperabilityTestCase {
       return EmptyUnary()
       return EmptyUnary()
     case .largeUnary:
     case .largeUnary:
       return LargeUnary()
       return LargeUnary()
+    case .clientCompressedUnary:
+      return ClientCompressedUnary()
+    case .serverCompressedUnary:
+      return ServerCompressedUnary()
     case .clientStreaming:
     case .clientStreaming:
       return ClientStreaming()
       return ClientStreaming()
     case .serverStreaming:
     case .serverStreaming:
       return ServerStreaming()
       return ServerStreaming()
+    case .serverCompressedStreaming:
+      return ServerCompressedStreaming()
     case .pingPong:
     case .pingPong:
       return PingPong()
       return PingPong()
     case .emptyStream:
     case .emptyStream:

+ 313 - 12
Sources/InteroperabilityTests/InteroperabilityTestCases.swift

@@ -88,6 +88,214 @@ struct LargeUnary: InteroperabilityTest {
   }
   }
 }
 }
 
 
+/// This test verifies the client can compress unary messages by sending two unary calls, for
+/// compressed and uncompressed payloads. It also sends an initial probing request to verify
+/// whether the server supports the CompressedRequest feature by checking if the probing call
+/// fails with an `INVALID_ARGUMENT` status.
+///
+/// Server features:
+/// - UnaryCall
+/// - CompressedRequest
+///
+/// Procedure:
+/// 1. Client calls UnaryCall with the feature probe, an *uncompressed* message:
+///    ```
+///    {
+///      expect_compressed:{
+///        value: true
+///      }
+///      response_size: 314159
+///      payload:{
+///        body: 271828 bytes of zeros
+///      }
+///    }
+///    ```
+/// 2. Client calls UnaryCall with the *compressed* message:
+///    ```
+///    {
+///      expect_compressed:{
+///        value: true
+///      }
+///      response_size: 314159
+///      payload:{
+///        body: 271828 bytes of zeros
+///      }
+///    }
+///    ```
+/// 3. Client calls UnaryCall with the *uncompressed* message:
+///    ```
+///    {
+///      expect_compressed:{
+///        value: false
+///      }
+///      response_size: 314159
+///      payload:{
+///        body: 271828 bytes of zeros
+///      }
+///    }
+///    ```
+///
+/// Client asserts:
+/// - First call failed with `INVALID_ARGUMENT` status.
+/// - Subsequent calls were successful.
+/// - Response payload body is 314159 bytes in size.
+/// - Clients are free to assert that the response payload body contents are zeros and comparing the
+///   entire response message against a golden response.
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+class ClientCompressedUnary: InteroperabilityTest {
+  func run(client: GRPCClient) async throws {
+    let testServiceClient = Grpc_Testing_TestService.Client(client: client)
+    let compressedRequest = Grpc_Testing_SimpleRequest.with { request in
+      request.expectCompressed = .with { $0.value = true }
+      request.responseSize = 314_159
+      request.payload = .with { $0.body = Data(repeating: 0, count: 271_828) }
+    }
+
+    var uncompressedRequest = compressedRequest
+    uncompressedRequest.expectCompressed = .with { $0.value = false }
+
+    // For unary RPCs we disable compression at the call level.
+    var options = CallOptions.defaults
+
+    // With compression expected but *disabled*.
+    options.compression = CompressionAlgorithm.none
+    try await testServiceClient.unaryCall(
+      request: ClientRequest.Single(message: compressedRequest),
+      options: options
+    ) { response in
+      switch response.accepted {
+      case .success:
+        throw AssertionFailure(message: "The result should be an error.")
+      case .failure(let error):
+        try assertEqual(error.code, .invalidArgument)
+      }
+    }
+
+    // With compression expected and enabled.
+    options.compression = .gzip
+
+    try await testServiceClient.unaryCall(
+      request: ClientRequest.Single(message: compressedRequest),
+      options: options
+    ) { response in
+      switch response.accepted {
+      case .success(let success):
+        try assertEqual(success.message.get().payload.body, Data(repeating: 0, count: 314_159))
+      case .failure:
+        throw AssertionFailure(message: "Response should have been accepted.")
+      }
+    }
+
+    // With compression not expected and disabled.
+    options.compression = CompressionAlgorithm.none
+    try await testServiceClient.unaryCall(
+      request: ClientRequest.Single(message: uncompressedRequest),
+      options: options
+    ) { response in
+      switch response.accepted {
+      case .success(let success):
+        try assertEqual(success.message.get().payload.body, Data(repeating: 0, count: 314_159))
+      case .failure:
+        throw AssertionFailure(message: "Response should have been accepted.")
+      }
+    }
+  }
+}
+
+/// This test verifies the server can compress unary messages. It sends two unary
+/// requests, expecting the server's response to be compressed or not according to
+/// the `response_compressed` boolean.
+///
+/// Whether compression was actually performed is determined by the compression bit
+/// in the response's message flags. *Note that some languages may not have access
+/// to the message flags, in which case the client will be unable to verify that
+/// the `response_compressed` boolean is obeyed by the server*.
+///
+///
+/// Server features:
+/// - UnaryCall
+/// - CompressedResponse
+///
+/// Procedure:
+/// 1. Client calls UnaryCall with `SimpleRequest`:
+///    ```
+///    {
+///      response_compressed:{
+///        value: true
+///      }
+///      response_size: 314159
+///      payload:{
+///        body: 271828 bytes of zeros
+///      }
+///    }
+///    ```
+///    ```
+///    {
+///      response_compressed:{
+///        value: false
+///      }
+///      response_size: 314159
+///      payload:{
+///        body: 271828 bytes of zeros
+///      }
+///    }
+///    ```
+///
+/// Client asserts:
+/// - call was successful
+/// - if supported by the implementation, when `response_compressed` is true, the response MUST have
+///   the compressed message flag set.
+/// - if supported by the implementation, when `response_compressed` is false, the response MUST NOT
+///   have the compressed message flag set.
+/// - response payload body is 314159 bytes in size in both cases.
+/// - clients are free to assert that the response payload body contents are zero and comparing the
+///   entire response message against a golden response
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+class ServerCompressedUnary: InteroperabilityTest {
+  func run(client: GRPCClient) async throws {
+    let testServiceClient = Grpc_Testing_TestService.Client(client: client)
+
+    let compressedRequest = Grpc_Testing_SimpleRequest.with { request in
+      request.responseCompressed = .with { $0.value = true }
+      request.responseSize = 314_159
+      request.payload = .with { $0.body = Data(repeating: 0, count: 271_828) }
+    }
+
+    try await testServiceClient.unaryCall(
+      request: ClientRequest.Single(message: compressedRequest)
+    ) { response in
+      // We can't verify that the compression bit was set, instead we verify that the encoding header
+      // was sent by the server. This isn't quite the same since as it can still be set but the
+      // compression may _not_ be set.
+      try assertTrue(response.metadata["grpc-encoding"].contains { $0 != "identity" })
+
+      switch response.accepted {
+      case .success(let success):
+        try assertEqual(success.message.get().payload.body, Data(repeating: 0, count: 314_159))
+      case .failure:
+        throw AssertionFailure(message: "Response should have been accepted.")
+      }
+    }
+
+    var uncompressedRequest = compressedRequest
+    uncompressedRequest.responseCompressed.value = false
+    try await testServiceClient.unaryCall(
+      request: ClientRequest.Single(message: compressedRequest)
+    ) { response in
+      // We can't even check for the 'grpc-encoding' header here since it could be set with the
+      // compression bit on the message not set.
+      switch response.accepted {
+      case .success(let success):
+        try assertEqual(success.message.get().payload.body, Data(repeating: 0, count: 314_159))
+      case .failure:
+        throw AssertionFailure(
+          message: "Response should have been accepted."
+        )
+      }
+    }
+  }
+}
+
 /// This test verifies that client-only streaming succeeds.
 /// This test verifies that client-only streaming succeeds.
 ///
 ///
 /// Server features:
 /// Server features:
@@ -217,6 +425,103 @@ struct ServerStreaming: InteroperabilityTest {
   }
   }
 }
 }
 
 
+/// This test verifies that the server can compress streaming messages and disable compression on
+/// individual messages, expecting the server's response to be compressed or not according to the
+/// `response_compressed` boolean.
+///
+/// Whether compression was actually performed is determined by the compression bit in the
+/// response's message flags. *Note that some languages may not have access to the message flags, in
+/// which case the client will be unable to verify that the `response_compressed` boolean is obeyed
+/// by the server*.
+///
+/// Server features:
+/// - StreamingOutputCall
+/// - CompressedResponse
+///
+/// Procedure:
+///  1. Client calls StreamingOutputCall with `StreamingOutputCallRequest`:
+///     ```
+///     {
+///       response_parameters:{
+///         compressed: {
+///           value: true
+///         }
+///         size: 31415
+///       }
+///       response_parameters:{
+///         compressed: {
+///           value: false
+///         }
+///         size: 92653
+///       }
+///     }
+///     ```
+///
+/// Client asserts:
+/// - call was successful
+/// - exactly two responses
+/// - if supported by the implementation, when `response_compressed` is false, the response's
+///   messages MUST NOT have the compressed message flag set.
+/// - if supported by the implementation, when `response_compressed` is true, the response's
+///   messages MUST have the compressed message flag set.
+/// - response payload bodies are sized (in order): 31415, 92653
+/// - clients are free to assert that the response payload body contents are zero and comparing the
+///   entire response messages against golden responses
+class ServerCompressedStreaming: InteroperabilityTest {
+  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+  func run(client: GRPCClient) async throws {
+    let testServiceClient = Grpc_Testing_TestService.Client(client: client)
+    let request: Grpc_Testing_StreamingOutputCallRequest = .with { request in
+      request.responseParameters = [
+        .with {
+          $0.compressed = .with { $0.value = true }
+          $0.size = 31415
+        },
+        .with {
+          $0.compressed = .with { $0.value = false }
+          $0.size = 92653
+        },
+      ]
+    }
+    let responseSizes = [31415, 92653]
+
+    try await testServiceClient.streamingOutputCall(
+      request: ClientRequest.Single(message: request)
+    ) { response in
+      var payloads = [Grpc_Testing_Payload]()
+
+      switch response.accepted {
+      case .success(let success):
+        // We can't verify that the compression bit was set, instead we verify that the encoding header
+        // was sent by the server. This isn't quite the same since as it can still be set but the
+        // compression may be not set.
+        try assertTrue(success.metadata["grpc-encoding"].contains { $0 != "identity" })
+
+        for try await part in success.bodyParts {
+          switch part {
+          case .message(let message):
+            payloads.append(message.payload)
+          case .trailingMetadata:
+            ()
+          }
+        }
+
+      case .failure:
+        throw AssertionFailure(message: "Response should have been accepted.")
+      }
+
+      try assertEqual(
+        payloads,
+        responseSizes.map { size in
+          Grpc_Testing_Payload.with {
+            $0.body = Data(repeating: 0, count: size)
+          }
+        }
+      )
+    }
+  }
+}
+
 /// This test verifies that full duplex bidi is supported.
 /// This test verifies that full duplex bidi is supported.
 ///
 ///
 /// Server features:
 /// Server features:
@@ -478,7 +783,7 @@ struct CustomMetadata: InteroperabilityTest {
             try self.checkTrailingMetadata(receivedTrailingMetadata)
             try self.checkTrailingMetadata(receivedTrailingMetadata)
           }
           }
         }
         }
-      case .failure(_):
+      case .failure:
         throw AssertionFailure(
         throw AssertionFailure(
           message: "The client should have received a response from the server."
           message: "The client should have received a response from the server."
         )
         )
@@ -541,7 +846,7 @@ struct StatusCodeAndMessage: InteroperabilityTest {
       case .failure(let error):
       case .failure(let error):
         try assertEqual(error.code.rawValue, self.expectedCode)
         try assertEqual(error.code.rawValue, self.expectedCode)
         try assertEqual(error.message, self.expectedMessage)
         try assertEqual(error.message, self.expectedMessage)
-      case .success(_):
+      case .success:
         throw AssertionFailure(
         throw AssertionFailure(
           message:
           message:
             "The client should receive an error with the status code and message sent by the client."
             "The client should receive an error with the status code and message sent by the client."
@@ -613,7 +918,7 @@ struct SpecialStatusMessage: InteroperabilityTest {
       request: ClientRequest.Single(message: message)
       request: ClientRequest.Single(message: message)
     ) { response in
     ) { response in
       switch response.accepted {
       switch response.accepted {
-      case .success(_):
+      case .success:
         throw AssertionFailure(
         throw AssertionFailure(
           message: "The response should be an error with the error code 2."
           message: "The response should be an error with the error code 2."
         )
         )
@@ -647,9 +952,8 @@ struct UnimplementedMethod: InteroperabilityTest {
     try await testServiceClient.unimplementedCall(
     try await testServiceClient.unimplementedCall(
       request: ClientRequest.Single(message: Grpc_Testing_Empty())
       request: ClientRequest.Single(message: Grpc_Testing_Empty())
     ) { response in
     ) { response in
-      let result = response.accepted
-      switch result {
-      case .success(_):
+      switch response.accepted {
+      case .success:
         throw AssertionFailure(
         throw AssertionFailure(
           message: "The result should be an error."
           message: "The result should be an error."
         )
         )
@@ -681,12 +985,9 @@ struct UnimplementedService: InteroperabilityTest {
     try await unimplementedServiceClient.unimplementedCall(
     try await unimplementedServiceClient.unimplementedCall(
       request: ClientRequest.Single(message: Grpc_Testing_Empty())
       request: ClientRequest.Single(message: Grpc_Testing_Empty())
     ) { response in
     ) { response in
-      let result = response.accepted
-      switch result {
-      case .success(_):
-        throw AssertionFailure(
-          message: "The result should be an error."
-        )
+      switch response.accepted {
+      case .success:
+        throw AssertionFailure(message: "The result should be an error.")
       case .failure(let error):
       case .failure(let error):
         try assertEqual(error.code, .unimplemented)
         try assertEqual(error.code, .unimplemented)
       }
       }

+ 5 - 0
Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

@@ -1067,6 +1067,11 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
   }
 
 
+  func testClientClosesBeforeItCanOpen() throws {
+    var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
+  }
+
   func testClientClosesBeforeServerOpens() throws {
   func testClientClosesBeforeServerOpens() throws {
     var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
     var stateMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)