浏览代码

Handle write promises correctly (#1843)

Gustavo Cairo 1 年之前
父节点
当前提交
97994e1617

+ 7 - 13
Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift

@@ -143,24 +143,18 @@ extension GRPCClientStreamHandler {
       do {
         self.flushPending = true
         let headers = try self.stateMachine.send(metadata: metadata)
-        context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
-        // TODO: move the promise handling into the state machine
-        promise?.succeed()
+        context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
       } catch {
-        context.fireErrorCaught(error)
-        // TODO: move the promise handling into the state machine
         promise?.fail(error)
+        context.fireErrorCaught(error)
       }
 
     case .message(let message):
       do {
-        try self.stateMachine.send(message: message)
-        // TODO: move the promise handling into the state machine
-        promise?.succeed()
+        try self.stateMachine.send(message: message, promise: promise)
       } catch {
-        context.fireErrorCaught(error)
-        // TODO: move the promise handling into the state machine
         promise?.fail(error)
+        context.fireErrorCaught(error)
       }
     }
   }
@@ -197,12 +191,12 @@ extension GRPCClientStreamHandler {
   private func _flush(context: ChannelHandlerContext) {
     do {
       loop: while true {
-        switch try self.stateMachine.nextOutboundMessage() {
-        case .sendMessage(let byteBuffer):
+        switch try self.stateMachine.nextOutboundFrame() {
+        case .sendFrame(let byteBuffer, let promise):
           self.flushPending = true
           context.write(
             self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
-            promise: nil
+            promise: promise
           )
 
         case .noMoreMessages:

+ 15 - 7
Sources/GRPCHTTP2Core/GRPCMessageFramer.swift

@@ -32,7 +32,7 @@ struct GRPCMessageFramer {
   /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
   static let maximumWriteBufferLength = 65_536
 
-  private var pendingMessages: OneOrManyQueue<[UInt8]>
+  private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise<Void>?)>
 
   private var writeBuffer: ByteBuffer
 
@@ -44,8 +44,8 @@ struct GRPCMessageFramer {
 
   /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
   /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
-  mutating func append(_ bytes: [UInt8]) {
-    self.pendingMessages.append(bytes)
+  mutating func append(_ bytes: [UInt8], promise: EventLoopPromise<Void>?) {
+    self.pendingMessages.append((bytes, promise))
   }
 
   /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
@@ -53,7 +53,9 @@ struct GRPCMessageFramer {
   /// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise
   /// they'll be framed as-is.
   /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
-  mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? {
+  mutating func next(
+    compressor: Zlib.Compressor? = nil
+  ) throws -> (bytes: ByteBuffer, promise: EventLoopPromise<Void>?)? {
     if self.pendingMessages.isEmpty {
       // Nothing pending: exit early.
       return nil
@@ -69,15 +71,21 @@ struct GRPCMessageFramer {
 
     var requiredCapacity = 0
     for message in self.pendingMessages {
-      requiredCapacity += message.count + Self.metadataLength
+      requiredCapacity += message.bytes.count + Self.metadataLength
     }
     self.writeBuffer.clear(minimumCapacity: requiredCapacity)
 
+    var pendingWritePromise: EventLoopPromise<Void>?
     while let message = self.pendingMessages.pop() {
-      try self.encode(message, compressor: compressor)
+      try self.encode(message.bytes, compressor: compressor)
+      if let existingPendingWritePromise = pendingWritePromise {
+        existingPendingWritePromise.futureResult.cascade(to: message.promise)
+      } else {
+        pendingWritePromise = message.promise
+      }
     }
 
-    return self.writeBuffer
+    return (bytes: self.writeBuffer, promise: pendingWritePromise)
   }
 
   private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws {

+ 35 - 28
Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift

@@ -325,12 +325,12 @@ struct GRPCStreamStateMachine {
     }
   }
 
-  mutating func send(message: [UInt8]) throws {
+  mutating func send(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
     switch self.configuration {
     case .client:
-      try self.clientSend(message: message)
+      try self.clientSend(message: message, promise: promise)
     case .server:
-      try self.serverSend(message: message)
+      try self.serverSend(message: message, promise: promise)
     }
   }
 
@@ -397,23 +397,26 @@ struct GRPCStreamStateMachine {
     }
   }
 
-  /// The result of requesting the next outbound message.
-  enum OnNextOutboundMessage: Equatable {
-    /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done
+  /// The result of requesting the next outbound frame, which may contain multiple messages.
+  enum OnNextOutboundFrame {
+    /// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done
     /// writing messages (i.e. we are now closed).
     case noMoreMessages
-    /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
+    /// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying.
     case awaitMoreMessages
-    /// A message is ready to be sent.
-    case sendMessage(ByteBuffer)
+    /// A frame is ready to be sent.
+    case sendFrame(
+      frame: ByteBuffer,
+      promise: EventLoopPromise<Void>?
+    )
   }
 
-  mutating func nextOutboundMessage() throws -> OnNextOutboundMessage {
+  mutating func nextOutboundFrame() throws -> OnNextOutboundFrame {
     switch self.configuration {
     case .client:
-      return try self.clientNextOutboundMessage()
+      return try self.clientNextOutboundFrame()
     case .server:
-      return try self.serverNextOutboundMessage()
+      return try self.serverNextOutboundFrame()
     }
   }
 
@@ -540,15 +543,15 @@ extension GRPCStreamStateMachine {
     }
   }
 
-  private mutating func clientSend(message: [UInt8]) throws {
+  private mutating func clientSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
     switch self.state {
     case .clientIdleServerIdle:
       try self.invalidState("Client not yet open.")
     case .clientOpenServerIdle(var state):
-      state.framer.append(message)
+      state.framer.append(message, promise: promise)
       self.state = .clientOpenServerIdle(state)
     case .clientOpenServerOpen(var state):
-      state.framer.append(message)
+      state.framer.append(message, promise: promise)
       self.state = .clientOpenServerOpen(state)
     case .clientOpenServerClosed:
       // The server has closed, so it makes no sense to send the rest of the request.
@@ -577,23 +580,25 @@ extension GRPCStreamStateMachine {
 
   /// Returns the client's next request to the server.
   /// - Returns: The request to be made to the server.
-  private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage {
+  private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame {
     switch self.state {
     case .clientIdleServerIdle:
       try self.invalidState("Client is not open yet.")
     case .clientOpenServerIdle(var state):
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerIdle(state)
-      return request.map { .sendMessage($0) } ?? .awaitMoreMessages
+      return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
+        ?? .awaitMoreMessages
     case .clientOpenServerOpen(var state):
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerOpen(state)
-      return request.map { .sendMessage($0) } ?? .awaitMoreMessages
+      return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
+        ?? .awaitMoreMessages
     case .clientClosedServerIdle(var state):
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerIdle(state)
       if let request {
-        return .sendMessage(request)
+        return .sendFrame(frame: request.bytes, promise: request.promise)
       } else {
         return .noMoreMessages
       }
@@ -601,7 +606,7 @@ extension GRPCStreamStateMachine {
       let request = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerOpen(state)
       if let request {
-        return .sendMessage(request)
+        return .sendFrame(frame: request.bytes, promise: request.promise)
       } else {
         return .noMoreMessages
       }
@@ -1003,17 +1008,17 @@ extension GRPCStreamStateMachine {
     }
   }
 
-  private mutating func serverSend(message: [UInt8]) throws {
+  private mutating func serverSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
     switch self.state {
     case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
       try self.invalidState(
         "Server must have sent initial metadata before sending a message."
       )
     case .clientOpenServerOpen(var state):
-      state.framer.append(message)
+      state.framer.append(message, promise: promise)
       self.state = .clientOpenServerOpen(state)
     case .clientClosedServerOpen(var state):
-      state.framer.append(message)
+      state.framer.append(message, promise: promise)
       self.state = .clientClosedServerOpen(state)
     case .clientOpenServerClosed, .clientClosedServerClosed:
       try self.invalidState(
@@ -1351,23 +1356,25 @@ extension GRPCStreamStateMachine {
     }
   }
 
-  private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage {
+  private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
     switch self.state {
     case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
       try self.invalidState("Server is not open yet.")
     case .clientOpenServerOpen(var state):
       let response = try state.framer.next(compressor: state.compressor)
       self.state = .clientOpenServerOpen(state)
-      return response.map { .sendMessage($0) } ?? .awaitMoreMessages
+      return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
+        ?? .awaitMoreMessages
     case .clientClosedServerOpen(var state):
       let response = try state.framer.next(compressor: state.compressor)
       self.state = .clientClosedServerOpen(state)
-      return response.map { .sendMessage($0) } ?? .awaitMoreMessages
+      return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
+        ?? .awaitMoreMessages
     case .clientOpenServerClosed(var state):
       let response = try state.framer?.next(compressor: state.compressor)
       self.state = .clientOpenServerClosed(state)
       if let response {
-        return .sendMessage(response)
+        return .sendFrame(frame: response.bytes, promise: response.promise)
       } else {
         return .noMoreMessages
       }
@@ -1375,7 +1382,7 @@ extension GRPCStreamStateMachine {
       let response = try state.framer?.next(compressor: state.compressor)
       self.state = .clientClosedServerClosed(state)
       if let response {
-        return .sendMessage(response)
+        return .sendFrame(frame: response.bytes, promise: response.promise)
       } else {
         return .noMoreMessages
       }

+ 15 - 20
Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift

@@ -34,7 +34,8 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler {
   // We buffer the final status + trailers to avoid reordering issues (i.e.,
   // if there are messages still not written into the channel because flush has
   // not been called, but the server sends back trailers).
-  private var pendingTrailers: HTTP2Frame.FramePayload?
+  private var pendingTrailers:
+    (trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise<Void>?)?
 
   init(
     scheme: Scheme,
@@ -142,37 +143,28 @@ extension GRPCServerStreamHandler {
       do {
         self.flushPending = true
         let headers = try self.stateMachine.send(metadata: metadata)
-        context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
-        // TODO: move the promise handling into the state machine
-        promise?.succeed()
+        context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
       } catch {
-        context.fireErrorCaught(error)
-        // TODO: move the promise handling into the state machine
         promise?.fail(error)
+        context.fireErrorCaught(error)
       }
 
     case .message(let message):
       do {
-        try self.stateMachine.send(message: message)
-        // TODO: move the promise handling into the state machine
-        promise?.succeed()
+        try self.stateMachine.send(message: message, promise: promise)
       } catch {
-        context.fireErrorCaught(error)
-        // TODO: move the promise handling into the state machine
         promise?.fail(error)
+        context.fireErrorCaught(error)
       }
 
     case .status(let status, let metadata):
       do {
         let headers = try self.stateMachine.send(status: status, metadata: metadata)
         let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true))
-        self.pendingTrailers = response
-        // TODO: move the promise handling into the state machine
-        promise?.succeed()
+        self.pendingTrailers = (response, promise)
       } catch {
-        context.fireErrorCaught(error)
-        // TODO: move the promise handling into the state machine
         promise?.fail(error)
+        context.fireErrorCaught(error)
       }
     }
   }
@@ -185,19 +177,22 @@ extension GRPCServerStreamHandler {
 
     do {
       loop: while true {
-        switch try self.stateMachine.nextOutboundMessage() {
-        case .sendMessage(let byteBuffer):
+        switch try self.stateMachine.nextOutboundFrame() {
+        case .sendFrame(let byteBuffer, let promise):
           self.flushPending = true
           context.write(
             self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
-            promise: nil
+            promise: promise
           )
 
         case .noMoreMessages:
           if let pendingTrailers = self.pendingTrailers {
             self.flushPending = true
             self.pendingTrailers = nil
-            context.write(self.wrapOutboundOut(pendingTrailers), promise: nil)
+            context.write(
+              self.wrapOutboundOut(pendingTrailers.trailers),
+              promise: pendingTrailers.promise
+            )
           }
           break loop
 

+ 9 - 9
Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift

@@ -129,19 +129,19 @@ final class GRPCMessageDeframerTests: XCTestCase {
     }
 
     let firstMessage = try {
-      framer.append(Array(repeating: 42, count: 100))
+      framer.append(Array(repeating: 42, count: 100), promise: nil)
       return try framer.next(compressor: compressor)!
     }()
 
     let secondMessage = try {
-      framer.append(Array(repeating: 43, count: 110))
+      framer.append(Array(repeating: 43, count: 110), promise: nil)
       return try framer.next(compressor: compressor)!
     }()
 
     try ByteToMessageDecoderVerifier.verifyDecoder(
       inputOutputPairs: [
-        (firstMessage, [Array(repeating: 42, count: 100)]),
-        (secondMessage, [Array(repeating: 43, count: 110)]),
+        (firstMessage.bytes, [Array(repeating: 42, count: 100)]),
+        (secondMessage.bytes, [Array(repeating: 43, count: 110)]),
       ]) {
         GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor)
       }
@@ -164,12 +164,12 @@ final class GRPCMessageDeframerTests: XCTestCase {
       compressor.end()
     }
 
-    framer.append(Array(repeating: 42, count: 100))
+    framer.append(Array(repeating: 42, count: 100), promise: nil)
     let framedMessage = try framer.next(compressor: compressor)!
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try processor.process(buffer: framedMessage) { _ in
+      try processor.process(buffer: framedMessage.bytes) { _ in
         XCTFail("No message should be produced.")
       }
     ) { error in
@@ -178,7 +178,7 @@ final class GRPCMessageDeframerTests: XCTestCase {
         error.message,
         """
         Message has exceeded the configured maximum payload size \
-        (max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength))
+        (max: 1, actual: \(framedMessage.bytes.readableBytes - GRPCMessageDeframer.metadataLength))
         """
       )
     }
@@ -195,12 +195,12 @@ final class GRPCMessageDeframerTests: XCTestCase {
       compressor.end()
     }
 
-    framer.append(Array(repeating: 42, count: 101))
+    framer.append(Array(repeating: 42, count: 101), promise: nil)
     let framedMessage = try framer.next(compressor: compressor)!
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try processor.process(buffer: framedMessage) { _ in
+      try processor.process(buffer: framedMessage.bytes) { _ in
         XCTFail("No message should be produced.")
       }
     ) { error in

+ 35 - 13
Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift

@@ -15,6 +15,7 @@
  */
 
 import NIOCore
+import NIOEmbedded
 import XCTest
 
 @testable import GRPCHTTP2Core
@@ -22,9 +23,9 @@ import XCTest
 final class GRPCMessageFramerTests: XCTestCase {
   func testSingleWrite() throws {
     var framer = GRPCMessageFramer()
-    framer.append(Array(repeating: 42, count: 128))
+    framer.append(Array(repeating: 42, count: 128), promise: nil)
 
-    var buffer = try XCTUnwrap(framer.next())
+    var buffer = try XCTUnwrap(framer.next()).bytes
     let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
     XCTAssertFalse(compressed)
     XCTAssertEqual(length, 128)
@@ -43,7 +44,7 @@ final class GRPCMessageFramerTests: XCTestCase {
     var framer = GRPCMessageFramer()
 
     let message = [UInt8](repeating: 42, count: 128)
-    framer.append(message)
+    framer.append(message, promise: nil)
 
     var buffer = ByteBuffer()
     let testCompressor = Zlib.Compressor(method: compressionMethod)
@@ -53,7 +54,7 @@ final class GRPCMessageFramerTests: XCTestCase {
       testCompressor.end()
     }
 
-    buffer = try XCTUnwrap(framer.next(compressor: compressor))
+    buffer = try XCTUnwrap(framer.next(compressor: compressor)).bytes
     let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
     XCTAssertTrue(compressed)
     XCTAssertEqual(length, UInt32(compressedSize))
@@ -74,26 +75,47 @@ final class GRPCMessageFramerTests: XCTestCase {
 
   func testMultipleWrites() throws {
     var framer = GRPCMessageFramer()
-
-    let messages = 100
-    for _ in 0 ..< messages {
-      framer.append(Array(repeating: 42, count: 128))
+    let eventLoop = EmbeddedEventLoop()
+
+    // Create 100 messages and link a different promise with each of them.
+    let messagesCount = 100
+    var promises = [EventLoopPromise<Void>]()
+    promises.reserveCapacity(messagesCount)
+    for _ in 0 ..< messagesCount {
+      let promise = eventLoop.makePromise(of: Void.self)
+      promises.append(promise)
+      framer.append(Array(repeating: 42, count: 128), promise: promise)
     }
 
-    var buffer = try XCTUnwrap(framer.next())
-    for _ in 0 ..< messages {
+    let nextFrame = try XCTUnwrap(framer.next())
+
+    // Assert the messages have been framed all together in the same frame.
+    var buffer = nextFrame.bytes
+    for _ in 0 ..< messagesCount {
       let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
       XCTAssertFalse(compressed)
       XCTAssertEqual(length, 128)
       XCTAssertEqual(buffer.readSlice(length: Int(length)), ByteBuffer(repeating: 42, count: 128))
     }
-
     XCTAssertEqual(buffer.readableBytes, 0)
 
-    // No more bufers.
+    // Assert the promise returned from the framer is the promise linked to the
+    // first message appended to the framer.
+    let returnedPromise = nextFrame.promise
+    XCTAssertEqual(returnedPromise?.futureResult, promises.first?.futureResult)
+
+    // Succeed the returned promise to simulate a write into the channel
+    // succeeding, and assert that all other promises have been chained and are
+    // also succeeded as a result.
+    returnedPromise?.succeed()
+    XCTAssertEqual(promises.count, messagesCount)
+    for promise in promises {
+      try promise.futureResult.assertSuccess().wait()
+    }
+
+    // No more frames.
     XCTAssertNil(try framer.next())
   }
-
 }
 
 extension ByteBuffer {

+ 156 - 99
Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift

@@ -16,6 +16,7 @@
 
 import GRPCCore
 import NIOCore
+import NIOEmbedded
 import NIOHPACK
 import XCTest
 
@@ -238,7 +239,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     // Try to send a message without opening (i.e. without sending initial metadata)
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Client not yet open.")
@@ -252,7 +253,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       var stateMachine = self.makeClientStateMachine(targetState: targetState)
 
       // Now send a message
-      XCTAssertNoThrow(try stateMachine.send(message: []))
+      XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil))
     }
   }
 
@@ -266,7 +267,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       // Try sending another message: it should fail
       XCTAssertThrowsError(
         ofType: RPCError.self,
-        try stateMachine.send(message: [])
+        try stateMachine.send(message: [], promise: nil)
       ) { error in
         XCTAssertEqual(error.code, .internalError)
         XCTAssertEqual(error.message, "Client is closed, cannot send a message.")
@@ -624,7 +625,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.nextOutboundMessage()
+      try stateMachine.nextOutboundFrame()
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Client is not open yet.")
@@ -635,9 +636,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     for targetState in [TargetStateMachineState.clientOpenServerIdle, .clientOpenServerOpen] {
       var stateMachine = self.makeClientStateMachine(targetState: targetState)
 
-      XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+      XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
-      XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+      XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
 
       let expectedBytes: [UInt8] = [
         0,  // compression flag: unset
@@ -645,12 +646,12 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
         42, 42,  // original message
       ]
       XCTAssertEqual(
-        try stateMachine.nextOutboundMessage(),
-        .sendMessage(ByteBuffer(bytes: expectedBytes))
+        try stateMachine.nextOutboundFrame(),
+        .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil)
       )
 
       // And then make sure that nothing else is returned anymore
-      XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+      XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
     }
   }
 
@@ -660,14 +661,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       compressionEnabled: true
     )
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
-    XCTAssertNoThrow(try stateMachine.send(message: originalMessage))
+    XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))
 
-    let request = try stateMachine.nextOutboundMessage()
+    let request = try stateMachine.nextOutboundFrame()
     let framedMessage = try self.frameMessage(originalMessage, compress: true)
-    XCTAssertEqual(request, .sendMessage(framedMessage))
+    XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil))
   }
 
   func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws {
@@ -676,74 +677,74 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       compressionEnabled: true
     )
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
-    XCTAssertNoThrow(try stateMachine.send(message: originalMessage))
+    XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))
 
-    let request = try stateMachine.nextOutboundMessage()
+    let request = try stateMachine.nextOutboundFrame()
     let framedMessage = try self.frameMessage(originalMessage, compress: true)
-    XCTAssertEqual(request, .sendMessage(framedMessage))
+    XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil))
   }
 
   func testNextOutboundMessageWhenClientOpenAndServerClosed() throws {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerClosed)
 
     // No more messages to send
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
 
     // Queue a message, but assert the action is .noMoreMessages nevertheless,
     // because the server is closed.
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
   }
 
   func testNextOutboundMessageWhenClientClosedAndServerIdle() throws {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle)
 
     // Send a message and close client
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
     XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Make sure that getting the next outbound message _does_ return the message
     // we have enqueued.
-    let request = try stateMachine.nextOutboundMessage()
+    let request = try stateMachine.nextOutboundFrame()
     let expectedBytes: [UInt8] = [
       0,  // compression flag: unset
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ]
-    XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes)))
+    XCTAssertEqual(request, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil))
 
     // And then make sure that nothing else is returned anymore
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
   }
 
   func testNextOutboundMessageWhenClientClosedAndServerOpen() throws {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)
 
     // Send a message and close client
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
     XCTAssertNoThrow(try stateMachine.closeOutbound())
 
     // Make sure that getting the next outbound message _does_ return the message
     // we have enqueued.
-    let request = try stateMachine.nextOutboundMessage()
+    let request = try stateMachine.nextOutboundFrame()
     let expectedBytes: [UInt8] = [
       0,  // compression flag: unset
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ]
-    XCTAssertEqual(request, .sendMessage(ByteBuffer(bytes: expectedBytes)))
+    XCTAssertEqual(request, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil))
 
     // And then make sure that nothing else is returned anymore
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
   }
 
   func testNextOutboundMessageWhenClientClosedAndServerClosed() throws {
     var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)
     // Send a message
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
 
     // Close server
     XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true))
@@ -753,7 +754,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
 
     // Even though we have enqueued a message, don't send it, because the server
     // is closed.
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
   }
 
   // - MARK: Next inbound message
@@ -887,13 +888,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     )
 
     // Client sends messages
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     let message = [UInt8]([1, 2, 3, 4])
     let framedMessage = try self.frameMessage(message, compress: false)
-    try stateMachine.send(message: message)
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage))
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    try stateMachine.send(message: message, promise: nil)
+    XCTAssertEqual(
+      try stateMachine.nextOutboundFrame(),
+      .sendFrame(frame: framedMessage, promise: nil)
+    )
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     // Server sends response
     XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
@@ -929,7 +933,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata)
     )
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 
@@ -951,14 +955,17 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     )
 
     // Client sends messages and ends
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     let message = [UInt8]([1, 2, 3, 4])
     let framedMessage = try self.frameMessage(message, compress: false)
-    XCTAssertNoThrow(try stateMachine.send(message: message))
+    XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil))
     XCTAssertNoThrow(try stateMachine.closeOutbound())
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage))
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(
+      try stateMachine.nextOutboundFrame(),
+      .sendFrame(frame: framedMessage, promise: nil)
+    )
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
 
     // Server sends initial metadata
     let serverInitialHeadersAction = try stateMachine.receive(
@@ -1005,7 +1012,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata)
     )
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 
@@ -1027,13 +1034,16 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
     )
 
     // Client sends messages
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     let message = [UInt8]([1, 2, 3, 4])
     let framedMessage = try self.frameMessage(message, compress: false)
