Browse Source

Add a closeOutbound func to GRPCStreamStateMachine and remove endStream param from send(message:) (#1837)

Gustavo Cairo 1 year ago
parent
commit
bbd3b9370d

+ 33 - 24
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -325,20 +325,24 @@ struct GRPCStreamStateMachine {
     }
   }
 
-  mutating func send(message: [UInt8], endStream: Bool) throws {
+  mutating func send(message: [UInt8]) throws {
     switch self.configuration {
     case .client:
-      try self.clientSend(message: message, endStream: endStream)
+      try self.clientSend(message: message)
     case .server:
-      if endStream {
-        try self.invalidState(
-          "Can't end response stream by sending a message - send(status:metadata:) must be called"
-        )
-      }
       try self.serverSend(message: message)
     }
   }
 
+  mutating func closeOutbound() throws {
+    switch self.configuration {
+    case .client:
+      try self.clientCloseOutbound()
+    case .server:
+      try self.invalidState("Server cannot call close: it must send status and trailers.")
+    }
+  }
+
   mutating func send(
     status: Status,
     metadata: Metadata
@@ -532,31 +536,36 @@ extension GRPCStreamStateMachine {
     }
   }
 
-  private mutating func clientSend(message: [UInt8], endStream: Bool) throws {
-    // Client sends message.
+  private mutating func clientSend(message: [UInt8]) throws {
     switch self.state {
     case .clientIdleServerIdle:
       try self.invalidState("Client not yet open.")
     case .clientOpenServerIdle(var state):
       state.framer.append(message)
-      if endStream {
-        self.state = .clientClosedServerIdle(.init(previousState: state))
-      } else {
-        self.state = .clientOpenServerIdle(state)
-      }
+      self.state = .clientOpenServerIdle(state)
     case .clientOpenServerOpen(var state):
       state.framer.append(message)
-      if endStream {
-        self.state = .clientClosedServerOpen(.init(previousState: state))
-      } else {
-        self.state = .clientOpenServerOpen(state)
-      }
-    case .clientOpenServerClosed(let state):
+      self.state = .clientOpenServerOpen(state)
+    case .clientOpenServerClosed:
       // The server has closed, so it makes no sense to send the rest of the request.
-      // However, do close if endStream is set.
-      if endStream {
-        self.state = .clientClosedServerClosed(.init(previousState: state))
-      }
+      ()
+    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
+      try self.invalidState(
+        "Client is closed, cannot send a message."
+      )
+    }
+  }
+
+  private mutating func clientCloseOutbound() throws {
+    switch self.state {
+    case .clientIdleServerIdle:
+      try self.invalidState("Client not yet open.")
+    case .clientOpenServerIdle(let state):
+      self.state = .clientClosedServerIdle(.init(previousState: state))
+    case .clientOpenServerOpen(let state):
+      self.state = .clientClosedServerOpen(.init(previousState: state))
+    case .clientOpenServerClosed(let state):
+      self.state = .clientClosedServerClosed(.init(previousState: state))
     case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
       try self.invalidState(
         "Client is closed, cannot send a message."

+ 1 - 1
Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

@@ -153,7 +153,7 @@ extension GRPCServerStreamHandler {
 
     case .message(let message):
       do {
-        try self.stateMachine.send(message: message, endStream: false)
+        try self.stateMachine.send(message: message)
         // TODO: move the promise handling into the state machine
         promise?.succeed()
       } catch {

+ 48 - 45
Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

@@ -170,21 +170,21 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       // Open client
       XCTAssertNoThrow(try stateMachine.send(metadata: []))
       // Close client
-      XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true))
+      XCTAssertNoThrow(try stateMachine.closeOutbound())
     case .clientClosedServerOpen:
       // Open client
       XCTAssertNoThrow(try stateMachine.send(metadata: []))
       // Open server
       XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false))
       // Close client
-      XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true))
+      XCTAssertNoThrow(try stateMachine.closeOutbound())
     case .clientClosedServerClosed:
       // Open client
       XCTAssertNoThrow(try stateMachine.send(metadata: []))
       // Open server
       XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false))
       // Close client
-      XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true))
+      XCTAssertNoThrow(try stateMachine.closeOutbound())
       // Close server
       XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true))
     }
@@ -238,7 +238,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     // Try to send a message without opening (i.e. without sending initial metadata)
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Client not yet open.")
@@ -252,7 +252,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       var stateMachine = self.makeClientStateMachine(targetState: targetState)
 
       // Now send a message
-      XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false))
+      XCTAssertNoThrow(try stateMachine.send(message: []))
     }
   }
 
@@ -266,7 +266,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       // Try sending another message: it should fail
       XCTAssertThrowsError(
         ofType: RPCError.self,
-        try stateMachine.send(message: [], endStream: false)
+        try stateMachine.send(message: [])
       ) { error in
         XCTAssertEqual(error.code, .internalError)
         XCTAssertEqual(error.message, "Client is closed, cannot send a message.")
@@ -637,7 +637,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
       XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
-      XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+      XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
 
       let expectedBytes: [UInt8] = [
         0,  // compression flag: unset
@@ -663,7 +663,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
-    XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: originalMessage))
 
     let request = try stateMachine.nextOutboundMessage()
     let framedMessage = try self.frameMessage(originalMessage, compress: true)
@@ -679,7 +679,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
-    XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: originalMessage))
 
     let request = try stateMachine.nextOutboundMessage()
     let framedMessage = try self.frameMessage(originalMessage, compress: true)
