HTTP2ToRawGRPCServerCodec.swift 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A duplex channel handler which translates between HTTP/2 frame payloads and raw
  /// (`ByteBuffer`-based) gRPC server request and response parts.
  ///
  /// The handler holds no protocol logic of its own: every inbound frame and every outbound
  /// response part is handed to an `HTTP2ToRawGRPCStateMachine`, which replies with an
  /// `Action`. This class is only responsible for carrying out those actions against the
  /// `ChannelHandlerContext` (see `act(on:with:)`).
  internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler {
    typealias InboundIn = HTTP2Frame.FramePayload
    typealias InboundOut = GRPCServerRequestPart<ByteBuffer>

    typealias OutboundOut = HTTP2Frame.FramePayload
    typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>

    /// Logger handed to the state machine when request headers are received.
    private var logger: Logger
    /// The state machine which decides what to do for each inbound and outbound event.
    private var state: HTTP2ToRawGRPCStateMachine
    /// Optional error delegate, handed to the state machine when request headers are received.
    private let errorDelegate: ServerErrorDelegate?

    /// Creates a codec backed by a new `HTTP2ToRawGRPCStateMachine`.
    ///
    /// - Parameters:
    ///   - servicesByName: The call handler providers, keyed by name; passed to the state
    ///     machine as its `services`.
    ///   - encoding: The server message encoding; passed to the state machine.
    ///   - errorDelegate: An optional error delegate, stored and later passed to the state
    ///     machine alongside received request headers.
    ///   - normalizeHeaders: Passed through to the state machine unchanged.
    ///   - logger: The logger stored and later passed to the state machine alongside received
    ///     request headers.
    init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      normalizeHeaders: Bool,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.state = HTTP2ToRawGRPCStateMachine(
        services: servicesByName,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
    }

    /// Called when the pipeline has finished configuring.
    ///
    /// Notifies the state machine and carries out whatever action it returns.
    private func configured(context: ChannelHandlerContext) {
      self.act(on: self.state.pipelineConfigured(), with: context)
    }

    /// Act on an action returned from the state machine.
    ///
    /// - Parameters:
    ///   - action: The action to carry out.
    ///   - context: The context of this handler, used to fire inbound events, write outbound
    ///     frames and reach the channel's pipeline.
    private func act(
      on action: HTTP2ToRawGRPCStateMachine.Action,
      with context: ChannelHandlerContext
    ) {
      switch action {
      case .none:
        ()

      case let .configure(handler):
        context.channel.pipeline.addHandler(handler).whenComplete { result in
          switch result {
          case .success:
            self.configured(context: context)

          case let .failure(error):
            // The handler could not be added, so the state machine will never be told that
            // the pipeline is configured and nothing downstream will answer this RPC. Surface
            // the error and close the channel rather than leaving the stream to hang.
            context.fireErrorCaught(error)
            context.close(mode: .all, promise: nil)
          }
        }

      case let .errorCaught(error):
        context.fireErrorCaught(error)

      case let .forwardHeaders(metadata):
        context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))

      case let .forwardMessage(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))

      case let .forwardMessageAndEnd(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
        context.fireChannelRead(self.wrapInboundOut(.end))

      case let .forwardHeadersThenReadNextMessage(metadata):
        context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
        // Ask the state machine for the next request part and act on its answer.
        self.act(on: self.state.readNextRequest(), with: context)

      case let .forwardMessageThenReadNextMessage(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
        // Ask the state machine for the next request part and act on its answer.
        self.act(on: self.state.readNextRequest(), with: context)

      case .forwardEnd:
        context.fireChannelRead(self.wrapInboundOut(.end))

      case .readNextRequest:
        self.act(on: self.state.readNextRequest(), with: context)

      case let .write(part, promise, insertFlush):
        context.write(self.wrapOutboundOut(part), promise: promise)
        if insertFlush {
          context.flush()
        }

      case let .completePromise(promise, result):
        promise?.completeWith(result)
      }
    }

    /// Receives an inbound HTTP/2 frame payload.
    ///
    /// Headers and data payloads are passed to the state machine and the resulting action is
    /// carried out; all other frame types are ignored.
    ///
    /// - Precondition: Data payloads must be backed by a `ByteBuffer`; a file region traps.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let payload = self.unwrapInboundIn(data)

      switch payload {
      case let .headers(payload):
        let action = self.state.receive(
          headers: payload.headers,
          eventLoop: context.eventLoop,
          errorDelegate: self.errorDelegate,
          remoteAddress: context.channel.remoteAddress,
          logger: self.logger
        )
        self.act(on: action, with: context)

      case let .data(payload):
        switch payload.data {
        case var .byteBuffer(buffer):
          let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
          self.act(on: action, with: context)
        case .fileRegion:
          preconditionFailure("Unexpected IOData.fileRegion")
        }

      // Ignored.
      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        ()
      }
    }

    /// Receives an outbound gRPC response part.
    ///
    /// The part and its promise are handed to the state machine, which decides what (if
    /// anything) is written to the channel and when the promise is completed.
    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      let responsePart = self.unwrapOutboundIn(data)
      let action: HTTP2ToRawGRPCStateMachine.Action

      switch responsePart {
      case let .metadata(headers):
        action = self.state.send(headers: headers, promise: promise)

      case let .message(buffer, metadata):
        action = self.state.send(
          buffer: buffer,
          allocator: context.channel.allocator,
          compress: metadata.compress,
          promise: promise
        )

      case let .end(status, trailers):
        action = self.state.send(status: status, trailers: trailers, promise: promise)
      }

      self.act(on: action, with: context)
    }
  }