// HTTP2ServerTransport+Posix.swift
  /*
   * Copyright 2024, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. public import GRPCCore
  17. public import GRPCHTTP2Core
  18. internal import NIOCore
  19. internal import NIOExtras
  20. internal import NIOHTTP2
  21. public import NIOPosix // has to be public because of default argument value in init
  22. private import Synchronization
  23. #if canImport(NIOSSL)
  24. import NIOSSL
  25. #endif
  extension HTTP2ServerTransport {
    /// A server transport implemented on top of NIOPosix.
    @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
    public final class Posix: ServerTransport, ListeningServerTransport {
      private let address: GRPCHTTP2Core.SocketAddress
      private let config: Config
      private let eventLoopGroup: MultiThreadedEventLoopGroup
      private let serverQuiescingHelper: ServerQuiescingHelper

      /// Tracks whether the listening address is pending, known, or unavailable.
      private enum State {
        /// Not bound yet; the promise is completed once binding succeeds or fails.
        case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
        /// Bound; the future resolves to the listening address.
        case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
        /// Closed, or the bound address could not be understood; holds the error to throw.
        case closedOrInvalidAddress(RuntimeError)

        /// The future for the listening address, or a thrown error if there is none.
        var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
          get throws {
            switch self {
            case .idle(let promise):
              return promise.futureResult
            case .listening(let future):
              return future
            case .closedOrInvalidAddress(let error):
              throw error
            }
          }
        }

        /// What the caller must do, outside the lock, after `addressBound` returns.
        enum OnBound {
          case succeedPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            address: GRPCHTTP2Core.SocketAddress
          )
          case failPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
        }

        /// Records the result of binding the server channel.
        ///
        /// - Parameters:
        ///   - address: The local address reported by the bound channel, if any.
        ///   - userProvidedAddress: The address the transport was asked to bind to; used as the
        ///     listening address when `address` is `nil` and this is a virtual socket address.
        /// - Returns: The action to apply to the pending promise.
        /// - Precondition: Must only be called once, and only while in the `idle` state.
        mutating func addressBound(
          _ address: NIOCore.SocketAddress?,
          userProvidedAddress: GRPCHTTP2Core.SocketAddress
        ) -> OnBound {
          switch self {
          case .idle(let pendingAddress):
            if let address {
              self = .listening(pendingAddress.futureResult)
              return .succeedPromise(
                pendingAddress,
                address: GRPCHTTP2Core.SocketAddress(address)
              )
            }

            if userProvidedAddress.virtualSocket != nil {
              self = .listening(pendingAddress.futureResult)
              return .succeedPromise(pendingAddress, address: userProvidedAddress)
            }

            // Neither a channel-reported address nor a virtual socket: trap in debug builds and
            // surface an error to callers otherwise.
            assertionFailure("Unknown address type")
            let unknownAddress = RuntimeError(
              code: .transportError,
              message: "Unknown address type returned by transport."
            )
            self = .closedOrInvalidAddress(unknownAddress)
            return .failPromise(pendingAddress, error: unknownAddress)

          case .listening, .closedOrInvalidAddress:
            fatalError(
              "Invalid state: addressBound should only be called once and when in idle state"
            )
          }
        }

        /// What the caller must do, outside the lock, after `close` returns.
        enum OnClose {
          case failPromise(
            EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
          case doNothing
        }

        /// Moves to the closed state, unless already closed or invalid.
        ///
        /// - Returns: `.failPromise` if the address promise was still pending, else `.doNothing`.
        mutating func close() -> OnClose {
          let stopped = RuntimeError(
            code: .serverIsStopped,
            message: """
              There is no listening address bound for this server: there may have been \
              an error which caused the transport to close, or it may have shut down.
              """
          )

          let onClose: OnClose
          switch self {
          case .idle(let pendingAddress):
            onClose = .failPromise(pendingAddress, error: stopped)
          case .listening:
            onClose = .doNothing
          case .closedOrInvalidAddress:
            // Keep the error that is already stored.
            return .doNothing
          }

          self = .closedOrInvalidAddress(stopped)
          return onClose
        }
      }

      private let listeningAddressState: Mutex<State>

      /// The address this transport is listening on.
      ///
      /// This property is `async` because it only returns once the address has been bound.
      ///
      /// - Throws: A runtime error if the address could not be bound, if the transport is no
      ///   longer listening, or if the transport returned an invalid address.
      public var listeningAddress: GRPCHTTP2Core.SocketAddress {
        get async throws {
          let boundAddress = try self.listeningAddressState.withLock {
            try $0.listeningAddressFuture
          }
          return try await boundAddress.get()
        }
      }

      /// Creates a new `Posix` transport.
      ///
      /// - Parameters:
      ///   - address: The address the server should bind to.
      ///   - config: The transport configuration.
      ///   - eventLoopGroup: The event loop group providing the event loops for this transport.
      public init(
        address: GRPCHTTP2Core.SocketAddress,
        config: Config,
        eventLoopGroup: MultiThreadedEventLoopGroup = .singletonMultiThreadedEventLoopGroup
      ) {
        self.address = address
        self.config = config
        self.eventLoopGroup = eventLoopGroup
        self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)
        self.listeningAddressState = Mutex(.idle(eventLoopGroup.any().makePromise()))
      }

      /// Binds the server and hands every accepted RPC stream to `streamHandler`.
      ///
      /// Each accepted connection is handled in its own child task. On exit, by return or by
      /// throw, a still-pending listening-address promise is failed.
      ///
      /// - Parameter streamHandler: Invoked once per RPC stream.
      /// - Throws: A `RuntimeError` if the TLS context cannot be created, or any error thrown
      ///   while binding or serving.
      public func listen(
        _ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        defer {
          let onClose = self.listeningAddressState.withLock { $0.close() }
          switch onClose {
          case .failPromise(let promise, let error):
            promise.fail(error)
          case .doNothing:
            break
          }
        }

        #if canImport(NIOSSL)
        let nioSSLContext: NIOSSLContext?
        switch self.config.transportSecurity.wrapped {
        case .plaintext:
          nioSSLContext = nil
        case .tls(let tlsConfig):
          do {
            nioSSLContext = try NIOSSLContext(configuration: TLSConfiguration(tlsConfig))
          } catch {
            throw RuntimeError(
              code: .transportError,
              message: "Couldn't create SSL context, check your TLS configuration.",
              cause: error
            )
          }
        }
        #endif

        let serverChannel = try await ServerBootstrap(group: self.eventLoopGroup)
          .serverChannelOption(
            ChannelOptions.socketOption(.so_reuseaddr),
            value: 1
          )
          .serverChannelInitializer { channel in
            // The quiescing handler lets `beginGracefulShutdown()` reach this server channel.
            channel.pipeline.addHandler(
              self.serverQuiescingHelper.makeServerChannelHandler(channel: channel)
            )
          }
          .bind(to: self.address) { channel in
            channel.eventLoop.makeCompletedFuture {
              let pipeline = channel.pipeline.syncOperations

              #if canImport(NIOSSL)
              if let nioSSLContext {
                try pipeline.addHandler(NIOSSLServerHandler(context: nioSSLContext))
              }
              #endif

              let requireALPN: Bool
              let scheme: Scheme
              switch self.config.transportSecurity.wrapped {
              case .plaintext:
                requireALPN = false
                scheme = .http
              case .tls(let tlsConfig):
                requireALPN = tlsConfig.requireALPN
                scheme = .https
              }

              return try pipeline.configureGRPCServerPipeline(
                channel: channel,
                compressionConfig: self.config.compression,
                connectionConfig: self.config.connection,
                http2Config: self.config.http2,
                rpcConfig: self.config.rpc,
                requireALPN: requireALPN,
                scheme: scheme
              )
            }
          }

        // Update the state under the lock, but complete the promise outside of it.
        let onBound = self.listeningAddressState.withLock {
          $0.addressBound(
            serverChannel.channel.localAddress,
            userProvidedAddress: self.address
          )
        }

        switch onBound {
        case .succeedPromise(let promise, let address):
          promise.succeed(address)
        case .failPromise(let promise, let error):
          promise.fail(error)
        }

        try await serverChannel.executeThenClose { acceptedConnections in
          try await withThrowingDiscardingTaskGroup { connections in
            for try await (connectionChannel, streamMultiplexer) in acceptedConnections {
              connections.addTask {
                try await self.handleConnection(
                  connectionChannel,
                  multiplexer: streamMultiplexer,
                  streamHandler: streamHandler
                )
              }
            }
          }
        }
      }

      /// Serves one connection: drains its inbound frames and runs a child task per stream.
      ///
      /// - Parameters:
      ///   - connection: The connection channel.
      ///   - multiplexer: The multiplexer yielding the connection's inbound streams.
      ///   - streamHandler: Invoked once per RPC stream.
      private func handleConnection(
        _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
        multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
        streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        try await connection.executeThenClose { frames, _ in
          await withDiscardingTaskGroup { streams in
            streams.addTask {
              do {
                for try await _ in frames {}
              } catch {
                // We don't want to close the channel if one connection throws.
              }
            }

            do {
              for try await (stream, descriptor) in multiplexer.inbound {
                streams.addTask {
                  await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
                }
              }
            } catch {
              // An error from the stream sequence ends this connection; nothing else to do.
            }
          }
        }
      }

      /// Wraps one HTTP/2 stream in an `RPCStream` and passes it to the handler.
      ///
      /// - Parameters:
      ///   - stream: The channel carrying the RPC's request and response parts.
      ///   - streamHandler: The handler to invoke with the RPC stream.
      ///   - descriptor: A future for the method descriptor of the RPC.
      private func handleStream(
        _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
        handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
        descriptor: EventLoopFuture<MethodDescriptor>
      ) async {
        // Errors are deliberately ignored here:
        // - If the stream fails to close, there is nothing we can do about it.
        // - The inner closure doesn't throw: a failed descriptor is turned into an early return
        //   below, since without the RPC metadata there is nothing to do but drop the stream.
        try? await stream.executeThenClose { requestParts, responseWriter in
          guard let method = try? await descriptor.get() else {
            return
          }

          let outbound = ServerConnection.Stream.Outbound(
            responseWriter: responseWriter,
            http2Stream: stream
          )

          let rpcStream = RPCStream(
            descriptor: method,
            inbound: RPCAsyncSequence(wrapping: requestParts),
            outbound: RPCWriter.Closable(wrapping: outbound)
          )

          await streamHandler(rpcStream)
        }
      }

      /// Initiates shutdown via the server quiescing helper.
      public func beginGracefulShutdown() {
        self.serverQuiescingHelper.initiateShutdown(promise: nil)
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension HTTP2ServerTransport.Posix {
    /// Configuration for the `Posix` transport.
    public struct Config: Sendable {
      /// Compression configuration.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Connection configuration.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// HTTP2 configuration.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// RPC configuration.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// The security configuration of the transport.
      public var transportSecurity: TransportSecurity

      /// Creates a new `Config` from its individual parts.
      ///
      /// - Parameters:
      ///   - compression: Compression configuration.
      ///   - connection: Connection configuration.
      ///   - http2: HTTP2 configuration.
      ///   - rpc: RPC configuration.
      ///   - transportSecurity: The security configuration of the transport.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC,
        transportSecurity: TransportSecurity
      ) {
        self.transportSecurity = transportSecurity
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
      }

      /// Returns a configuration populated with default values.
      ///
      /// - Parameters:
      ///   - transportSecurity: The security settings to apply to the transport.
      ///   - configure: A closure that may adjust the defaults before they are returned.
      /// - Returns: The default configuration after `configure` has been applied to it.
      public static func defaults(
        transportSecurity: TransportSecurity,
        configure: (_ config: inout Self) -> Void = { _ in }
      ) -> Self {
        var result = Self(
          compression: .defaults,
          connection: .defaults,
          http2: .defaults,
          rpc: .defaults,
          transportSecurity: transportSecurity
        )
        configure(&result)
        return result
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a gRPC socket address into its NIO equivalent.
    ///
    /// IPv4, IPv6 and Unix domain socket addresses are tried, in that order.
    ///
    /// - Parameter socketAddress: The gRPC socket address to convert.
    /// - Throws: An `RPCError` with code `internalError` if the address is none of the above
    ///   kinds, or any error thrown by the underlying conversion.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let ipv4 = socketAddress.ipv4 {
        self = try Self(ipv4)
        return
      }

      if let ipv6 = socketAddress.ipv6 {
        self = try Self(ipv6)
        return
      }

      if let unixDomainSocket = socketAddress.unixDomainSocket {
        self = try Self(unixDomainSocket)
        return
      }

      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  extension ServerBootstrap {
    /// Binds to a gRPC socket address, choosing the virtual-socket or regular bind as needed.
    ///
    /// - Parameters:
    ///   - address: The address to bind to.
    ///   - childChannelInitializer: The initializer run for each accepted child channel.
    /// - Returns: The bound server channel.
    /// - Throws: Any error from converting `address` or from binding.
    @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (any Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      guard let virtualSocket = address.virtualSocket else {
        // Not a virtual socket: convert to a NIO socket address and bind to that.
        return try await self.bind(
          to: NIOCore.SocketAddress(address),
          childChannelInitializer: childChannelInitializer
        )
      }

      return try await self.bind(
        to: VsockAddress(virtualSocket),
        childChannelInitializer: childChannelInitializer
      )
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension ServerTransport where Self == HTTP2ServerTransport.Posix {
    /// Creates a new `Posix` based HTTP/2 server transport.
    ///
    /// - Parameters:
    ///   - address: The address the server should bind to.
    ///   - config: The transport configuration.
    ///   - eventLoopGroup: The `MultiThreadedEventLoopGroup` to run the server on.
    /// - Returns: A transport initialized with the given arguments.
    public static func http2NIOPosix(
      address: GRPCHTTP2Core.SocketAddress,
      config: HTTP2ServerTransport.Posix.Config,
      eventLoopGroup: MultiThreadedEventLoopGroup = .singletonMultiThreadedEventLoopGroup
    ) -> Self {
      let transport = HTTP2ServerTransport.Posix(
        address: address,
        config: config,
        eventLoopGroup: eventLoopGroup
      )
      return transport
    }
  }