-    try stateMachine.send(message: message)
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage))
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    try stateMachine.send(message: message, promise: nil)
+    XCTAssertEqual(
+      try stateMachine.nextOutboundFrame(),
+      .sendFrame(frame: framedMessage, promise: nil)
+    )
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     // Server sends initial metadata
     let serverInitialHeadersAction = try stateMachine.receive(
@@ -1083,7 +1093,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
       .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: receivedMetadata)
     )
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 }
@@ -1242,7 +1252,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(
@@ -1258,7 +1268,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Now send a message
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(
@@ -1272,7 +1282,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
     // Now send a message
-    XCTAssertNoThrow(try stateMachine.send(message: []))
+    XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil))
   }
 
   func testSendMessageWhenClientOpenAndServerClosed() {
@@ -1281,7 +1291,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1293,7 +1303,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(
@@ -1308,7 +1318,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     // Try sending a message: even though client is closed, we should send it
     // because it may be expecting a response.
-    XCTAssertNoThrow(try stateMachine.send(message: []))
+    XCTAssertNoThrow(try stateMachine.send(message: [], promise: nil))
   }
 
   func testSendMessageWhenClientClosedAndServerClosed() {
@@ -1317,7 +1327,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1363,7 +1373,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1385,7 +1395,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1429,7 +1439,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1451,7 +1461,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Try sending another message: it should fail because server is now closed.
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.send(message: [])
+      try stateMachine.send(message: [], promise: nil)
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server can't send a message if it's closed.")
@@ -1873,7 +1883,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.nextOutboundMessage()
+      try stateMachine.nextOutboundFrame()
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server is not open yet.")
@@ -1885,7 +1895,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.nextOutboundMessage()
+      try stateMachine.nextOutboundFrame()
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server is not open yet.")
@@ -1897,7 +1907,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.nextOutboundMessage()
+      try stateMachine.nextOutboundFrame()
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server is not open yet.")
@@ -1907,20 +1917,20 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
   func testNextOutboundMessageWhenClientOpenAndServerOpen() throws {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
 
-    let response = try stateMachine.nextOutboundMessage()
+    let response = try stateMachine.nextOutboundFrame()
     let expectedBytes: [UInt8] = [
       0,  // compression flag: unset
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ]
-    XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes)))
+    XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil))
 
     // And then make sure that nothing else is returned
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
   }
 
   func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws {
@@ -1929,21 +1939,21 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       compressionEnabled: true
     )
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
 
     let originalMessage = [UInt8]([42, 42, 43, 43])
