2
0

GRPCServerPipelineConfigurator.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. import NIOTLS
  22. /// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
  23. /// version used for transport.
  24. ///
  25. /// If TLS is enabled then the handler listens for a 'TLSUserEvent.handshakeCompleted' event and
  26. /// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
  27. /// configured then the HTTP version is determined by parsing the inbound byte stream.
  28. final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
  // `ChannelInboundHandler` associated types: this handler receives raw bytes and forwards the
  // very same raw bytes once the pipeline has been configured.
  internal typealias InboundIn = ByteBuffer
  internal typealias InboundOut = ByteBuffer

  /// Configuration of the server on whose behalf the pipeline is being set up.
  private let configuration: Server.Configuration

  /// Inbound reads received while the pipeline is not yet configured. They are held here rather
  /// than being forwarded immediately.
  private var bufferedReads: CircularBuffer<NIOAny> = CircularBuffer()

  /// How far along this handler is in configuring the pipeline.
  private var state: State
  /// What this handler expects from ALPN (application protocol negotiation during the TLS
  /// handshake).
  private enum ALPN {
    /// A negotiated protocol is expected. `required` records whether negotiation is mandatory
    /// (`true`) or merely preferred (`false`).
    case expected(required: Bool)

    /// A protocol was expected but not required, and the handshake negotiated none. The HTTP
    /// version may now be determined by parsing the bytes on the connection instead.
    case expectedButFallingBack

    /// No negotiation is expected because the connection is cleartext.
    case notExpected
  }
  /// The configuration progress of this handler.
  private enum State {
    /// No handlers have been added yet; `alpn` records what is expected from ALPN.
    case notConfigured(alpn: ALPN)

    /// Handlers are being added to the pipeline.
    case configuring
  }
  /// Creates a configurator for a server using the given configuration.
  ///
  /// ALPN is expected exactly when the configuration carries a TLS configuration; whether it is
  /// also required is taken from that TLS configuration's `requireALPN`.
  ///
  /// - Parameter configuration: The configuration of the server.
  init(configuration: Server.Configuration) {
    self.configuration = configuration

    let alpn: ALPN
    switch configuration.tlsConfiguration {
    case let .some(tls):
      alpn = .expected(required: tls.requireALPN)
    case .none:
      alpn = .notExpected
    }
    self.state = .notConfigured(alpn: alpn)
  }
  /// Makes a gRPC idle handler for the server, using the idle timeout, keepalive settings and
  /// logger from the server configuration.
  private func makeIdleHandler() -> GRPCIdleHandler {
    let configuration = self.configuration
    return GRPCIdleHandler(
      idleTimeout: configuration.connectionIdleTimeout,
      keepalive: configuration.connectionKeepalive,
      logger: configuration.logger
    )
  }
  /// Makes a server-mode HTTP/2 handler.
  ///
  /// The initial settings advertise the configured maximum number of concurrent streams, the
  /// HPACK decoder's default maximum header list size, and the configured maximum frame size.
  private func makeHTTP2Handler() -> NIOHTTP2Handler {
    let configuration = self.configuration

    let settings: [HTTP2Setting] = [
      HTTP2Setting(
        parameter: .maxConcurrentStreams,
        value: configuration.httpMaxConcurrentStreams
      ),
      HTTP2Setting(
        parameter: .maxHeaderListSize,
        value: HPACKDecoder.defaultMaxHeaderListSize
      ),
      HTTP2Setting(
        parameter: .maxFrameSize,
        value: configuration.httpMaxFrameSize
      ),
    ]

    return NIOHTTP2Handler(mode: .server, initialSettings: settings)
  }
  /// Makes an HTTP/2 stream multiplexer suitable for handling gRPC requests.
  ///
  /// Every stream channel gets an HTTP/2-to-raw-gRPC handler whose logger is tagged with the
  /// HTTP/2 stream ID of that stream.
  ///
  /// - Parameter channel: The connection channel the multiplexer is created for.
  private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
    let baseLogger = self.configuration.logger

    return HTTP2StreamMultiplexer(
      mode: .server,
      channel: channel,
      targetWindowSize: self.configuration.httpTargetWindowSize
    ) { stream in
      // Sync options were added to the HTTP/2 stream channel in 1.17.0 (we require at least
      // this) so the stream ID should be available; a placeholder is used if it isn't.
      let streamLabel: String
      if let id = try? stream.syncOptions?.getOption(HTTP2StreamChannelOptions.streamID) {
        streamLabel = String(Int(id))
      } else {
        streamLabel = "<unknown>"
      }

      var streamLogger = baseLogger
      streamLogger[metadataKey: MetadataKey.h2StreamID] = "\(streamLabel)"

      // TODO: provide user configuration for header normalization.
      let codec = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: streamLogger)

      do {
        try stream.pipeline.syncOperations.addHandler(codec)
      } catch {
        return stream.eventLoop.makeFailedFuture(error)
      }
      return stream.eventLoop.makeSucceededVoidFuture()
    }
  }
  /// Makes a handler translating between HTTP/2 and raw gRPC on the server.
  ///
  /// Service providers, message encoding, error delegate and maximum receive message length are
  /// all taken from the server configuration.
  ///
  /// - Parameters:
  ///   - normalizeHeaders: Passed through to the codec as its `normalizeHeaders` argument.
  ///   - logger: The logger the codec should use.
  /// - Returns: A newly created codec.
  private func makeHTTP2ToRawGRPCHandler(
    normalizeHeaders: Bool,
    logger: Logger
  ) -> HTTP2ToRawGRPCServerCodec {
    let configuration = self.configuration
    return .init(
      servicesByName: configuration.serviceProvidersByName,
      encoding: configuration.messageEncoding,
      errorDelegate: configuration.errorDelegate,
      normalizeHeaders: normalizeHeaders,
      maximumReceiveMessageLength: configuration.maximumReceiveMessageLength,
      logger: logger
    )
  }
  /// Called once an attempt to configure the pipeline has finished.
  ///
  /// On success this handler asks to be removed from the pipeline; on failure the error is
  /// passed to `errorCaught(context:error:)`.
  ///
  /// - Parameters:
  ///   - result: The outcome of adding the handlers.
  ///   - context: The context of this handler.
  private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
    do {
      try result.get()
    } catch {
      self.errorCaught(context: context, error: error)
      return
    }
    context.pipeline.removeHandler(context: context, promise: nil)
  }
  /// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
  ///
  /// Adds, in order: the HTTP/2 handler, the idle handler and the stream multiplexer.
  private func configureHTTP2(context: ChannelHandlerContext) {
    // From here on we're configuring the pipeline.
    self.state = .configuring

    // 'Channel.configureHTTP2Pipeline' could be used instead, but the keepalive and idle
    // handlers would then have to be inserted between the handlers it adds. Adding everything
    // ourselves is simpler.
    let outcome = Result<Void, Error> {
      // This is only reached while handling an inbound read or an inbound user event, so we're
      // on the channel's event loop and synchronous pipeline operations are fine.
      let operations = context.pipeline.syncOperations
      try operations.addHandler(self.makeHTTP2Handler())
      try operations.addHandler(self.makeIdleHandler())
      try operations.addHandler(self.makeHTTP2Multiplexer(for: context.channel))
    }

    self.configurationCompleted(result: outcome, context: context)
  }
  /// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
  ///
  /// Adds, in order: the HTTP server handlers (with error handling), the CORS handler, the
  /// gRPC-Web to HTTP/2 codec and the HTTP/2 to raw gRPC codec.
  private func configureHTTP1(context: ChannelHandlerContext) {
    // From here on we're configuring the pipeline.
    self.state = .configuring

    let outcome = Result<Void, Error> {
      // This is only reached while handling an inbound read or an inbound user event, so we're
      // on the channel's event loop and synchronous pipeline operations are fine.
      let operations = context.pipeline.syncOperations
      try operations.configureHTTPServerPipeline(withErrorHandling: true)
      try operations.addHandler(WebCORSHandler())

      // The scheme reflects whether or not TLS is configured.
      let scheme = self.configuration.tlsConfiguration == nil ? "http" : "https"
      try operations.addHandler(GRPCWebToHTTP2ServerCodec(scheme: scheme))

      // Headers don't need to be normalized for HTTP/1.
      let rawCodec = self.makeHTTP2ToRawGRPCHandler(
        normalizeHeaders: false,
        logger: self.configuration.logger
      )
      try operations.addHandler(rawCodec)
    }

    self.configurationCompleted(result: outcome, context: context)
  }
  /// Inspects `buffer` to work out which HTTP version the peer is speaking and configures the
  /// pipeline to match. If the bytes start with neither the HTTP/2 connection preface nor an
  /// HTTP/1 request line, an error is logged and the connection is closed.
  ///
  /// - Parameters:
  ///   - buffer: The bytes to inspect.
  ///   - context: The context of this handler.
  private func determineHTTPVersionAndConfigurePipeline(
    buffer: ByteBuffer,
    context: ChannelHandlerContext
  ) {
    if HTTPVersionParser.prefixedWithHTTP2ConnectionPreface(buffer) {
      self.configureHTTP2(context: context)
      return
    }

    guard HTTPVersionParser.prefixedWithHTTP1RequestLine(buffer) else {
      self.configuration.logger.error("Unable to determine http version, closing")
      context.close(mode: .all, promise: nil)
      return
    }

    self.configureHTTP1(context: context)
  }
  /// Handles a `TLSUserEvent` and, for a completed handshake, configures the pipeline according
  /// to the protocol negotiated via ALPN.
  ///
  /// - An HTTP/2-like identifier configures the pipeline for HTTP/2; an HTTP/1 identifier
  ///   configures it for HTTP/1; any other identifier closes the connection.
  /// - If no protocol was negotiated and ALPN is required the connection is closed. If ALPN is
  ///   not required the handler falls back to determining the HTTP version from any bytes which
  ///   have been buffered.
  /// - `.shutdownCompleted` is ignored.
  ///
  /// - Parameters:
  ///   - event: The TLS event which was received.
  ///   - alpnIsRequired: Whether a connection without a negotiated protocol must be rejected.
  ///   - context: The context of this handler.
  private func handleHandshakeCompletedEvent(
    _ event: TLSUserEvent,
    alpnIsRequired: Bool,
    context: ChannelHandlerContext
  ) {
    switch event {
    case let .handshakeCompleted(negotiatedProtocol):
      self.configuration.logger.debug("TLS handshake completed", metadata: [
        "alpn": "\(negotiatedProtocol ?? "nil")",
      ])

      switch negotiatedProtocol {
      case let .some(negotiated):
        if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
          self.configureHTTP2(context: context)
        } else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
          self.configureHTTP1(context: context)
        } else {
          self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
          context.close(mode: .all, promise: nil)
        }

      case .none:
        if alpnIsRequired {
          self.configuration.logger.warning("No ALPN protocol negotiated, closing")
          context.close(mode: .all, promise: nil)
        } else {
          self.configuration.logger.warning("No ALPN protocol negotiated")
          // Fall back to parsing bytes. The state must be updated first so that subsequent
          // reads are parsed rather than just buffered.
          self.state = .notConfigured(alpn: .expectedButFallingBack)
          self.tryParsingBufferedData(context: context)
        }
      }

    case .shutdownCompleted:
      // We don't care about this here.
      ()
    }
  }
  /// Attempts to determine from the buffered data whether HTTP/2 or HTTP/1 should be used. Does
  /// nothing if no data has been buffered yet; another attempt is made when data is read.
  ///
  /// NOTE(review): only the first buffered read is inspected, and a buffer which matches neither
  /// protocol results in the connection being closed. A connection preface or request line split
  /// across several reads would therefore not be recognised.
  private func tryParsingBufferedData(context: ChannelHandlerContext) {
    if let oldest = self.bufferedReads.first {
      self.determineHTTPVersionAndConfigurePipeline(
        buffer: self.unwrapInboundIn(oldest),
        context: context
      )
    }
  }
  241. // MARK: - Channel Handler
  /// Reports the error to the configured error delegate, if there is one, and closes the
  /// connection. An error wrapped in `GRPCError.WithContext` is unwrapped before being reported.
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
    let underlying: Error
    switch error {
    case let wrapped as GRPCError.WithContext:
      underlying = wrapped.error
    default:
      underlying = error
    }

    self.configuration.errorDelegate?.observeLibraryError(underlying)
    context.close(mode: .all, promise: nil)
  }
  /// Handles TLS events while ALPN is still expected, then forwards the event.
  ///
  /// Events are only acted upon when the pipeline is not configured and ALPN is expected; in
  /// every other state (falling back, not expecting ALPN, or already configuring) they are
  /// forwarded untouched.
  internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
    if case let .notConfigured(alpn: .expected(required)) = self.state,
       let tlsEvent = event as? TLSUserEvent {
      self.handleHandshakeCompletedEvent(tlsEvent, alpnIsRequired: required, context: context)
    }

    context.fireUserInboundEventTriggered(event)
  }
  /// Buffers the read and, if the HTTP version is to be determined from bytes, tries to do so.
  ///
  /// Reads are never forwarded from here: buffered reads are forwarded when this handler is
  /// removed from the pipeline.
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    self.bufferedReads.append(data)

    guard case let .notConfigured(alpn) = self.state else {
      // The pipeline is being configured: the read stays buffered and is forwarded later.
      return
    }

    switch alpn {
    case .expected:
      // The outcome of ALPN isn't known yet: just hold on to the data.
      ()

    case .expectedButFallingBack, .notExpected:
      // Either nothing was negotiated via ALPN (and it isn't required) or ALPN isn't expected
      // at all: try parsing the data we just buffered.
      self.tryParsingBufferedData(context: context)
    }
  }
  /// Forwards every buffered read, oldest first, and then leaves the pipeline.
  internal func removeHandler(
    context: ChannelHandlerContext,
    removalToken: ChannelHandlerContext.RemovalToken
  ) {
    // Each read is taken out of the buffer before it is forwarded.
    while !self.bufferedReads.isEmpty {
      let pending = self.bufferedReads.removeFirst()
      context.fireChannelRead(pending)
    }

    context.leavePipeline(removalToken: removalToken)
  }
  292. }
  293. // MARK: - HTTP Version Parser
  /// Determines which HTTP version a connection is using by looking at the first bytes sent by
  /// the client.
  struct HTTPVersionParser {
    /// The bytes of the HTTP/2 client connection preface, `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n`.
    /// See RFC 7540 § 3.5.
    private static let http2ClientMagic = Array("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".utf8)

    /// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
    /// connection preface.
    ///
    /// - Parameter buffer: The buffer whose readable bytes are inspected; it is not modified.
    /// - Returns: `true` if the readable bytes start with the complete preface. `false` if they
    ///   differ from it or if fewer bytes than the length of the preface are readable.
    static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> Bool {
      // 'starts(with:)' is false when the view is shorter than the preface, so no separate
      // length check is required.
      return buffer.readableBytesView.starts(with: HTTPVersionParser.http2ClientMagic)
    }

    /// The bytes of the only HTTP-Version accepted in an HTTP/1 request line: `HTTP/1.1`.
    private static let http1_1 = Array("HTTP/1.1".utf8)

    /// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
    ///
    /// - Parameter buffer: The buffer whose readable bytes are inspected; it is not modified.
    /// - Returns: `true` if the readable bytes start with a complete request line (terminated by
    ///   CRLF) whose version is exactly `HTTP/1.1`, `false` otherwise.
    static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> Bool {
      var remaining = buffer.readableBytesView

      // From RFC 7230 § 3.1.1 (previously RFC 2616 § 5.1):
      //
      //   request-line = method SP request-target SP HTTP-version CRLF
      //
      // NOTE(review): 'trimPrefix(to:)' is declared elsewhere; it is used here on the assumption
      // that it removes everything up to and including the first occurrence of the given byte,
      // returning the removed bytes without that byte, or 'nil' if the byte isn't present.

      // Read off the method and the request-target (and the space following each).
      guard remaining.trimPrefix(to: UInt8(ascii: " ")) != nil,
            remaining.trimPrefix(to: UInt8(ascii: " ")) != nil else {
        return false
      }

      // Read off the HTTP-version and the CR.
      guard let version = remaining.trimPrefix(to: UInt8(ascii: "\r")) else {
        return false
      }

      // The CR must be followed by an LF.
      guard remaining.first == UInt8(ascii: "\n") else {
        return false
      }

      // Finally, check the HTTP version.
      return version.elementsEqual(HTTPVersionParser.http1_1)
    }
  }