@@ -694,7 +694,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     // Queue a message, but assert the action is .noMoreMessages nevertheless,
     // because the server is closed.
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
   }
 
@@ -702,7 +702,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle)
 
     // Send a message and close client
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: true))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Make sure that getting the next outbound message _does_ return the message
     // we have enqueued.
@@ -722,7 +723,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)
 
     // Send a message and close client
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: true))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Make sure that getting the next outbound message _does_ return the message
     // we have enqueued.
@@ -741,13 +743,13 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
   func testNextOutboundMessageWhenClientClosedAndServerClosed() throws {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)
     // Send a message
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
 
     // Close server
     XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true))
 
     // Close client
-    XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true))
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Even though we have enqueued a message, don't send it, because the server
     // is closed.
@@ -821,7 +823,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     try stateMachine.receive(buffer: receivedBytes, endStream: false)
 
     // Close client
-    XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true))
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Even though the client is closed, because it received a message while open,
     // we must get the message now.
@@ -843,7 +845,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true))
 
     // Close client
-    XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true))
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Even though the client is closed, because it received a message while open,
     // we must get the message now.
@@ -889,7 +891,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     let message = [UInt8]([1, 2, 3, 4])
     let framedMessage = try self.frameMessage(message, compress: false)
-    try stateMachine.send(message: message, endStream: false)
+    try stateMachine.send(message: message)
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage))
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
@@ -909,7 +911,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
 
     // Client sends end
-    try stateMachine.send(message: [], endStream: true)
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Server ends
     let metadataReceivedAction = try stateMachine.receive(
@@ -953,7 +955,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     let message = [UInt8]([1, 2, 3, 4])
     let framedMessage = try self.frameMessage(message, compress: false)
-    try stateMachine.send(message: message, endStream: true)
+    XCTAssertNoThrow(try stateMachine.send(message: message))
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage))
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
 
@@ -1028,7 +1031,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     let message = [UInt8]([1, 2, 3, 4])
     let framedMessage = try self.frameMessage(message, compress: false)
-    try stateMachine.send(message: message, endStream: false)
+    try stateMachine.send(message: message)
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage))
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
@@ -1046,8 +1049,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       ])
     )
 
-    // Client sends end
-    try stateMachine.send(message: [], endStream: true)
+    // Client closes
+    XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Server sends response
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
@@ -1239,7 +1242,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(
@@ -1255,7 +1258,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Now send a message
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(
@@ -1269,7 +1272,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
     // Now send a message
-    XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: []))
   }
 
   func testSendMessageWhenClientOpenAndServerClosed() {
@@ -1278,7 +1281,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1290,7 +1293,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(
@@ -1305,7 +1308,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     // Try sending a message: even though client is closed, we should send it
     // because it may be expecting a response.
-    XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: []))
   }
 
   func testSendMessageWhenClientClosedAndServerClosed() {
@@ -1314,7 +1317,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1360,7 +1363,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1382,7 +1385,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1426,7 +1429,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1448,7 +1451,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [], endStream: false)
+      try stateMachine.send(message: [])
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1906,7 +1909,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
 
     let response = try stateMachine.nextOutboundMessage()
     let expectedBytes: [UInt8] = [
@@ -1929,7 +1932,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
-    XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: originalMessage))
 
     let response = try stateMachine.nextOutboundMessage()
     let framedMessage = try self.frameMessage(originalMessage, compress: true)
@@ -1940,7 +1943,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
     // Send message and close server
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
     XCTAssertNoThrow(
       try stateMachine.send(
         status: .init(code: .ok, message: ""),
@@ -1976,13 +1979,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
     // Send a message
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
 
     // Close client
     XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true))
 
     // Send another message
-    XCTAssertNoThrow(try stateMachine.send(message: [43, 43], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [43, 43]))
 
     // Make sure that getting the next outbound message _does_ return the message
     // we have enqueued.
@@ -2006,7 +2009,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)
 
     // Send a message and close server
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
     XCTAssertNoThrow(
       try stateMachine.send(
         status: .init(code: .ok, message: ""),
@@ -2185,8 +2188,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let firstResponse = [UInt8]([5, 6, 7])
     let secondResponse = [UInt8]([8, 9, 10])
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
-    try stateMachine.send(message: firstResponse, endStream: false)
-    try stateMachine.send(message: secondResponse, endStream: false)
+    try stateMachine.send(message: firstResponse)
+    try stateMachine.send(message: secondResponse)
 
     // Make sure messages are outbound
     let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
@@ -2250,8 +2253,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let firstResponse = [UInt8]([5, 6, 7])
     let secondResponse = [UInt8]([8, 9, 10])
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
-    try stateMachine.send(message: firstResponse, endStream: false)
-    try stateMachine.send(message: secondResponse, endStream: false)
+    try stateMachine.send(message: firstResponse)
+    try stateMachine.send(message: secondResponse)
 
     // Make sure messages are outbound
     let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
@@ -2312,8 +2315,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     let firstResponse = [UInt8]([5, 6, 7])
     let secondResponse = [UInt8]([8, 9, 10])
     XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
-    try stateMachine.send(message: firstResponse, endStream: false)
-    try stateMachine.send(message: secondResponse, endStream: false)
+    try stateMachine.send(message: firstResponse)
+    try stateMachine.send(message: secondResponse)
 
     // Make sure messages are outbound
     let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)