HTTP2ToRawGRPCServerCodec.swift 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A channel handler which sits between HTTP/2 frame payloads and "raw" (`ByteBuffer`-based)
  /// gRPC server request/response parts.
  ///
  /// Inbound `HTTP2Frame.FramePayload`s and outbound `GRPCServerResponsePart<ByteBuffer>`s are fed
  /// into an `HTTP2ToRawGRPCStateMachine`. The state machine answers with an `Action` which this
  /// handler then carries out against the `ChannelHandlerContext` (see `act(on:with:)`).
  internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServerResponseWriter {
    typealias InboundIn = HTTP2Frame.FramePayload
    typealias InboundOut = GRPCServerRequestPart<ByteBuffer>

    typealias OutboundOut = HTTP2Frame.FramePayload
    typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>

    /// Logger handed to the state machine when request headers are received.
    private var logger: Logger

    /// The state machine deciding what to do for each inbound frame and outbound response part.
    private var state: HTTP2ToRawGRPCStateMachine

    /// Optional error delegate handed to the state machine when request headers are received.
    private let errorDelegate: ServerErrorDelegate?

    /// Creates a codec.
    ///
    /// - Parameters:
    ///   - servicesByName: Call handler providers keyed by name; passed to the state machine.
    ///   - encoding: The server message encoding; passed to the state machine.
    ///   - errorDelegate: An optional delegate passed to the state machine with request headers.
    ///   - normalizeHeaders: Passed through to the state machine unchanged.
    ///   - logger: The logger passed to the state machine with request headers.
    init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      normalizeHeaders: Bool,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.state = HTTP2ToRawGRPCStateMachine(
        services: servicesByName,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
    }

    /// Called when the pipeline has finished configuring.
    ///
    /// Notifies the state machine and acts on whatever it returns.
    private func configured(context: ChannelHandlerContext) {
      self.act(on: self.state.pipelineConfigured(), with: context)
    }

    /// Act on an action returned from the state machine.
    ///
    /// - Parameters:
    ///   - action: The action to perform.
    ///   - context: The context used to read from, write to and reconfigure the pipeline.
    private func act(
      on action: HTTP2ToRawGRPCStateMachine.Action,
      with context: ChannelHandlerContext
    ) {
      switch action {
      case .none:
        ()

      case let .configure(handler):
        // Observe both outcomes: if the handler could not be added the state machine will never
        // be told that the pipeline is configured, so the failure must not be dropped silently.
        context.channel.pipeline.addHandler(handler).whenComplete { result in
          switch result {
          case .success:
            self.configured(context: context)
          case let .failure(error):
            // Surface the failure the same way as errors reported by the state machine.
            context.fireErrorCaught(error)
          }
        }

      case let .errorCaught(error):
        context.fireErrorCaught(error)

      case let .forwardHeaders(metadata):
        context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))

      case let .forwardMessage(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))

      case let .forwardMessageAndEnd(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
        context.fireChannelRead(self.wrapInboundOut(.end))

      case let .forwardHeadersThenReadNextMessage(metadata):
        context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
        // Ask the state machine for the next request part and act on its answer.
        self.act(on: self.state.readNextRequest(), with: context)

      case let .forwardMessageThenReadNextMessage(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
        self.act(on: self.state.readNextRequest(), with: context)

      case .forwardEnd:
        context.fireChannelRead(self.wrapInboundOut(.end))

      case .readNextRequest:
        self.act(on: self.state.readNextRequest(), with: context)

      case let .write(part, promise, insertFlush):
        context.write(self.wrapOutboundOut(part), promise: promise)
        if insertFlush {
          context.flush()
        }

      case let .completePromise(promise, result):
        promise?.completeWith(result)
      }
    }

    /// Receives an inbound HTTP/2 frame payload.
    ///
    /// `.headers` and `.data` payloads are handed to the state machine and the resulting action is
    /// performed; all other payload types are ignored.
    ///
    /// - Precondition: `.data` payloads must carry a `ByteBuffer`; a file region traps.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let payload = self.unwrapInboundIn(data)

      switch payload {
      case let .headers(payload):
        let action = self.state.receive(
          headers: payload.headers,
          eventLoop: context.eventLoop,
          errorDelegate: self.errorDelegate,
          remoteAddress: context.channel.remoteAddress,
          logger: self.logger,
          allocator: context.channel.allocator,
          responseWriter: self
        )
        self.act(on: action, with: context)

      case let .data(payload):
        switch payload.data {
        case var .byteBuffer(buffer):
          let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
          self.act(on: action, with: context)
        case .fileRegion:
          preconditionFailure("Unexpected IOData.fileRegion")
        }

      // Ignored.
      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        ()
      }
    }

    /// Receives an outbound response part.
    ///
    /// The part is handed to the state machine along with `promise`; the resulting action is then
    /// performed.
    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      let responsePart = self.unwrapOutboundIn(data)
      let action: HTTP2ToRawGRPCStateMachine.Action

      switch responsePart {
      case let .metadata(headers):
        action = self.state.send(headers: headers, promise: promise)

      case let .message(buffer, metadata):
        action = self.state.send(
          buffer: buffer,
          allocator: context.channel.allocator,
          compress: metadata.compress,
          promise: promise
        )

      case let .end(status, trailers):
        action = self.state.send(status: status, trailers: trailers, promise: promise)
      }

      self.act(on: action, with: context)
    }

    /// Not implemented yet: calling this traps unconditionally.
    internal func sendMetadata(
      _ metadata: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      fatalError("TODO: not used yet")
    }

    /// Not implemented yet: calling this traps unconditionally.
    internal func sendMessage(
      _ bytes: ByteBuffer,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      fatalError("TODO: not used yet")
    }

    /// Not implemented yet: calling this traps unconditionally.
    internal func sendEnd(
      status: GRPCStatus,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) {
      fatalError("TODO: not used yet")
    }
  }