-    XCTAssertNoThrow(try stateMachine.send(message: originalMessage))
+    XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))
 
-    let response = try stateMachine.nextOutboundMessage()
+    let response = try stateMachine.nextOutboundFrame()
     let framedMessage = try self.frameMessage(originalMessage, compress: true)
-    XCTAssertEqual(response, .sendMessage(framedMessage))
+    XCTAssertEqual(response, .sendFrame(frame: framedMessage, promise: nil))
   }
 
   func testNextOutboundMessageWhenClientOpenAndServerClosed() throws {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
     // Send message and close server
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
     XCTAssertNoThrow(
       try stateMachine.send(
         status: .init(code: .ok, message: ""),
@@ -1951,16 +1961,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       )
     )
 
-    let response = try stateMachine.nextOutboundMessage()
+    let response = try stateMachine.nextOutboundFrame()
     let expectedBytes: [UInt8] = [
       0,  // compression flag: unset
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ]
-    XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes)))
+    XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil))
 
     // And then make sure that nothing else is returned anymore
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
   }
 
   func testNextOutboundMessageWhenClientClosedAndServerIdle() throws {
@@ -1968,7 +1978,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     XCTAssertThrowsError(
       ofType: RPCError.self,
-      try stateMachine.nextOutboundMessage()
+      try stateMachine.nextOutboundFrame()
     ) { error in
       XCTAssertEqual(error.code, .internalError)
       XCTAssertEqual(error.message, "Server is not open yet.")
@@ -1979,17 +1989,17 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
 
     // Send a message
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
 
     // Close client
     XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true))
 
     // Send another message
