2
0

HTTPProtocolSwitcher.swift 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. /// Channel handler that creates different processing pipelines depending on whether
  22. /// the incoming request is HTTP 1 or 2.
  23. internal class HTTPProtocolSwitcher {
  24. private let handlersInitializer: (Channel, Logger) -> EventLoopFuture<Void>
  25. private let errorDelegate: ServerErrorDelegate?
  26. private let logger: Logger
  27. private let httpTargetWindowSize: Int
  28. private let keepAlive: ServerConnectionKeepalive
  29. private let idleTimeout: TimeAmount
  30. private let scheme: String
  31. // We could receive additional data after the initial data and before configuring
  32. // the pipeline; buffer it and fire it down the pipeline once it is configured.
  33. private enum State {
  34. case notConfigured
  35. case configuring
  36. case configured
  37. }
  38. private var state: State = .notConfigured
  39. private var bufferedData: [NIOAny] = []
  40. init(
  41. errorDelegate: ServerErrorDelegate?,
  42. httpTargetWindowSize: Int = 65535,
  43. keepAlive: ServerConnectionKeepalive,
  44. idleTimeout: TimeAmount,
  45. scheme: String,
  46. logger: Logger,
  47. handlersInitializer: @escaping (Channel, Logger) -> EventLoopFuture<Void>
  48. ) {
  49. self.errorDelegate = errorDelegate
  50. self.httpTargetWindowSize = httpTargetWindowSize
  51. self.keepAlive = keepAlive
  52. self.idleTimeout = idleTimeout
  53. self.scheme = scheme
  54. self.logger = logger
  55. self.handlersInitializer = handlersInitializer
  56. }
  57. }
  58. extension HTTPProtocolSwitcher: ChannelInboundHandler, RemovableChannelHandler {
  59. typealias InboundIn = ByteBuffer
  60. typealias InboundOut = ByteBuffer
  61. enum HTTPProtocolVersionError: Error {
  62. /// Raised when it wasn't possible to detect HTTP Protocol version.
  63. case invalidHTTPProtocolVersion
  64. var localizedDescription: String {
  65. switch self {
  66. case .invalidHTTPProtocolVersion:
  67. return "Could not identify HTTP Protocol Version"
  68. }
  69. }
  70. }
  71. /// HTTP Protocol Version type
  72. enum HTTPProtocolVersion {
  73. case http1
  74. case http2
  75. }
  76. func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  77. switch self.state {
  78. case .notConfigured:
  79. self.logger.debug("determining http protocol version")
  80. self.state = .configuring
  81. self.logger.debug("buffering data", metadata: ["data": "\(data)"])
  82. self.bufferedData.append(data)
  83. // Detect the HTTP protocol version for the incoming request, or error out if it
  84. // couldn't be detected.
  85. var inBuffer = self.unwrapInboundIn(data)
  86. guard let initialData = inBuffer.readString(length: inBuffer.readableBytes),
  87. let firstLine = initialData.split(
  88. separator: "\r\n",
  89. maxSplits: 1,
  90. omittingEmptySubsequences: true
  91. ).first else {
  92. self.logger.error("unable to determine http version")
  93. context.fireErrorCaught(HTTPProtocolVersionError.invalidHTTPProtocolVersion)
  94. return
  95. }
  96. let version: HTTPProtocolVersion
  97. if firstLine.contains("HTTP/2") {
  98. version = .http2
  99. } else if firstLine.contains("HTTP/1") {
  100. version = .http1
  101. } else {
  102. self.logger.error("unable to determine http version")
  103. context.fireErrorCaught(HTTPProtocolVersionError.invalidHTTPProtocolVersion)
  104. return
  105. }
  106. self.logger.debug("determined http version", metadata: ["http_version": "\(version)"])
  107. // Once configured remove ourself from the pipeline, or handle the error.
  108. let pipelineConfigured: EventLoopPromise<Void> = context.eventLoop.makePromise()
  109. pipelineConfigured.futureResult.whenComplete { result in
  110. switch result {
  111. case .success:
  112. context.pipeline.removeHandler(context: context, promise: nil)
  113. case let .failure(error):
  114. self.state = .notConfigured
  115. self.errorCaught(context: context, error: error)
  116. }
  117. }
  118. // Depending on whether it is HTTP1 or HTTP2, create different processing pipelines.
  119. // Inbound handlers in handlersInitializer should expect HTTPServerRequestPart objects
  120. // and outbound handlers should return HTTPServerResponsePart objects.
  121. switch version {
  122. case .http1:
  123. // Upgrade connections are not handled since gRPC connections already arrive in HTTP2,
  124. // while gRPC-Web does not support HTTP2 at all, so there are no compelling use cases
  125. // to support this.
  126. context.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
  127. context.pipeline.addHandlers([
  128. WebCORSHandler(),
  129. GRPCWebToHTTP2ServerCodec(scheme: self.scheme),
  130. ])
  131. }.flatMap {
  132. self.handlersInitializer(context.channel, self.logger)
  133. }.cascade(to: pipelineConfigured)
  134. case .http2:
  135. context.channel.configureHTTP2Pipeline(
  136. mode: .server,
  137. targetWindowSize: self.httpTargetWindowSize
  138. ) { streamChannel in
  139. var logger = self.logger
  140. // Grab the streamID from the channel.
  141. return streamChannel.getOption(HTTP2StreamChannelOptions.streamID).map { streamID in
  142. logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
  143. return logger
  144. }.recover { _ in
  145. logger[metadataKey: MetadataKey.h2StreamID] = "<unknown>"
  146. return logger
  147. }.flatMap { logger in
  148. self.handlersInitializer(streamChannel, logger)
  149. }
  150. }.flatMap { multiplexer -> EventLoopFuture<Void> in
  151. // Add a keepalive and idle handlers between the two HTTP2 handlers.
  152. let keepaliveHandler = GRPCServerKeepaliveHandler(configuration: self.keepAlive)
  153. let idleHandler = GRPCIdleHandler(
  154. mode: .server,
  155. logger: self.logger,
  156. idleTimeout: self.idleTimeout
  157. )
  158. return context.channel.pipeline.addHandlers(
  159. [keepaliveHandler, idleHandler],
  160. position: .before(multiplexer)
  161. )
  162. }
  163. .cascade(to: pipelineConfigured)
  164. }
  165. case .configuring:
  166. self.logger.debug("buffering data", metadata: ["data": "\(data)"])
  167. self.bufferedData.append(data)
  168. case .configured:
  169. self.logger
  170. .critical(
  171. "unexpectedly received data; this handler should have been removed from the pipeline"
  172. )
  173. assertionFailure(
  174. "unexpectedly received data; this handler should have been removed from the pipeline"
  175. )
  176. }
  177. }
  178. func removeHandler(
  179. context: ChannelHandlerContext,
  180. removalToken: ChannelHandlerContext.RemovalToken
  181. ) {
  182. self.logger.debug("unbuffering data")
  183. self.bufferedData.forEach {
  184. context.fireChannelRead($0)
  185. }
  186. context.leavePipeline(removalToken: removalToken)
  187. self.state = .configured
  188. }
  189. func errorCaught(context: ChannelHandlerContext, error: Error) {
  190. switch self.state {
  191. case .notConfigured, .configuring:
  192. let baseError: Error
  193. if let errorWithContext = error as? GRPCError.WithContext {
  194. baseError = errorWithContext.error
  195. } else {
  196. baseError = error
  197. }
  198. self.errorDelegate?.observeLibraryError(baseError)
  199. context.close(mode: .all, promise: nil)
  200. case .configured:
  201. // If we're configured we will rely on a handler further down the pipeline.
  202. context.fireErrorCaught(error)
  203. }
  204. }
  205. }