HTTP2ToRawGRPCServerCodec.swift 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler {
  21. typealias InboundIn = HTTP2Frame.FramePayload
  22. typealias InboundOut = GRPCServerRequestPart<ByteBuffer>
  23. typealias OutboundOut = HTTP2Frame.FramePayload
  24. typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>
  25. private var logger: Logger
  26. private var state: HTTP2ToRawGRPCStateMachine
  27. init(
  28. servicesByName: [Substring: CallHandlerProvider],
  29. encoding: ServerMessageEncoding,
  30. normalizeHeaders: Bool,
  31. logger: Logger
  32. ) {
  33. self.logger = logger
  34. self.state = HTTP2ToRawGRPCStateMachine(
  35. services: servicesByName,
  36. encoding: encoding,
  37. normalizeHeaders: normalizeHeaders
  38. )
  39. }
  40. /// Called when the pipeline has finished configuring.
  41. private func configured(context: ChannelHandlerContext) {
  42. self.act(on: self.state.pipelineConfigured(), with: context)
  43. }
  44. /// Act on an action returned from the state machine.
  45. private func act(
  46. on action: HTTP2ToRawGRPCStateMachine.Action,
  47. with context: ChannelHandlerContext
  48. ) {
  49. switch action {
  50. case .none:
  51. ()
  52. case let .configure(handler):
  53. context.channel.pipeline.addHandler(handler).whenSuccess {
  54. self.configured(context: context)
  55. }
  56. case let .errorCaught(error):
  57. context.fireErrorCaught(error)
  58. case let .forwardHeaders(metadata):
  59. context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
  60. case let .forwardMessage(buffer):
  61. context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
  62. case let .forwardMessageAndEnd(buffer):
  63. context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
  64. context.fireChannelRead(self.wrapInboundOut(.end))
  65. case let .forwardHeadersThenReadNextMessage(metadata):
  66. context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
  67. self.act(on: self.state.readNextRequest(), with: context)
  68. case let .forwardMessageThenReadNextMessage(buffer):
  69. context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
  70. self.act(on: self.state.readNextRequest(), with: context)
  71. case .forwardEnd:
  72. context.fireChannelRead(self.wrapInboundOut(.end))
  73. case .readNextRequest:
  74. self.act(on: self.state.readNextRequest(), with: context)
  75. case let .write(part, promise, insertFlush):
  76. context.write(self.wrapOutboundOut(part), promise: promise)
  77. if insertFlush {
  78. context.flush()
  79. }
  80. case let .completePromise(promise, result):
  81. promise?.completeWith(result)
  82. }
  83. }
  84. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  85. let payload = self.unwrapInboundIn(data)
  86. switch payload {
  87. case let .headers(payload):
  88. let action = self.state.receive(
  89. headers: payload.headers,
  90. eventLoop: context.eventLoop,
  91. errorDelegate: nil,
  92. logger: self.logger
  93. )
  94. self.act(on: action, with: context)
  95. case let .data(payload):
  96. switch payload.data {
  97. case var .byteBuffer(buffer):
  98. let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
  99. self.act(on: action, with: context)
  100. case .fileRegion:
  101. preconditionFailure("Unexpected IOData.fileRegion")
  102. }
  103. // Ignored.
  104. case .alternativeService,
  105. .goAway,
  106. .origin,
  107. .ping,
  108. .priority,
  109. .pushPromise,
  110. .rstStream,
  111. .settings,
  112. .windowUpdate:
  113. ()
  114. }
  115. }
  116. func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  117. let responsePart = self.unwrapOutboundIn(data)
  118. let action: HTTP2ToRawGRPCStateMachine.Action
  119. switch responsePart {
  120. case let .metadata(headers):
  121. action = self.state.send(headers: headers, promise: promise)
  122. case let .message(buffer, metadata):
  123. action = self.state.send(
  124. buffer: buffer,
  125. allocator: context.channel.allocator,
  126. compress: metadata.compress,
  127. promise: promise
  128. )
  129. case let .end(status, trailers):
  130. action = self.state.send(status: status, trailers: trailers, promise: promise)
  131. }
  132. self.act(on: action, with: context)
  133. }
  134. }