-    XCTAssertNoThrow(try stateMachine.send(message: [43, 43]))
+    XCTAssertNoThrow(try stateMachine.send(message: [43, 43], promise: nil))
 
     // Make sure that getting the next outbound message _does_ return the message
     // we have enqueued.
-    let response = try stateMachine.nextOutboundMessage()
+    let response = try stateMachine.nextOutboundFrame()
     let expectedBytes: [UInt8] = [
       0,  // compression flag: unset
       0, 0, 0, 2,  // message length: 2 bytes
@@ -1999,17 +2009,17 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
       0, 0, 0, 2,  // message length: 2 bytes
       43, 43,  // original message
     ]
-    XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes)))
+    XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil))
 
     // And then make sure that nothing else is returned anymore
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
   }
 
   func testNextOutboundMessageWhenClientClosedAndServerClosed() throws {
     var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)
 
     // Send a message and close server
-    XCTAssertNoThrow(try stateMachine.send(message: [42, 42]))
+    XCTAssertNoThrow(try stateMachine.send(message: [42, 42], promise: nil))
     XCTAssertNoThrow(
       try stateMachine.send(
         status: .init(code: .ok, message: ""),
@@ -2019,16 +2029,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
 
     // We have enqueued a message, make sure we return it even though server is closed,
     // because we haven't yet drained all of the pending messages.
-    let response = try stateMachine.nextOutboundMessage()
+    let response = try stateMachine.nextOutboundFrame()
     let expectedBytes: [UInt8] = [
       0,  // compression flag: unset
       0, 0, 0, 2,  // message length: 2 bytes
       42, 42,  // original message
     ]
-    XCTAssertEqual(response, .sendMessage(ByteBuffer(bytes: expectedBytes)))
+    XCTAssertEqual(response, .sendFrame(frame: ByteBuffer(bytes: expectedBytes), promise: nil))
 
     // And then make sure that nothing else is returned anymore
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
   }
 
   // - MARK: Next inbound message
@@ -2185,15 +2195,35 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))
 
     // Server sends response
