2
0

GRPCServerPipelineConfigurator.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import NIOTLS
  21. /// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
  22. /// version used for transport.
  23. ///
  24. /// If TLS is enabled then the handler listens for an 'TLSUserEvent.handshakeCompleted' event and
  25. /// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
  26. /// configured then the HTTP version is determined by parsing the inbound byte stream.
  27. final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
  28. internal typealias InboundIn = ByteBuffer
  29. internal typealias InboundOut = ByteBuffer
  30. /// The server configuration.
  31. private let configuration: Server.Configuration
  32. /// Reads which we're holding on to before the pipeline is configured.
  33. private var bufferedReads = CircularBuffer<NIOAny>()
  34. /// The current state.
  35. private var state: State
  36. private enum ALPN {
  37. /// ALPN is expected. It may or may not be required, however.
  38. case expected(required: Bool)
  39. /// ALPN was expected but not required and no protocol was negotiated in the handshake. We may
  40. /// now fall back to parsing bytes on the connection.
  41. case expectedButFallingBack
  42. /// ALPN is not expected; this is a cleartext connection.
  43. case notExpected
  44. }
  45. private enum State {
  46. /// The pipeline isn't configured yet.
  47. case notConfigured(alpn: ALPN)
  48. /// We're configuring the pipeline.
  49. case configuring
  50. }
  51. init(configuration: Server.Configuration) {
  52. if let tls = configuration.tlsConfiguration {
  53. self.state = .notConfigured(alpn: .expected(required: tls.requireALPN))
  54. } else {
  55. self.state = .notConfigured(alpn: .notExpected)
  56. }
  57. self.configuration = configuration
  58. }
  59. /// Makes a gRPC idle handler for the server..
  60. private func makeIdleHandler() -> GRPCIdleHandler {
  61. return .init(
  62. idleTimeout: self.configuration.connectionIdleTimeout,
  63. keepalive: self.configuration.connectionKeepalive,
  64. logger: self.configuration.logger
  65. )
  66. }
  67. /// Makes an HTTP/2 handler.
  68. private func makeHTTP2Handler() -> NIOHTTP2Handler {
  69. return .init(mode: .server)
  70. }
  71. /// Makes an HTTP/2 multiplexer suitable handling gRPC requests.
  72. private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
  73. var logger = self.configuration.logger
  74. return .init(
  75. mode: .server,
  76. channel: channel,
  77. targetWindowSize: self.configuration.httpTargetWindowSize
  78. ) { stream in
  79. // TODO: use sync options when NIO HTTP/2 support for them is released
  80. // https://github.com/apple/swift-nio-http2/pull/283
  81. stream.getOption(HTTP2StreamChannelOptions.streamID).map { streamID -> Logger in
  82. logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
  83. return logger
  84. }.recover { _ in
  85. logger[metadataKey: MetadataKey.h2StreamID] = "<unknown>"
  86. return logger
  87. }.flatMap { logger in
  88. // TODO: provide user configuration for header normalization.
  89. let handler = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: logger)
  90. return stream.pipeline.addHandler(handler)
  91. }
  92. }
  93. }
  94. /// Makes an HTTP/2 to raw gRPC server handler.
  95. private func makeHTTP2ToRawGRPCHandler(
  96. normalizeHeaders: Bool,
  97. logger: Logger
  98. ) -> HTTP2ToRawGRPCServerCodec {
  99. return HTTP2ToRawGRPCServerCodec(
  100. servicesByName: self.configuration.serviceProvidersByName,
  101. encoding: self.configuration.messageEncoding,
  102. errorDelegate: self.configuration.errorDelegate,
  103. normalizeHeaders: normalizeHeaders,
  104. maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
  105. logger: logger
  106. )
  107. }
  108. /// The pipeline finished configuring.
  109. private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
  110. switch result {
  111. case .success:
  112. context.pipeline.removeHandler(context: context, promise: nil)
  113. case let .failure(error):
  114. self.errorCaught(context: context, error: error)
  115. }
  116. }
  117. /// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
  118. private func configureHTTP2(context: ChannelHandlerContext) {
  119. // We're now configuring the pipeline.
  120. self.state = .configuring
  121. // We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
  122. // to then insert our keepalive and idle handlers between. We can just add everything together.
  123. let result: Result<Void, Error>
  124. do {
  125. // This is only ever called as a result of reading a user inbound event or reading inbound so
  126. // we'll be on the right event loop and sync operations are fine.
  127. let sync = context.pipeline.syncOperations
  128. try sync.addHandler(self.makeHTTP2Handler())
  129. try sync.addHandler(self.makeIdleHandler())
  130. try sync.addHandler(self.makeHTTP2Multiplexer(for: context.channel))
  131. result = .success(())
  132. } catch {
  133. result = .failure(error)
  134. }
  135. self.configurationCompleted(result: result, context: context)
  136. }
  137. /// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
  138. private func configureHTTP1(context: ChannelHandlerContext) {
  139. // We're now configuring the pipeline.
  140. self.state = .configuring
  141. let result: Result<Void, Error>
  142. do {
  143. // This is only ever called as a result of reading a user inbound event or reading inbound so
  144. // we'll be on the right event loop and sync operations are fine.
  145. let sync = context.pipeline.syncOperations
  146. try sync.configureHTTPServerPipeline(withErrorHandling: true)
  147. try sync.addHandler(WebCORSHandler())
  148. let scheme = self.configuration.tlsConfiguration == nil ? "http" : "https"
  149. try sync.addHandler(GRPCWebToHTTP2ServerCodec(scheme: scheme))
  150. // There's no need to normalize headers for HTTP/1.
  151. try sync.addHandler(
  152. self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger)
  153. )
  154. result = .success(())
  155. } catch {
  156. result = .failure(error)
  157. }
  158. self.configurationCompleted(result: result, context: context)
  159. }
  160. /// Attempts to determine the HTTP version from the buffer and then configure the pipeline
  161. /// appropriately. Closes the connection if the HTTP version could not be determined.
  162. private func determineHTTPVersionAndConfigurePipeline(
  163. buffer: ByteBuffer,
  164. context: ChannelHandlerContext
  165. ) {
  166. if HTTPVersionParser.prefixedWithHTTP2ConnectionPreface(buffer) {
  167. self.configureHTTP2(context: context)
  168. } else if HTTPVersionParser.prefixedWithHTTP1RequestLine(buffer) {
  169. self.configureHTTP1(context: context)
  170. } else {
  171. self.configuration.logger.error("Unable to determine http version, closing")
  172. context.close(mode: .all, promise: nil)
  173. }
  174. }
  175. /// Handles a 'TLSUserEvent.handshakeCompleted' event and configures the pipeline to handle gRPC
  176. /// requests.
  177. private func handleHandshakeCompletedEvent(
  178. _ event: TLSUserEvent,
  179. alpnIsRequired: Bool,
  180. context: ChannelHandlerContext
  181. ) {
  182. switch event {
  183. case let .handshakeCompleted(negotiatedProtocol):
  184. self.configuration.logger.debug("TLS handshake completed", metadata: [
  185. "alpn": "\(negotiatedProtocol ?? "nil")",
  186. ])
  187. switch negotiatedProtocol {
  188. case let .some(negotiated):
  189. if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
  190. self.configureHTTP2(context: context)
  191. } else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
  192. self.configureHTTP1(context: context)
  193. } else {
  194. self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
  195. context.close(mode: .all, promise: nil)
  196. }
  197. case .none:
  198. if alpnIsRequired {
  199. self.configuration.logger.warning("No ALPN protocol negotiated, closing'")
  200. context.close(mode: .all, promise: nil)
  201. } else {
  202. self.configuration.logger.warning("No ALPN protocol negotiated'")
  203. // We're now falling back to parsing bytes.
  204. self.state = .notConfigured(alpn: .expectedButFallingBack)
  205. self.tryParsingBufferedData(context: context)
  206. }
  207. }
  208. case .shutdownCompleted:
  209. // We don't care about this here.
  210. ()
  211. }
  212. }
  213. /// Try to parse the buffered data to determine whether or not HTTP/2 or HTTP/1 should be used.
  214. private func tryParsingBufferedData(context: ChannelHandlerContext) {
  215. guard let first = self.bufferedReads.first else {
  216. // No data buffered yet. We'll try when we read.
  217. return
  218. }
  219. let buffer = self.unwrapInboundIn(first)
  220. self.determineHTTPVersionAndConfigurePipeline(buffer: buffer, context: context)
  221. }
  222. // MARK: - Channel Handler
  223. internal func errorCaught(context: ChannelHandlerContext, error: Error) {
  224. if let delegate = self.configuration.errorDelegate {
  225. let baseError: Error
  226. if let errorWithContext = error as? GRPCError.WithContext {
  227. baseError = errorWithContext.error
  228. } else {
  229. baseError = error
  230. }
  231. delegate.observeLibraryError(baseError)
  232. }
  233. context.close(mode: .all, promise: nil)
  234. }
  235. internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  236. switch self.state {
  237. case let .notConfigured(alpn: .expected(required)):
  238. if let event = event as? TLSUserEvent {
  239. self.handleHandshakeCompletedEvent(event, alpnIsRequired: required, context: context)
  240. }
  241. case .notConfigured(alpn: .expectedButFallingBack),
  242. .notConfigured(alpn: .notExpected),
  243. .configuring:
  244. ()
  245. }
  246. context.fireUserInboundEventTriggered(event)
  247. }
  248. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  249. self.bufferedReads.append(data)
  250. switch self.state {
  251. case .notConfigured(alpn: .notExpected),
  252. .notConfigured(alpn: .expectedButFallingBack):
  253. // If ALPN isn't expected, or we didn't negotiate via ALPN and we don't require it then we
  254. // can try parsing the data we just buffered.
  255. self.tryParsingBufferedData(context: context)
  256. case .notConfigured(alpn: .expected),
  257. .configuring:
  258. // We expect ALPN or we're being configured, just buffer the data, we'll forward it later.
  259. ()
  260. }
  261. // Don't forward the reads: we'll do so when we have configured the pipeline.
  262. }
  263. internal func removeHandler(
  264. context: ChannelHandlerContext,
  265. removalToken: ChannelHandlerContext.RemovalToken
  266. ) {
  267. // Forward any buffered reads.
  268. while let read = self.bufferedReads.popFirst() {
  269. context.fireChannelRead(read)
  270. }
  271. context.leavePipeline(removalToken: removalToken)
  272. }
  273. }
  274. // MARK: - HTTP Version Parser
  275. struct HTTPVersionParser {
  276. /// HTTP/2 connection preface bytes. See RFC 7540 § 5.3.
  277. private static let http2ClientMagic = [
  278. UInt8(ascii: "P"),
  279. UInt8(ascii: "R"),
  280. UInt8(ascii: "I"),
  281. UInt8(ascii: " "),
  282. UInt8(ascii: "*"),
  283. UInt8(ascii: " "),
  284. UInt8(ascii: "H"),
  285. UInt8(ascii: "T"),
  286. UInt8(ascii: "T"),
  287. UInt8(ascii: "P"),
  288. UInt8(ascii: "/"),
  289. UInt8(ascii: "2"),
  290. UInt8(ascii: "."),
  291. UInt8(ascii: "0"),
  292. UInt8(ascii: "\r"),
  293. UInt8(ascii: "\n"),
  294. UInt8(ascii: "\r"),
  295. UInt8(ascii: "\n"),
  296. UInt8(ascii: "S"),
  297. UInt8(ascii: "M"),
  298. UInt8(ascii: "\r"),
  299. UInt8(ascii: "\n"),
  300. UInt8(ascii: "\r"),
  301. UInt8(ascii: "\n"),
  302. ]
  303. /// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
  304. /// connection preface.
  305. static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> Bool {
  306. let view = buffer.readableBytesView
  307. guard view.count >= HTTPVersionParser.http2ClientMagic.count else {
  308. // Not enough bytes.
  309. return false
  310. }
  311. let slice = view[view.startIndex ..< view.startIndex.advanced(by: self.http2ClientMagic.count)]
  312. return slice.elementsEqual(HTTPVersionParser.http2ClientMagic)
  313. }
  314. private static let http1_1 = [
  315. UInt8(ascii: "H"),
  316. UInt8(ascii: "T"),
  317. UInt8(ascii: "T"),
  318. UInt8(ascii: "P"),
  319. UInt8(ascii: "/"),
  320. UInt8(ascii: "1"),
  321. UInt8(ascii: "."),
  322. UInt8(ascii: "1"),
  323. ]
  324. /// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
  325. static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> Bool {
  326. var readableBytesView = buffer.readableBytesView
  327. // From RFC 2616 § 5.1:
  328. // Request-Line = Method SP Request-URI SP HTTP-Version CRLF
  329. // Read off the Method and Request-URI (and spaces).
  330. guard readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil,
  331. readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil else {
  332. return false
  333. }
  334. // Read off the HTTP-Version and CR.
  335. guard let versionView = readableBytesView.trimPrefix(to: UInt8(ascii: "\r")) else {
  336. return false
  337. }
  338. // Check that the LF followed the CR.
  339. guard readableBytesView.first == UInt8(ascii: "\n") else {
  340. return false
  341. }
  342. // Now check the HTTP version.
  343. return versionView.elementsEqual(HTTPVersionParser.http1_1)
  344. }
  345. }