HTTP2ToRawGRPCServerCodec.swift 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A duplex channel handler sitting between HTTP/2 frame payloads and raw gRPC server parts.
  ///
  /// Inbound, it consumes `HTTP2Frame.FramePayload` values and emits
  /// `GRPCServerRequestPart<ByteBuffer>`; outbound, it consumes
  /// `GRPCServerResponsePart<ByteBuffer>` and emits `HTTP2Frame.FramePayload`.
  ///
  /// The handler itself makes no protocol decisions: every event is handed to an
  /// `HTTP2ToRawGRPCStateMachine` and the returned `Action` is executed by `act(on:with:)`.
  internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler {
    typealias InboundIn = HTTP2Frame.FramePayload
    typealias InboundOut = GRPCServerRequestPart<ByteBuffer>

    typealias OutboundOut = HTTP2Frame.FramePayload
    typealias OutboundIn = GRPCServerResponsePart<ByteBuffer>

    /// Logger handed to the state machine when request headers are received.
    private var logger: Logger
    /// The state machine which decides what to do for each inbound and outbound event.
    private var state: HTTP2ToRawGRPCStateMachine
    /// Optional error delegate handed to the state machine when request headers are received.
    private let errorDelegate: ServerErrorDelegate?

    /// Creates a codec whose state machine is built from the given services and settings.
    ///
    /// - Parameters:
    ///   - servicesByName: Call handler providers keyed by name; passed to the state machine.
    ///   - encoding: The server message encoding; passed to the state machine.
    ///   - errorDelegate: Stored and later passed to the state machine with request headers.
    ///   - normalizeHeaders: Passed to the state machine unchanged.
    ///   - logger: Stored and later passed to the state machine with request headers.
    init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      normalizeHeaders: Bool,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.state = HTTP2ToRawGRPCStateMachine(
        services: servicesByName,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
    }

    /// Called when the pipeline has finished configuring.
    ///
    /// Tells the state machine that configuration is complete and executes whatever
    /// action it returns.
    private func configured(context: ChannelHandlerContext) {
      let nextAction = self.state.pipelineConfigured()
      self.act(on: nextAction, with: context)
    }

    /// Act on an action returned from the state machine.
    ///
    /// Some actions ask the state machine for a follow-up action, in which case this
    /// method calls itself with the result.
    ///
    /// - Parameters:
    ///   - action: The action to execute.
    ///   - context: The context used to fire inbound events, write outbound parts and
    ///     reach the channel's pipeline.
    private func act(
      on action: HTTP2ToRawGRPCStateMachine.Action,
      with context: ChannelHandlerContext
    ) {
      switch action {
      case .none:
        ()

      case let .configure(handler):
        // Adding the handler may fail. Previously only the success path was observed,
        // so a failure was silently dropped and `pipelineConfigured()` was never
        // reached. Surface the failure as an inbound error instead.
        context.channel.pipeline.addHandler(handler).whenComplete { result in
          switch result {
          case .success:
            self.configured(context: context)
          case let .failure(error):
            context.fireErrorCaught(error)
          }
        }

      case let .errorCaught(error):
        context.fireErrorCaught(error)

      case let .forwardHeaders(metadata):
        context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))

      case let .forwardMessage(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))

      case let .forwardMessageAndEnd(buffer):
        // The message is delivered first, immediately followed by the end of the request stream.
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
        context.fireChannelRead(self.wrapInboundOut(.end))

      case let .forwardHeadersThenReadNextMessage(metadata):
        context.fireChannelRead(self.wrapInboundOut(.metadata(metadata)))
        self.act(on: self.state.readNextRequest(), with: context)

      case let .forwardMessageThenReadNextMessage(buffer):
        context.fireChannelRead(self.wrapInboundOut(.message(buffer)))
        self.act(on: self.state.readNextRequest(), with: context)

      case .forwardEnd:
        context.fireChannelRead(self.wrapInboundOut(.end))

      case .readNextRequest:
        self.act(on: self.state.readNextRequest(), with: context)

      case let .write(part, promise, insertFlush):
        context.write(self.wrapOutboundOut(part), promise: promise)
        // Only flush when the state machine explicitly asks for it.
        if insertFlush {
          context.flush()
        }

      case let .completePromise(promise, result):
        promise?.completeWith(result)
      }
    }

    /// Handles an inbound HTTP/2 frame payload.
    ///
    /// Headers and data payloads are given to the state machine and the resulting action
    /// is executed. No action is taken for any other payload type.
    ///
    /// - Precondition: Data payloads must carry a byte buffer, not a file region.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let payload = self.unwrapInboundIn(data)

      switch payload {
      case let .headers(headersPayload):
        let action = self.state.receive(
          headers: headersPayload.headers,
          eventLoop: context.eventLoop,
          errorDelegate: self.errorDelegate,
          logger: self.logger
        )
        self.act(on: action, with: context)

      case let .data(dataPayload):
        switch dataPayload.data {
        case var .byteBuffer(buffer):
          // The buffer is passed `inout`, hence the mutable binding.
          let action = self.state.receive(buffer: &buffer, endStream: dataPayload.endStream)
          self.act(on: action, with: context)
        case .fileRegion:
          preconditionFailure("Unexpected IOData.fileRegion")
        }

      // Ignored.
      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        ()
      }
    }

    /// Handles an outbound gRPC response part.
    ///
    /// The part (metadata, message or end) and the write promise are given to the state
    /// machine, and the action it returns is executed.
    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      let responsePart = self.unwrapOutboundIn(data)
      let action: HTTP2ToRawGRPCStateMachine.Action

      switch responsePart {
      case let .metadata(headers):
        action = self.state.send(headers: headers, promise: promise)

      case let .message(buffer, metadata):
        action = self.state.send(
          buffer: buffer,
          allocator: context.channel.allocator,
          compress: metadata.compress,
          promise: promise
        )

      case let .end(status, trailers):
        action = self.state.send(status: status, trailers: trailers, promise: promise)
      }

      self.act(on: action, with: context)
    }
  }