+    let eventLoop = EmbeddedEventLoop()
+    let firstPromise = eventLoop.makePromise(of: Void.self)
+    let secondPromise = eventLoop.makePromise(of: Void.self)
+
     let firstResponse = [UInt8]([5, 6, 7])
     let secondResponse = [UInt8]([8, 9, 10])
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
-    try stateMachine.send(message: firstResponse)
-    try stateMachine.send(message: secondResponse)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
+
+    try stateMachine.send(message: firstResponse, promise: firstPromise)
+    try stateMachine.send(message: secondResponse, promise: secondPromise)
 
     // Make sure messages are outbound
     let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages))
+
+    guard
+      case .sendFrame(let nextOutboundByteBuffer, let nextOutboundPromise) =
+        try stateMachine.nextOutboundFrame()
+    else {
+      XCTFail("Should have received .sendMessage")
+      return
+    }
+    XCTAssertEqual(nextOutboundByteBuffer, framedMessages)
+    XCTAssertTrue(firstPromise.futureResult === nextOutboundPromise?.futureResult)
+
+    // Make sure that the promises associated with each sent message are chained
+    // together: when succeeding the one returned by the state machine on
+    // `nextOutboundMessage()`, the others should also be succeeded.
+    firstPromise.succeed()
+    try secondPromise.futureResult.assertSuccess().wait()
 
     // Client sends end
     try stateMachine.receive(buffer: ByteBuffer(), endStream: true)
