GRPCServerPipelineConfigurator.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. import NIOTLS
  22. /// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
  23. /// version used for transport.
  24. ///
  25. /// If TLS is enabled then the handler listens for an 'TLSUserEvent.handshakeCompleted' event and
  26. /// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
  27. /// configured then the HTTP version is determined by parsing the inbound byte stream.
  28. final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
  29. internal typealias InboundIn = ByteBuffer
  30. internal typealias InboundOut = ByteBuffer
  31. /// The server configuration.
  32. private let configuration: Server.Configuration
  33. /// A buffer containing the buffered bytes.
  34. private var buffer: ByteBuffer?
  35. /// The current state.
  36. private var state: State
  37. private enum ALPN {
  38. /// ALPN is expected. It may or may not be required, however.
  39. case expected(required: Bool)
  40. /// ALPN was expected but not required and no protocol was negotiated in the handshake. We may
  41. /// now fall back to parsing bytes on the connection.
  42. case expectedButFallingBack
  43. /// ALPN is not expected; this is a cleartext connection.
  44. case notExpected
  45. }
  46. private enum State {
  47. /// The pipeline isn't configured yet.
  48. case notConfigured(alpn: ALPN)
  49. /// We're configuring the pipeline.
  50. case configuring
  51. }
  52. init(configuration: Server.Configuration) {
  53. if let tls = configuration.tlsConfiguration {
  54. self.state = .notConfigured(alpn: .expected(required: tls.requireALPN))
  55. } else {
  56. self.state = .notConfigured(alpn: .notExpected)
  57. }
  58. self.configuration = configuration
  59. }
  60. /// Makes a gRPC idle handler for the server..
  61. private func makeIdleHandler() -> GRPCIdleHandler {
  62. return .init(
  63. idleTimeout: self.configuration.connectionIdleTimeout,
  64. keepalive: self.configuration.connectionKeepalive,
  65. logger: self.configuration.logger
  66. )
  67. }
  68. /// Makes an HTTP/2 handler.
  69. private func makeHTTP2Handler() -> NIOHTTP2Handler {
  70. var configuration = NIOHTTP2Handler.ConnectionConfiguration()
  71. configuration.initialSettings = [
  72. HTTP2Setting(
  73. parameter: .maxConcurrentStreams,
  74. value: self.configuration.httpMaxConcurrentStreams
  75. ),
  76. HTTP2Setting(
  77. parameter: .maxHeaderListSize,
  78. value: HPACKDecoder.defaultMaxHeaderListSize
  79. ),
  80. HTTP2Setting(
  81. parameter: .maxFrameSize,
  82. value: self.configuration.httpMaxFrameSize
  83. ),
  84. HTTP2Setting(
  85. parameter: .initialWindowSize,
  86. value: self.configuration.httpTargetWindowSize
  87. ),
  88. ]
  89. configuration.maximumRecentlyResetStreams = self.configuration.httpMaxResetStreams
  90. return NIOHTTP2Handler(mode: .server, connectionConfiguration: configuration)
  91. }
  92. /// Makes an HTTP/2 multiplexer suitable handling gRPC requests.
  93. private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
  94. return .init(
  95. mode: .server,
  96. channel: channel,
  97. targetWindowSize: self.configuration.httpTargetWindowSize
  98. ) { [logger = self.configuration.logger] stream in
  99. // Sync options were added to the HTTP/2 stream channel in 1.17.0 (we require at least this)
  100. // so this shouldn't be `nil`, but it's not a problem if it is.
  101. let http2StreamID = try? stream.syncOptions?.getOption(HTTP2StreamChannelOptions.streamID)
  102. let streamID =
  103. http2StreamID.map { streamID in
  104. return String(Int(streamID))
  105. } ?? "<unknown>"
  106. var logger = logger
  107. logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
  108. do {
  109. // TODO: provide user configuration for header normalization.
  110. let handler = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: logger)
  111. try stream.pipeline.syncOperations.addHandler(handler)
  112. return stream.eventLoop.makeSucceededVoidFuture()
  113. } catch {
  114. return stream.eventLoop.makeFailedFuture(error)
  115. }
  116. }
  117. }
  118. /// Makes an HTTP/2 to raw gRPC server handler.
  119. private func makeHTTP2ToRawGRPCHandler(
  120. normalizeHeaders: Bool,
  121. logger: Logger
  122. ) -> HTTP2ToRawGRPCServerCodec {
  123. return HTTP2ToRawGRPCServerCodec(
  124. servicesByName: self.configuration.serviceProvidersByName,
  125. encoding: self.configuration.messageEncoding,
  126. errorDelegate: self.configuration.errorDelegate,
  127. normalizeHeaders: normalizeHeaders,
  128. maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
  129. logger: logger
  130. )
  131. }
  132. /// The pipeline finished configuring.
  133. private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
  134. switch result {
  135. case .success:
  136. context.pipeline.removeHandler(context: context, promise: nil)
  137. case let .failure(error):
  138. self.errorCaught(context: context, error: error)
  139. }
  140. }
  141. /// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
  142. private func configureHTTP2(context: ChannelHandlerContext) {
  143. // We're now configuring the pipeline.
  144. self.state = .configuring
  145. // We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
  146. // to then insert our keepalive and idle handlers between. We can just add everything together.
  147. let result: Result<Void, Error>
  148. do {
  149. // This is only ever called as a result of reading a user inbound event or reading inbound so
  150. // we'll be on the right event loop and sync operations are fine.
  151. let sync = context.pipeline.syncOperations
  152. try sync.addHandler(self.makeHTTP2Handler())
  153. try sync.addHandler(self.makeIdleHandler())
  154. try sync.addHandler(self.makeHTTP2Multiplexer(for: context.channel))
  155. result = .success(())
  156. } catch {
  157. result = .failure(error)
  158. }
  159. self.configurationCompleted(result: result, context: context)
  160. }
  161. /// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
  162. private func configureHTTP1(context: ChannelHandlerContext) {
  163. // We're now configuring the pipeline.
  164. self.state = .configuring
  165. let result: Result<Void, Error>
  166. do {
  167. // This is only ever called as a result of reading a user inbound event or reading inbound so
  168. // we'll be on the right event loop and sync operations are fine.
  169. let sync = context.pipeline.syncOperations
  170. try sync.configureHTTPServerPipeline(withErrorHandling: true)
  171. try sync.addHandler(WebCORSHandler(configuration: self.configuration.webCORS))
  172. let scheme = self.configuration.tlsConfiguration == nil ? "http" : "https"
  173. try sync.addHandler(GRPCWebToHTTP2ServerCodec(scheme: scheme))
  174. // There's no need to normalize headers for HTTP/1.
  175. try sync.addHandler(
  176. self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger)
  177. )
  178. result = .success(())
  179. } catch {
  180. result = .failure(error)
  181. }
  182. self.configurationCompleted(result: result, context: context)
  183. }
  184. /// Attempts to determine the HTTP version from the buffer and then configure the pipeline
  185. /// appropriately. Closes the connection if the HTTP version could not be determined.
  186. private func determineHTTPVersionAndConfigurePipeline(
  187. buffer: ByteBuffer,
  188. context: ChannelHandlerContext
  189. ) {
  190. switch HTTPVersionParser.determineHTTPVersion(buffer) {
  191. case .http2:
  192. self.configureHTTP2(context: context)
  193. case .http1:
  194. self.configureHTTP1(context: context)
  195. case .unknown:
  196. // Neither H2 nor H1 or the length limit has been exceeded.
  197. self.configuration.logger.error("Unable to determine http version, closing")
  198. context.close(mode: .all, promise: nil)
  199. case .notEnoughBytes:
  200. () // Try again with more bytes.
  201. }
  202. }
  203. /// Handles a 'TLSUserEvent.handshakeCompleted' event and configures the pipeline to handle gRPC
  204. /// requests.
  205. private func handleHandshakeCompletedEvent(
  206. _ event: TLSUserEvent,
  207. alpnIsRequired: Bool,
  208. context: ChannelHandlerContext
  209. ) {
  210. switch event {
  211. case let .handshakeCompleted(negotiatedProtocol):
  212. let tlsVersion = try? context.channel.getTLSVersionSync()
  213. self.configuration.logger.debug(
  214. "TLS handshake completed",
  215. metadata: [
  216. "alpn": "\(negotiatedProtocol ?? "nil")",
  217. "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
  218. ]
  219. )
  220. switch negotiatedProtocol {
  221. case let .some(negotiated):
  222. if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
  223. self.configureHTTP2(context: context)
  224. } else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
  225. self.configureHTTP1(context: context)
  226. } else {
  227. self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
  228. context.close(mode: .all, promise: nil)
  229. }
  230. case .none:
  231. if alpnIsRequired {
  232. self.configuration.logger.warning("No ALPN protocol negotiated, closing'")
  233. context.close(mode: .all, promise: nil)
  234. } else {
  235. self.configuration.logger.warning("No ALPN protocol negotiated'")
  236. // We're now falling back to parsing bytes.
  237. self.state = .notConfigured(alpn: .expectedButFallingBack)
  238. self.tryParsingBufferedData(context: context)
  239. }
  240. }
  241. case .shutdownCompleted:
  242. // We don't care about this here.
  243. ()
  244. }
  245. }
  246. /// Try to parse the buffered data to determine whether or not HTTP/2 or HTTP/1 should be used.
  247. private func tryParsingBufferedData(context: ChannelHandlerContext) {
  248. if let buffer = self.buffer {
  249. self.determineHTTPVersionAndConfigurePipeline(buffer: buffer, context: context)
  250. }
  251. }
  252. // MARK: - Channel Handler
  253. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  254. if let delegate = self.configuration.errorDelegate {
  255. let baseError: Error
  256. if let errorWithContext = error as? GRPCError.WithContext {
  257. baseError = errorWithContext.error
  258. } else {
  259. baseError = error
  260. }
  261. delegate.observeLibraryError(baseError)
  262. }
  263. context.close(mode: .all, promise: nil)
  264. }
  265. internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  266. switch self.state {
  267. case let .notConfigured(alpn: .expected(required)):
  268. if let event = event as? TLSUserEvent {
  269. self.handleHandshakeCompletedEvent(event, alpnIsRequired: required, context: context)
  270. }
  271. case .notConfigured(alpn: .expectedButFallingBack),
  272. .notConfigured(alpn: .notExpected),
  273. .configuring:
  274. ()
  275. }
  276. context.fireUserInboundEventTriggered(event)
  277. }
  278. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  279. var buffer = self.unwrapInboundIn(data)
  280. self.buffer.setOrWriteBuffer(&buffer)
  281. switch self.state {
  282. case .notConfigured(alpn: .notExpected),
  283. .notConfigured(alpn: .expectedButFallingBack):
  284. // If ALPN isn't expected, or we didn't negotiate via ALPN and we don't require it then we
  285. // can try parsing the data we just buffered.
  286. self.tryParsingBufferedData(context: context)
  287. case .notConfigured(alpn: .expected),
  288. .configuring:
  289. // We expect ALPN or we're being configured, just buffer the data, we'll forward it later.
  290. ()
  291. }
  292. // Don't forward the reads: we'll do so when we have configured the pipeline.
  293. }
  294. internal func removeHandler(
  295. context: ChannelHandlerContext,
  296. removalToken: ChannelHandlerContext.RemovalToken
  297. ) {
  298. // Forward any buffered reads.
  299. if let buffer = self.buffer {
  300. self.buffer = nil
  301. context.fireChannelRead(self.wrapInboundOut(buffer))
  302. }
  303. context.leavePipeline(removalToken: removalToken)
  304. }
  305. }
  306. // MARK: - HTTP Version Parser
  307. struct HTTPVersionParser {
  308. /// HTTP/2 connection preface bytes. See RFC 7540 § 5.3.
  309. private static let http2ClientMagic = [
  310. UInt8(ascii: "P"),
  311. UInt8(ascii: "R"),
  312. UInt8(ascii: "I"),
  313. UInt8(ascii: " "),
  314. UInt8(ascii: "*"),
  315. UInt8(ascii: " "),
  316. UInt8(ascii: "H"),
  317. UInt8(ascii: "T"),
  318. UInt8(ascii: "T"),
  319. UInt8(ascii: "P"),
  320. UInt8(ascii: "/"),
  321. UInt8(ascii: "2"),
  322. UInt8(ascii: "."),
  323. UInt8(ascii: "0"),
  324. UInt8(ascii: "\r"),
  325. UInt8(ascii: "\n"),
  326. UInt8(ascii: "\r"),
  327. UInt8(ascii: "\n"),
  328. UInt8(ascii: "S"),
  329. UInt8(ascii: "M"),
  330. UInt8(ascii: "\r"),
  331. UInt8(ascii: "\n"),
  332. UInt8(ascii: "\r"),
  333. UInt8(ascii: "\n"),
  334. ]
  335. /// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
  336. /// connection preface.
  337. static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> SubParseResult {
  338. let view = buffer.readableBytesView
  339. guard view.count >= HTTPVersionParser.http2ClientMagic.count else {
  340. // Not enough bytes.
  341. return .notEnoughBytes
  342. }
  343. let slice = view[view.startIndex ..< view.startIndex.advanced(by: self.http2ClientMagic.count)]
  344. return slice.elementsEqual(HTTPVersionParser.http2ClientMagic) ? .accepted : .rejected
  345. }
  346. enum ParseResult: Hashable {
  347. case http1
  348. case http2
  349. case unknown
  350. case notEnoughBytes
  351. }
  352. enum SubParseResult: Hashable {
  353. case accepted
  354. case rejected
  355. case notEnoughBytes
  356. }
  357. private static let maxLengthToCheck = 1024
  358. static func determineHTTPVersion(_ buffer: ByteBuffer) -> ParseResult {
  359. switch Self.prefixedWithHTTP2ConnectionPreface(buffer) {
  360. case .accepted:
  361. return .http2
  362. case .notEnoughBytes:
  363. switch Self.prefixedWithHTTP1RequestLine(buffer) {
  364. case .accepted:
  365. // Not enough bytes to check H2, but enough to confirm H1.
  366. return .http1
  367. case .notEnoughBytes:
  368. // Not enough bytes to check H2 or H1.
  369. return .notEnoughBytes
  370. case .rejected:
  371. // Not enough bytes to check H2 and definitely not H1.
  372. return .notEnoughBytes
  373. }
  374. case .rejected:
  375. switch Self.prefixedWithHTTP1RequestLine(buffer) {
  376. case .accepted:
  377. // Not H2, but H1 is confirmed.
  378. return .http1
  379. case .notEnoughBytes:
  380. // Not H2, but not enough bytes to reject H1 yet.
  381. return .notEnoughBytes
  382. case .rejected:
  383. // Not H2 or H1.
  384. return .unknown
  385. }
  386. }
  387. }
  388. private static let http1_1 = [
  389. UInt8(ascii: "H"),
  390. UInt8(ascii: "T"),
  391. UInt8(ascii: "T"),
  392. UInt8(ascii: "P"),
  393. UInt8(ascii: "/"),
  394. UInt8(ascii: "1"),
  395. UInt8(ascii: "."),
  396. UInt8(ascii: "1"),
  397. ]
  398. /// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
  399. static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> SubParseResult {
  400. var readableBytesView = buffer.readableBytesView
  401. // We don't need to validate the request line, only determine whether we think it's an HTTP1
  402. // request line. Another handler will parse it properly.
  403. // From RFC 2616 § 5.1:
  404. // Request-Line = Method SP Request-URI SP HTTP-Version CRLF
  405. // Get through the first space.
  406. guard readableBytesView.dropPrefix(through: UInt8(ascii: " ")) != nil else {
  407. let tooLong = buffer.readableBytes > Self.maxLengthToCheck
  408. return tooLong ? .rejected : .notEnoughBytes
  409. }
  410. // Get through the second space.
  411. guard readableBytesView.dropPrefix(through: UInt8(ascii: " ")) != nil else {
  412. let tooLong = buffer.readableBytes > Self.maxLengthToCheck
  413. return tooLong ? .rejected : .notEnoughBytes
  414. }
  415. // +2 for \r\n
  416. guard readableBytesView.count >= (Self.http1_1.count + 2) else {
  417. return .notEnoughBytes
  418. }
  419. guard let version = readableBytesView.dropPrefix(through: UInt8(ascii: "\r")),
  420. readableBytesView.first == UInt8(ascii: "\n")
  421. else {
  422. // If we didn't drop the prefix OR we did and the next byte wasn't '\n', then we had enough
  423. // bytes but the '\r\n' wasn't present: reject this as being HTTP1.
  424. return .rejected
  425. }
  426. return version.elementsEqual(Self.http1_1) ? .accepted : .rejected
  427. }
  428. }
  429. extension Collection where Self == Self.SubSequence, Self.Element: Equatable {
  430. /// Drops the prefix off the collection up to and including the first `separator`
  431. /// only if that separator appears in the collection.
  432. ///
  433. /// Returns the prefix up to but not including the separator if it was found, nil otherwise.
  434. mutating func dropPrefix(through separator: Element) -> SubSequence? {
  435. if self.isEmpty {
  436. return nil
  437. }
  438. guard let separatorIndex = self.firstIndex(of: separator) else {
  439. return nil
  440. }
  441. let prefix = self[..<separatorIndex]
  442. self = self[self.index(after: separatorIndex)...]
  443. return prefix
  444. }
  445. }