|
|
@@ -18,12 +18,9 @@ import NIO
|
|
|
import NIOHPACK
|
|
|
import NIOHTTP2
|
|
|
|
|
|
-internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServerResponseWriter {
|
|
|
+internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
|
|
|
typealias InboundIn = HTTP2Frame.FramePayload
|
|
|
- typealias InboundOut = GRPCServerRequestPart<ByteBuffer>
|
|
|
-
|
|
|
typealias OutboundOut = HTTP2Frame.FramePayload
|
|
|
- typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>
|
|
|
|
|
|
private var logger: Logger
|
|
|
private var state: HTTP2ToRawGRPCStateMachine
|
|
|
@@ -43,7 +40,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
|
|
|
private enum Mode {
|
|
|
case notConfigured
|
|
|
- case legacy
|
|
|
case handler(GRPCServerHandlerProtocol)
|
|
|
}
|
|
|
|
|
|
@@ -69,14 +65,13 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
|
|
|
internal func handlerRemoved(context: ChannelHandlerContext) {
|
|
|
self.context = nil
|
|
|
+ self.mode = .notConfigured
|
|
|
}
|
|
|
|
|
|
internal func errorCaught(context: ChannelHandlerContext, error: Error) {
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
context.close(mode: .all, promise: nil)
|
|
|
- case .legacy:
|
|
|
- context.fireErrorCaught(error)
|
|
|
case let .handler(hander):
|
|
|
hander.receiveError(error)
|
|
|
}
|
|
|
@@ -84,7 +79,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
|
|
|
internal func channelInactive(context: ChannelHandlerContext) {
|
|
|
switch self.mode {
|
|
|
- case .notConfigured, .legacy:
|
|
|
+ case .notConfigured:
|
|
|
context.fireChannelInactive()
|
|
|
case let .handler(handler):
|
|
|
handler.finish()
|
|
|
@@ -108,12 +103,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
)
|
|
|
|
|
|
switch receiveHeaders {
|
|
|
- case let .configureLegacy(handler):
|
|
|
- self.mode = .legacy
|
|
|
- context.channel.pipeline.addHandler(handler).whenSuccess {
|
|
|
- self.configured()
|
|
|
- }
|
|
|
-
|
|
|
case let .configure(handler):
|
|
|
self.mode = .handler(handler)
|
|
|
self.configured()
|
|
|
@@ -161,31 +150,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
context.fireChannelReadComplete()
|
|
|
}
|
|
|
|
|
|
- internal func write(
|
|
|
- context: ChannelHandlerContext,
|
|
|
- data: NIOAny,
|
|
|
- promise: EventLoopPromise<Void>?
|
|
|
- ) {
|
|
|
- let responsePart = self.unwrapOutboundIn(data)
|
|
|
-
|
|
|
- switch responsePart {
|
|
|
- case let .metadata(headers):
|
|
|
- // We're in 'write' so we're using the old type of RPC handler which emits its own flushes,
|
|
|
- // no need to emit an extra one.
|
|
|
- self.sendMetadata(headers, flush: false, promise: promise)
|
|
|
-
|
|
|
- case let .message(buffer, metadata):
|
|
|
- self.sendMessage(buffer, metadata: metadata, promise: promise)
|
|
|
-
|
|
|
- case let .end(status, trailers):
|
|
|
- self.sendEnd(status: status, trailers: trailers, promise: promise)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- internal func flush(context: ChannelHandlerContext) {
|
|
|
- self.markFlushPoint()
|
|
|
- }
|
|
|
-
|
|
|
/// Called when the pipeline has finished configuring.
|
|
|
private func configured() {
|
|
|
switch self.state.pipelineConfigured() {
|
|
|
@@ -193,8 +157,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.metadata(headers)))
|
|
|
case let .handler(handler):
|
|
|
handler.receiveMetadata(headers)
|
|
|
}
|
|
|
@@ -203,8 +165,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.metadata(headers)))
|
|
|
case let .handler(handler):
|
|
|
handler.receiveMetadata(headers)
|
|
|
}
|
|
|
@@ -223,8 +183,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
|
|
|
case let .handler(handler):
|
|
|
handler.receiveMessage(buffer)
|
|
|
}
|
|
|
@@ -233,9 +191,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.end))
|
|
|
case let .handler(handler):
|
|
|
handler.receiveMessage(buffer)
|
|
|
handler.receiveEnd()
|
|
|
@@ -245,8 +200,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
|
|
|
case let .handler(handler):
|
|
|
handler.receiveMessage(buffer)
|
|
|
}
|
|
|
@@ -256,8 +209,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireChannelRead(self.wrapInboundOut(.end))
|
|
|
case let .handler(handler):
|
|
|
handler.receiveEnd()
|
|
|
}
|
|
|
@@ -266,8 +217,6 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
|
|
|
switch self.mode {
|
|
|
case .notConfigured:
|
|
|
preconditionFailure()
|
|
|
- case .legacy:
|
|
|
- self.context.fireErrorCaught(error)
|
|
|
case let .handler(handler):
|
|
|
handler.receiveError(error)
|
|
|
}
|