@@ -2205,7 +2235,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(response, ["grpc-status": "0"])
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 
@@ -2252,13 +2282,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Server sends response
     let firstResponse = [UInt8]([5, 6, 7])
     let secondResponse = [UInt8]([8, 9, 10])
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
-    try stateMachine.send(message: firstResponse)
-    try stateMachine.send(message: secondResponse)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
+    try stateMachine.send(message: firstResponse, promise: nil)
+    try stateMachine.send(message: secondResponse, promise: nil)
 
     // Make sure messages are outbound
     let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages))
+    XCTAssertEqual(
+      try stateMachine.nextOutboundFrame(),
+      .sendFrame(frame: framedMessages, promise: nil)
+    )
 
     // Server ends
     let response = try stateMachine.send(
@@ -2267,7 +2300,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(response, ["grpc-status": "0"])
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 
@@ -2314,13 +2347,16 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     // Server sends response
     let firstResponse = [UInt8]([5, 6, 7])
     let secondResponse = [UInt8]([8, 9, 10])
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
-    try stateMachine.send(message: firstResponse)
-    try stateMachine.send(message: secondResponse)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)
+    try stateMachine.send(message: firstResponse, promise: nil)
+    try stateMachine.send(message: secondResponse, promise: nil)
 
     // Make sure messages are outbound
     let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages))
