|
|
@@ -167,7 +167,7 @@ extension GRPCWebToHTTP2ServerCodec {
|
|
|
case fullyOpen(InboundState, OutboundState)
|
|
|
|
|
|
/// The server has closed the response stream, we may receive other request parts from the client.
|
|
|
- case clientOpenServerClosed
|
|
|
+ case clientOpenServerClosed(InboundState)
|
|
|
|
|
|
/// The client has sent everything, the server still needs to close the response stream.
|
|
|
case clientClosedServerOpen(OutboundState)
|
|
|
@@ -304,40 +304,15 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
|
|
|
preconditionFailure("Invalid state: haven't received request head")
|
|
|
|
|
|
case .fullyOpen(var inbound, let outbound):
|
|
|
- if inbound.requestBuffer == nil {
|
|
|
- // We're not dealing with gRPC Web Text: just forward the buffer.
|
|
|
- return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
|
|
|
- }
|
|
|
-
|
|
|
- if inbound.requestBuffer!.readableBytes == 0 {
|
|
|
- inbound.requestBuffer = buffer
|
|
|
- } else {
|
|
|
- inbound.requestBuffer!.writeBuffer(&buffer)
|
|
|
- }
|
|
|
-
|
|
|
- let readableBytes = inbound.requestBuffer!.readableBytes
|
|
|
- // The length of base64 encoded data must be a multiple of 4.
|
|
|
- let bytesToRead = readableBytes - (readableBytes % 4)
|
|
|
-
|
|
|
- let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
|
|
|
-
|
|
|
- if bytesToRead > 0,
|
|
|
- let base64Encoded = inbound.requestBuffer!.readString(length: bytesToRead),
|
|
|
- let base64Decoded = Data(base64Encoded: base64Encoded) {
|
|
|
- // Recycle the input buffer and restore the request buffer.
|
|
|
- buffer.clear()
|
|
|
- buffer.writeContiguousBytes(base64Decoded)
|
|
|
- action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
|
|
|
- } else {
|
|
|
- action = .none
|
|
|
- }
|
|
|
-
|
|
|
+ let action = inbound.processInboundData(buffer: &buffer)
|
|
|
self = .fullyOpen(inbound, outbound)
|
|
|
return action
|
|
|
|
|
|
- case .clientOpenServerClosed:
|
|
|
- // The server is already done; so drop the request.
|
|
|
- return .none
|
|
|
+ case var .clientOpenServerClosed(inbound):
|
|
|
+ // The server is already done, but it's not our place to drop the request.
|
|
|
+ let action = inbound.processInboundData(buffer: &buffer)
|
|
|
+ self = .clientOpenServerClosed(inbound)
|
|
|
+ return action
|
|
|
|
|
|
case .clientClosedServerOpen:
|
|
|
preconditionFailure("End of request stream already received")
|
|
|
@@ -366,9 +341,13 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
|
|
|
preconditionFailure("End of request stream already received")
|
|
|
|
|
|
case .clientOpenServerClosed:
|
|
|
- // Both sides are closed now, back to idle.
|
|
|
+ // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
|
|
|
+ // it's necessary to communicate to the other peers that the response is done.
|
|
|
self = .idle
|
|
|
- return .none
|
|
|
+
|
|
|
+ // Send an empty DATA frame with the end stream flag set.
|
|
|
+ let empty = allocator.buffer(capacity: 0)
|
|
|
+ return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
|
|
|
|
|
|
case ._modifying:
|
|
|
preconditionFailure("Left in modifying state")
|
|
|
@@ -388,12 +367,12 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
|
|
|
case .idle:
|
|
|
preconditionFailure("Invalid state: haven't received request head")
|
|
|
|
|
|
- case var .fullyOpen(_, outbound):
|
|
|
+ case .fullyOpen(let inbound, var outbound):
|
|
|
// Double check these are trailers.
|
|
|
assert(outbound.responseHeadersSent)
|
|
|
|
|
|
// We haven't seen the end of the request stream yet.
|
|
|
- self = .clientOpenServerClosed
|
|
|
+ self = .clientOpenServerClosed(inbound)
|
|
|
|
|
|
// Avoid CoW-ing the buffers.
|
|
|
let responseBuffers = outbound.responseBuffer
|
|
|
@@ -467,9 +446,9 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
|
|
|
case .idle:
|
|
|
preconditionFailure("Invalid state: haven't received request head")
|
|
|
|
|
|
- case let .fullyOpen(_, outbound):
|
|
|
+ case let .fullyOpen(inbound, outbound):
|
|
|
// We still haven't seen the end of the request stream.
|
|
|
- self = .clientOpenServerClosed
|
|
|
+ self = .clientOpenServerClosed(inbound)
|
|
|
|
|
|
let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
|
|
|
hpackHeaders: trailers,
|
|
|
@@ -703,6 +682,42 @@ extension GRPCWebToHTTP2ServerCodec {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
|
|
|
+ fileprivate mutating func processInboundData(
|
|
|
+ buffer: inout ByteBuffer
|
|
|
+ ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
|
|
|
+ if self.requestBuffer == nil {
|
|
|
+ // We're not dealing with gRPC Web Text: just forward the buffer.
|
|
|
+ return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.requestBuffer!.readableBytes == 0 {
|
|
|
+ self.requestBuffer = buffer
|
|
|
+ } else {
|
|
|
+ self.requestBuffer!.writeBuffer(&buffer)
|
|
|
+ }
|
|
|
+
|
|
|
+ let readableBytes = self.requestBuffer!.readableBytes
|
|
|
+ // The length of base64 encoded data must be a multiple of 4.
|
|
|
+ let bytesToRead = readableBytes - (readableBytes % 4)
|
|
|
+
|
|
|
+ let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
|
|
|
+
|
|
|
+ if bytesToRead > 0,
|
|
|
+ let base64Encoded = self.requestBuffer!.readString(length: bytesToRead),
|
|
|
+ let base64Decoded = Data(base64Encoded: base64Encoded) {
|
|
|
+ // Recycle the input buffer and restore the request buffer.
|
|
|
+ buffer.clear()
|
|
|
+ buffer.writeContiguousBytes(base64Decoded)
|
|
|
+ action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
|
|
|
+ } else {
|
|
|
+ action = .none
|
|
|
+ }
|
|
|
+
|
|
|
+ return action
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
extension HTTPHeaders {
|
|
|
fileprivate init(hpackHeaders headers: HPACKHeaders) {
|
|
|
self.init()
|