GRPCServerPipelineConfigurator.swift 13 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import NIOTLS
  21. /// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
  22. /// version used for transport.
  23. ///
  24. /// If TLS is enabled then the handler listens for an 'TLSUserEvent.handshakeCompleted' event and
  25. /// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
  26. /// configured then the HTTP version is determined by parsing the inbound byte stream.
  27. final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
  28. internal typealias InboundIn = ByteBuffer
  29. internal typealias InboundOut = ByteBuffer
  30. /// The server configuration.
  31. private let configuration: Server.Configuration
  32. /// Reads which we're holding on to before the pipeline is configured.
  33. private var bufferedReads = CircularBuffer<NIOAny>()
  34. /// The current state.
  35. private var state: State
  36. private enum ALPN {
  37. /// ALPN is expected. It may or may not be required, however.
  38. case expected(required: Bool)
  39. /// ALPN was expected but not required and no protocol was negotiated in the handshake. We may
  40. /// now fall back to parsing bytes on the connection.
  41. case expectedButFallingBack
  42. /// ALPN is not expected; this is a cleartext connection.
  43. case notExpected
  44. }
  45. private enum State {
  46. /// The pipeline isn't configured yet.
  47. case notConfigured(alpn: ALPN)
  48. /// We're configuring the pipeline.
  49. case configuring
  50. }
  51. init(configuration: Server.Configuration) {
  52. if let tls = configuration.tls {
  53. self.state = .notConfigured(alpn: .expected(required: tls.requireALPN))
  54. } else {
  55. self.state = .notConfigured(alpn: .notExpected)
  56. }
  57. self.configuration = configuration
  58. }
  59. /// Makes a gRPC Server keepalive handler.
  60. private func makeKeepaliveHandler() -> GRPCServerKeepaliveHandler {
  61. return .init(configuration: self.configuration.connectionKeepalive)
  62. }
  63. /// Makes a gRPC idle handler for the server..
  64. private func makeIdleHandler() -> GRPCIdleHandler {
  65. return .init(
  66. mode: .server,
  67. logger: self.configuration.logger,
  68. idleTimeout: self.configuration.connectionIdleTimeout
  69. )
  70. }
  71. /// Makes an HTTP/2 handler.
  72. private func makeHTTP2Handler() -> NIOHTTP2Handler {
  73. return .init(mode: .server)
  74. }
  75. /// Makes an HTTP/2 multiplexer suitable handling gRPC requests.
  76. private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
  77. var logger = self.configuration.logger
  78. return .init(
  79. mode: .server,
  80. channel: channel,
  81. targetWindowSize: self.configuration.httpTargetWindowSize
  82. ) { stream in
  83. stream.getOption(HTTP2StreamChannelOptions.streamID).map { streamID -> Logger in
  84. logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
  85. return logger
  86. }.recover { _ in
  87. logger[metadataKey: MetadataKey.h2StreamID] = "<unknown>"
  88. return logger
  89. }.flatMap { logger in
  90. // TODO: provide user configuration for header normalization.
  91. let handler = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: logger)
  92. return stream.pipeline.addHandler(handler)
  93. }
  94. }
  95. }
  96. /// Makes an HTTP/2 to raw gRPC server handler.
  97. private func makeHTTP2ToRawGRPCHandler(
  98. normalizeHeaders: Bool,
  99. logger: Logger
  100. ) -> HTTP2ToRawGRPCServerCodec {
  101. return HTTP2ToRawGRPCServerCodec(
  102. servicesByName: self.configuration.serviceProvidersByName,
  103. encoding: self.configuration.messageEncoding,
  104. errorDelegate: self.configuration.errorDelegate,
  105. normalizeHeaders: normalizeHeaders,
  106. logger: logger
  107. )
  108. }
  109. /// The pipeline finished configuring.
  110. private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
  111. switch result {
  112. case .success:
  113. context.pipeline.removeHandler(context: context, promise: nil)
  114. case let .failure(error):
  115. self.errorCaught(context: context, error: error)
  116. }
  117. }
  118. /// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
  119. private func configureHTTP2(context: ChannelHandlerContext) {
  120. // We're now configuring the pipeline.
  121. self.state = .configuring
  122. // We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
  123. // to then insert our keepalive and idle handlers between. We can just add everything together.
  124. var handlers: [ChannelHandler] = []
  125. handlers.reserveCapacity(4)
  126. handlers.append(self.makeHTTP2Handler())
  127. handlers.append(self.makeKeepaliveHandler())
  128. handlers.append(self.makeIdleHandler())
  129. handlers.append(self.makeHTTP2Multiplexer(for: context.channel))
  130. // Now configure the pipeline with the handlers.
  131. context.channel.pipeline.addHandlers(handlers).whenComplete { result in
  132. self.configurationCompleted(result: result, context: context)
  133. }
  134. }
  135. /// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
  136. private func configureHTTP1(context: ChannelHandlerContext) {
  137. // We're now configuring the pipeline.
  138. self.state = .configuring
  139. context.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
  140. context.pipeline.addHandlers([
  141. WebCORSHandler(),
  142. GRPCWebToHTTP2ServerCodec(scheme: self.configuration.tls == nil ? "http" : "https"),
  143. // There's no need to normalize headers for HTTP/1.
  144. self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger),
  145. ])
  146. }.whenComplete { result in
  147. self.configurationCompleted(result: result, context: context)
  148. }
  149. }
  150. /// Attempts to determine the HTTP version from the buffer and then configure the pipeline
  151. /// appropriately. Closes the connection if the HTTP version could not be determined.
  152. private func determineHTTPVersionAndConfigurePipeline(
  153. buffer: ByteBuffer,
  154. context: ChannelHandlerContext
  155. ) {
  156. if HTTPVersionParser.prefixedWithHTTP2ConnectionPreface(buffer) {
  157. self.configureHTTP2(context: context)
  158. } else if HTTPVersionParser.prefixedWithHTTP1RequestLine(buffer) {
  159. self.configureHTTP1(context: context)
  160. } else {
  161. self.configuration.logger.error("Unable to determine http version, closing")
  162. context.close(mode: .all, promise: nil)
  163. }
  164. }
  165. /// Handles a 'TLSUserEvent.handshakeCompleted' event and configures the pipeline to handle gRPC
  166. /// requests.
  167. private func handleHandshakeCompletedEvent(
  168. _ event: TLSUserEvent,
  169. alpnIsRequired: Bool,
  170. context: ChannelHandlerContext
  171. ) {
  172. switch event {
  173. case let .handshakeCompleted(negotiatedProtocol):
  174. self.configuration.logger.debug("TLS handshake completed", metadata: [
  175. "alpn": "\(negotiatedProtocol ?? "nil")",
  176. ])
  177. switch negotiatedProtocol {
  178. case let .some(negotiated):
  179. if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
  180. self.configureHTTP2(context: context)
  181. } else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
  182. self.configureHTTP1(context: context)
  183. } else {
  184. self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
  185. context.close(mode: .all, promise: nil)
  186. }
  187. case .none:
  188. if alpnIsRequired {
  189. self.configuration.logger.warning("No ALPN protocol negotiated, closing'")
  190. context.close(mode: .all, promise: nil)
  191. } else {
  192. self.configuration.logger.warning("No ALPN protocol negotiated'")
  193. // We're now falling back to parsing bytes.
  194. self.state = .notConfigured(alpn: .expectedButFallingBack)
  195. self.tryParsingBufferedData(context: context)
  196. }
  197. }
  198. case .shutdownCompleted:
  199. // We don't care about this here.
  200. ()
  201. }
  202. }
  203. /// Try to parse the buffered data to determine whether or not HTTP/2 or HTTP/1 should be used.
  204. private func tryParsingBufferedData(context: ChannelHandlerContext) {
  205. guard let first = self.bufferedReads.first else {
  206. // No data buffered yet. We'll try when we read.
  207. return
  208. }
  209. let buffer = self.unwrapInboundIn(first)
  210. self.determineHTTPVersionAndConfigurePipeline(buffer: buffer, context: context)
  211. }
  212. // MARK: - Channel Handler
  213. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  214. if let delegate = self.configuration.errorDelegate {
  215. let baseError: Error
  216. if let errorWithContext = error as? GRPCError.WithContext {
  217. baseError = errorWithContext.error
  218. } else {
  219. baseError = error
  220. }
  221. delegate.observeLibraryError(baseError)
  222. }
  223. context.close(mode: .all, promise: nil)
  224. }
  225. internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  226. switch self.state {
  227. case let .notConfigured(alpn: .expected(required)):
  228. if let event = event as? TLSUserEvent {
  229. self.handleHandshakeCompletedEvent(event, alpnIsRequired: required, context: context)
  230. }
  231. case .notConfigured(alpn: .expectedButFallingBack),
  232. .notConfigured(alpn: .notExpected),
  233. .configuring:
  234. ()
  235. }
  236. context.fireUserInboundEventTriggered(event)
  237. }
  238. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  239. self.bufferedReads.append(data)
  240. switch self.state {
  241. case .notConfigured(alpn: .notExpected),
  242. .notConfigured(alpn: .expectedButFallingBack):
  243. // If ALPN isn't expected, or we didn't negotiate via ALPN and we don't require it then we
  244. // can try parsing the data we just buffered.
  245. self.tryParsingBufferedData(context: context)
  246. case .notConfigured(alpn: .expected),
  247. .configuring:
  248. // We expect ALPN or we're being configured, just buffer the data, we'll forward it later.
  249. ()
  250. }
  251. // Don't forward the reads: we'll do so when we have configured the pipeline.
  252. }
  253. internal func removeHandler(
  254. context: ChannelHandlerContext,
  255. removalToken: ChannelHandlerContext.RemovalToken
  256. ) {
  257. // Forward any buffered reads.
  258. while let read = self.bufferedReads.popFirst() {
  259. context.fireChannelRead(read)
  260. }
  261. context.leavePipeline(removalToken: removalToken)
  262. }
  263. }
  264. // MARK: - HTTP Version Parser
  265. struct HTTPVersionParser {
  266. /// HTTP/2 connection preface bytes. See RFC 7540 § 5.3.
  267. private static let http2ClientMagic = [
  268. UInt8(ascii: "P"),
  269. UInt8(ascii: "R"),
  270. UInt8(ascii: "I"),
  271. UInt8(ascii: " "),
  272. UInt8(ascii: "*"),
  273. UInt8(ascii: " "),
  274. UInt8(ascii: "H"),
  275. UInt8(ascii: "T"),
  276. UInt8(ascii: "T"),
  277. UInt8(ascii: "P"),
  278. UInt8(ascii: "/"),
  279. UInt8(ascii: "2"),
  280. UInt8(ascii: "."),
  281. UInt8(ascii: "0"),
  282. UInt8(ascii: "\r"),
  283. UInt8(ascii: "\n"),
  284. UInt8(ascii: "\r"),
  285. UInt8(ascii: "\n"),
  286. UInt8(ascii: "S"),
  287. UInt8(ascii: "M"),
  288. UInt8(ascii: "\r"),
  289. UInt8(ascii: "\n"),
  290. UInt8(ascii: "\r"),
  291. UInt8(ascii: "\n"),
  292. ]
  293. /// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
  294. /// connection preface.
  295. static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> Bool {
  296. let view = buffer.readableBytesView
  297. guard view.count >= HTTPVersionParser.http2ClientMagic.count else {
  298. // Not enough bytes.
  299. return false
  300. }
  301. let slice = view[view.startIndex ..< view.startIndex.advanced(by: self.http2ClientMagic.count)]
  302. return slice.elementsEqual(HTTPVersionParser.http2ClientMagic)
  303. }
  304. private static let http1_1 = [
  305. UInt8(ascii: "H"),
  306. UInt8(ascii: "T"),
  307. UInt8(ascii: "T"),
  308. UInt8(ascii: "P"),
  309. UInt8(ascii: "/"),
  310. UInt8(ascii: "1"),
  311. UInt8(ascii: "."),
  312. UInt8(ascii: "1"),
  313. ]
  314. /// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
  315. static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> Bool {
  316. var readableBytesView = buffer.readableBytesView
  317. // From RFC 2616 § 5.1:
  318. // Request-Line = Method SP Request-URI SP HTTP-Version CRLF
  319. // Read off the Method and Request-URI (and spaces).
  320. guard readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil,
  321. readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil else {
  322. return false
  323. }
  324. // Read off the HTTP-Version and CR.
  325. guard let versionView = readableBytesView.trimPrefix(to: UInt8(ascii: "\r")) else {
  326. return false
  327. }
  328. // Check that the LF followed the CR.
  329. guard readableBytesView.first == UInt8(ascii: "\n") else {
  330. return false
  331. }
  332. // Now check the HTTP version.
  333. return versionView.elementsEqual(HTTPVersionParser.http1_1)
  334. }
  335. }