+    XCTAssertEqual(
+      try stateMachine.nextOutboundFrame(),
+      .sendFrame(frame: framedMessages, promise: nil)
+    )
 
     // Server ends
     let response = try stateMachine.send(
@@ -2329,7 +2365,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
     )
     XCTAssertEqual(response, ["grpc-status": "0"])
 
-    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
+    XCTAssertEqual(try stateMachine.nextOutboundFrame(), .noMoreMessages)
     XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
   }
 }
@@ -2362,8 +2398,29 @@ extension XCTestCase {
     }()
     defer { compressor?.end() }
     for message in messages {
-      framer.append(message)
+      framer.append(message, promise: nil)
+    }
+    return try XCTUnwrap(framer.next(compressor: compressor)).bytes
+  }
+}
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension GRPCStreamStateMachine.OnNextOutboundFrame: Equatable {
+  public static func == (
+    lhs: GRPCStreamStateMachine.OnNextOutboundFrame,
+    rhs: GRPCStreamStateMachine.OnNextOutboundFrame
+  ) -> Bool {
+    switch (lhs, rhs) {
+    case (.noMoreMessages, .noMoreMessages):
+      return true
+    case (.awaitMoreMessages, .awaitMoreMessages):
+      return true
+    case (.sendFrame(let lhsMessage, _), .sendFrame(let rhsMessage, _)):
+      // Note that we're not comparing the EventLoopPromises here, as they're
+      // not Equatable. This is fine though, since we only use this in tests.
+      return lhsMessage == rhsMessage
+    default:
+      return false
     }
-    return try XCTUnwrap(framer.next(compressor: compressor))
   }
 }