|
|
@@ -192,6 +192,24 @@ private enum GRPCStreamStateMachineState {
|
|
|
|
|
|
var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
|
|
|
|
|
|
+ /// This transition should only happen on the client-side.
|
|
|
+ /// It can happen if the request times out before the client outbound can be opened, or if the stream is
|
|
|
+ /// unexpectedly closed for some other reason on the client before it can transition to open.
|
|
|
+ init(previousState: ClientIdleServerIdleState) {
|
|
|
+ self.maximumPayloadSize = previousState.maximumPayloadSize
|
|
|
+ // We don't need a compressor since we won't be sending any messages.
|
|
|
+ self.framer = GRPCMessageFramer()
|
|
|
+ self.compressor = nil
|
|
|
+ self.outboundCompression = .none
|
|
|
+
|
|
|
+ // We haven't received anything from the server.
|
|
|
+ self.deframer = nil
|
|
|
+ self.decompressor = nil
|
|
|
+
|
|
|
+ self.inboundMessageBuffer = .init()
|
|
|
+ }
|
|
|
+
|
|
|
+ /// This transition should only happen on the server-side.
|
|
|
/// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
|
|
|
/// initial metadata). We don't need to know a decompression algorithm, since we won't receive
|
|
|
/// any more messages from the client anyways, as it's closed.
|
|
|
@@ -625,8 +643,8 @@ extension GRPCStreamStateMachine {
|
|
|
|
|
|
private mutating func clientCloseOutbound() throws {
|
|
|
switch self.state {
|
|
|
- case .clientIdleServerIdle:
|
|
|
- try self.invalidState("Client not yet open.")
|
|
|
+ case .clientIdleServerIdle(let state):
|
|
|
+ self.state = .clientClosedServerIdle(.init(previousState: state))
|
|
|
case .clientOpenServerIdle(let state):
|
|
|
self.state = .clientClosedServerIdle(.init(previousState: state))
|
|
|
case .clientOpenServerOpen(let state):
|
|
|
@@ -645,16 +663,19 @@ extension GRPCStreamStateMachine {
|
|
|
switch self.state {
|
|
|
case .clientIdleServerIdle:
|
|
|
try self.invalidState("Client is not open yet.")
|
|
|
+
|
|
|
case .clientOpenServerIdle(var state):
|
|
|
let request = try state.framer.next(compressor: state.compressor)
|
|
|
self.state = .clientOpenServerIdle(state)
|
|
|
return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
|
|
|
?? .awaitMoreMessages
|
|
|
+
|
|
|
case .clientOpenServerOpen(var state):
|
|
|
let request = try state.framer.next(compressor: state.compressor)
|
|
|
self.state = .clientOpenServerOpen(state)
|
|
|
return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
|
|
|
?? .awaitMoreMessages
|
|
|
+
|
|
|
case .clientClosedServerIdle(var state):
|
|
|
let request = try state.framer.next(compressor: state.compressor)
|
|
|
self.state = .clientClosedServerIdle(state)
|
|
|
@@ -663,6 +684,7 @@ extension GRPCStreamStateMachine {
|
|
|
} else {
|
|
|
return .noMoreMessages
|
|
|
}
|
|
|
+
|
|
|
case .clientClosedServerOpen(var state):
|
|
|
let request = try state.framer.next(compressor: state.compressor)
|
|
|
self.state = .clientClosedServerOpen(state)
|
|
|
@@ -671,6 +693,7 @@ extension GRPCStreamStateMachine {
|
|
|
} else {
|
|
|
return .noMoreMessages
|
|
|
}
|
|
|
+
|
|
|
case .clientOpenServerClosed, .clientClosedServerClosed:
|
|
|
// No point in sending any more requests if the server is closed.
|
|
|
return .noMoreMessages
|