HTTP2ServerTransport+Posix.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. public import GRPCCore
  17. public import GRPCHTTP2Core
  18. internal import NIOCore
  19. internal import NIOExtras
  20. internal import NIOHTTP2
  21. public import NIOPosix // has to be public because of default argument value in init
  22. #if canImport(NIOSSL)
  23. import NIOSSL
  24. #endif
  25. extension HTTP2ServerTransport {
  26. /// A NIOPosix-backed implementation of a server transport.
  27. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  28. public struct Posix: ServerTransport {
  29. private let address: GRPCHTTP2Core.SocketAddress
  30. private let config: Config
  31. private let eventLoopGroup: MultiThreadedEventLoopGroup
  32. private let serverQuiescingHelper: ServerQuiescingHelper
  /// Lifecycle of the listening address: not bound yet, bound, or no longer available.
  private enum State {
    /// Nothing is bound yet. The promise is completed once the outcome of binding is known.
    case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
    /// An address was accepted. The future is the one belonging to the promise held while idle.
    case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
    /// The transport was closed, or the bound address could not be interpreted.
    case closedOrInvalidAddress(RuntimeError)

    /// The future on which to wait for the listening address.
    ///
    /// - Throws: The stored `RuntimeError` when the state is `closedOrInvalidAddress`.
    var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
      get throws {
        switch self {
        case .closedOrInvalidAddress(let storedError):
          throw storedError
        case .listening(let addressFuture):
          return addressFuture
        case .idle(let addressPromise):
          return addressPromise.futureResult
        }
      }
    }

    /// What the caller has to do, outside of the lock, after `addressBound` returns.
    enum OnBound {
      case succeedPromise(
        _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
        address: GRPCHTTP2Core.SocketAddress
      )
      case failPromise(
        _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
        error: RuntimeError
      )
    }

    /// Records the result of binding the server channel.
    ///
    /// - Parameters:
    ///   - address: The local address reported by the bound channel, if there is one.
    ///   - userProvidedAddress: The address the transport was created with. It is used as the
    ///     listening address when `address` is `nil` and this is a virtual socket address.
    /// - Returns: The action the caller must perform on the address promise.
    /// - Precondition: The state is `idle`; any other state is a fatal error.
    mutating func addressBound(
      _ address: NIOCore.SocketAddress?,
      userProvidedAddress: GRPCHTTP2Core.SocketAddress
    ) -> OnBound {
      guard case .idle(let pendingPromise) = self else {
        fatalError(
          "Invalid state: addressBound should only be called once and when in idle state"
        )
      }

      let resolvedAddress: GRPCHTTP2Core.SocketAddress
      if let address {
        resolvedAddress = GRPCHTTP2Core.SocketAddress(address)
      } else if userProvidedAddress.virtualSocket != nil {
        // No local address was reported; fall back to the virtual socket address we were given.
        resolvedAddress = userProvidedAddress
      } else {
        assertionFailure("Unknown address type")
        let unknownAddressError = RuntimeError(
          code: .transportError,
          message: "Unknown address type returned by transport."
        )
        self = .closedOrInvalidAddress(unknownAddressError)
        return .failPromise(pendingPromise, error: unknownAddressError)
      }

      self = .listening(pendingPromise.futureResult)
      return .succeedPromise(pendingPromise, address: resolvedAddress)
    }

    /// What the caller has to do, outside of the lock, after `close` returns.
    enum OnClose {
      case failPromise(
        EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
        error: RuntimeError
      )
      case doNothing
    }

    /// Moves to `closedOrInvalidAddress` unless the state is already there.
    ///
    /// - Returns: `.failPromise` when closing from `idle`, so that the pending promise gets
    ///   failed; `.doNothing` otherwise.
    mutating func close() -> OnClose {
      let stoppedError = RuntimeError(
        code: .serverIsStopped,
        message: """
          There is no listening address bound for this server: there may have been \
          an error which caused the transport to close, or it may have shut down.
          """
      )

      let onClose: OnClose
      switch self {
      case .closedOrInvalidAddress:
        // Already closed: keep the error that is stored.
        return .doNothing
      case .idle(let pendingPromise):
        onClose = .failPromise(pendingPromise, error: stoppedError)
      case .listening:
        onClose = .doNothing
      }

      self = .closedOrInvalidAddress(stoppedError)
      return onClose
    }
  }
  116. private let listeningAddressState: LockedValueBox<State>
  /// The address this server transport is listening on.
  ///
  /// This is an `async` property: it only returns once the address has been bound successfully.
  ///
  /// - Throws: A runtime error if the address could not be bound, if the transport is no longer
  ///   listening, or if the transport returned an invalid address.
  public var listeningAddress: GRPCHTTP2Core.SocketAddress {
    get async throws {
      // Fetch the future while holding the lock, then wait for it after the lock is released.
      let addressFuture = try self.listeningAddressState.withLockedValue { state in
        try state.listeningAddressFuture
      }
      return try await addressFuture.get()
    }
  }
  /// Creates a new `Posix` transport.
  ///
  /// - Parameters:
  ///   - address: The address the server should be bound to.
  ///   - config: The configuration of the transport.
  ///   - eventLoopGroup: The event loop group providing the event loops this transport runs on.
  public init(
    address: GRPCHTTP2Core.SocketAddress,
    config: Config,
    eventLoopGroup: MultiThreadedEventLoopGroup = .singletonMultiThreadedEventLoopGroup
  ) {
    self.address = address
    self.config = config
    self.eventLoopGroup = eventLoopGroup
    self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)

    // The transport starts out idle, holding the promise for the address it will listen on.
    let addressPromise = eventLoopGroup.any().makePromise(of: GRPCHTTP2Core.SocketAddress.self)
    self.listeningAddressState = LockedValueBox(.idle(addressPromise))
  }
  /// Binds the server to the configured address and handles accepted connections.
  ///
  /// Each accepted connection is handled in its own child task. Whenever this function exits,
  /// the listening address state is closed, failing the address promise if it is still pending.
  ///
  /// - Parameter streamHandler: The closure called with every RPC stream accepted by the server.
  /// - Throws: A `RuntimeError` with code `.transportError` if the TLS context cannot be
  ///   created, or any error thrown while binding or running the server channel.
  public func listen(
    _ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  ) async throws {
    defer {
      let onClose = self.listeningAddressState.withLockedValue { $0.close() }
      if case .failPromise(let promise, let error) = onClose {
        promise.fail(error)
      }
    }

    #if canImport(NIOSSL)
    // Create the TLS context once, up front, so a bad configuration fails before binding.
    let sslContext: NIOSSLContext?
    switch self.config.transportSecurity.wrapped {
    case .tls(let tlsConfig):
      do {
        sslContext = try NIOSSLContext(configuration: TLSConfiguration(tlsConfig))
      } catch {
        throw RuntimeError(
          code: .transportError,
          message: "Couldn't create SSL context, check your TLS configuration.",
          cause: error
        )
      }
    case .plaintext:
      sslContext = nil
    }
    #endif

    let serverChannel = try await ServerBootstrap(group: self.eventLoopGroup)
      .serverChannelOption(
        ChannelOptions.socketOption(.so_reuseaddr),
        value: 1
      )
      .serverChannelInitializer { channel in
        channel.pipeline.addHandler(
          self.serverQuiescingHelper.makeServerChannelHandler(channel: channel)
        )
      }
      .bind(to: self.address) { channel in
        channel.eventLoop.makeCompletedFuture {
          #if canImport(NIOSSL)
          // The TLS handler, when there is one, is added before the gRPC pipeline.
          if let sslContext {
            try channel.pipeline.syncOperations.addHandler(
              NIOSSLServerHandler(context: sslContext)
            )
          }
          #endif

          let requireALPN: Bool
          let scheme: Scheme
          switch self.config.transportSecurity.wrapped {
          case .tls(let tlsConfig):
            requireALPN = tlsConfig.requireALPN
            scheme = .https
          case .plaintext:
            requireALPN = false
            scheme = .http
          }

          return try channel.pipeline.syncOperations.configureGRPCServerPipeline(
            channel: channel,
            compressionConfig: self.config.compression,
            connectionConfig: self.config.connection,
            http2Config: self.config.http2,
            rpcConfig: self.config.rpc,
            requireALPN: requireALPN,
            scheme: scheme
          )
        }
      }

    // Update the state under the lock; complete the promise only once the lock is released.
    let onBound = self.listeningAddressState.withLockedValue { state in
      state.addressBound(
        serverChannel.channel.localAddress,
        userProvidedAddress: self.address
      )
    }

    switch onBound {
    case .failPromise(let promise, let error):
      promise.fail(error)
    case .succeedPromise(let promise, let address):
      promise.succeed(address)
    }

    try await serverChannel.executeThenClose { acceptedConnections in
      try await withThrowingDiscardingTaskGroup { connectionTasks in
        for try await (acceptedChannel, acceptedMultiplexer) in acceptedConnections {
          connectionTasks.addTask {
            try await self.handleConnection(
              acceptedChannel,
              multiplexer: acceptedMultiplexer,
              streamHandler: streamHandler
            )
          }
        }
      }
    }
  }
  /// Handles one accepted connection: consumes its inbound frames and runs every stream
  /// produced by the multiplexer in its own child task.
  ///
  /// - Parameters:
  ///   - connection: The channel of the accepted connection.
  ///   - multiplexer: The HTTP/2 stream multiplexer whose inbound streams are handled.
  ///   - streamHandler: The closure called for each RPC stream.
  /// - Throws: Any error thrown by `executeThenClose` on the connection channel.
  private func handleConnection(
    _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
    multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
    streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  ) async throws {
    try await connection.executeThenClose { connectionFrames, _ in
      await withDiscardingTaskGroup { streamTasks in
        // Consume the connection's inbound frames, discarding every one of them.
        streamTasks.addTask {
          do {
            for try await _ in connectionFrames {}
          } catch {
            // We don't want to close the channel if one connection throws.
          }
        }

        do {
          for try await (http2Stream, methodDescriptor) in multiplexer.inbound {
            streamTasks.addTask {
              await self.handleStream(
                http2Stream,
                handler: streamHandler,
                descriptor: methodDescriptor
              )
            }
          }
        } catch {
          // An error from the multiplexer only stops accepting new streams; it is not rethrown.
        }
      }
    }
  }
  /// Runs the stream handler for a single HTTP/2 stream, then closes the stream.
  ///
  /// - Parameters:
  ///   - stream: The channel carrying the request and response parts of the RPC.
  ///   - streamHandler: The closure called with the `RPCStream` built from `stream`.
  ///   - descriptor: A future for the descriptor of the method being called. When it fails the
  ///     handler is not called.
  private func handleStream(
    _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
    handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
    descriptor: EventLoopFuture<MethodDescriptor>
  ) async {
    do {
      try await stream.executeThenClose { requestParts, responseParts in
        let method: MethodDescriptor
        do {
          method = try await descriptor.get()
        } catch {
          // Without a descriptor the RPC metadata never arrived, so there is nothing to handle.
          return
        }

        let rpcStream = RPCStream(
          descriptor: method,
          inbound: RPCAsyncSequence(wrapping: requestParts),
          outbound: RPCWriter.Closable(
            wrapping: ServerConnection.Stream.Outbound(
              responseWriter: responseParts,
              http2Stream: stream
            )
          )
        )
        await streamHandler(rpcStream)
      }
    } catch {
      // Deliberately ignored: if the HTTP/2 stream failed to close there is nothing left for
      // us to do, so the stream is simply dropped.
    }
  }
  /// Asks the server quiescing helper to initiate shutdown.
  ///
  /// No promise is passed to the helper, so completion of the shutdown is not reported here.
  public func stopListening() {
    let completionPromise: EventLoopPromise<Void>? = nil
    self.serverQuiescingHelper.initiateShutdown(promise: completionPromise)
  }
  301. }
  302. }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension HTTP2ServerTransport.Posix {
    /// Configuration for the `Posix` transport.
    public struct Config: Sendable {
      /// Configuration for compression.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Configuration for connections.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// Configuration for HTTP/2.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// Configuration for RPCs.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// The security used by the transport.
      public var transportSecurity: TransportSecurity

      /// Creates a new `Config` from its individual parts.
      ///
      /// - Parameters:
      ///   - compression: Configuration for compression.
      ///   - connection: Configuration for connections.
      ///   - http2: Configuration for HTTP/2.
      ///   - rpc: Configuration for RPCs.
      ///   - transportSecurity: The security configuration of the transport.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC,
        transportSecurity: TransportSecurity
      ) {
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
        self.transportSecurity = transportSecurity
      }

      /// Creates a `Config` using the default compression, connection, HTTP/2 and RPC values.
      ///
      /// - Parameter transportSecurity: The security configuration of the transport.
      /// - Returns: A `Config` whose other settings are all `.defaults`.
      public static func defaults(
        transportSecurity: TransportSecurity
      ) -> Self {
        Config(
          compression: .defaults,
          connection: .defaults,
          http2: .defaults,
          rpc: .defaults,
          transportSecurity: transportSecurity
        )
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a `GRPCHTTP2Core.SocketAddress` into a `NIOCore.SocketAddress`.
    ///
    /// IPv4, IPv6 and Unix domain socket addresses are tried in that order.
    ///
    /// - Parameter socketAddress: The address to convert.
    /// - Throws: An `RPCError` with code `.internalError` if the address is none of the
    ///   supported kinds, or any error thrown by the underlying conversion.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let ipv4 = socketAddress.ipv4 {
        self = try Self(ipv4)
        return
      }

      if let ipv6 = socketAddress.ipv6 {
        self = try Self(ipv6)
        return
      }

      if let unixDomainSocket = socketAddress.unixDomainSocket {
        self = try Self(unixDomainSocket)
        return
      }

      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  extension ServerBootstrap {
    /// Binds the bootstrap to a `GRPCHTTP2Core.SocketAddress`.
    ///
    /// A virtual socket address is bound as a `VsockAddress`; every other address is first
    /// converted to a `NIOCore.SocketAddress`.
    ///
    /// - Parameters:
    ///   - address: The address to bind to.
    ///   - childChannelInitializer: The initializer run for every accepted child channel.
    /// - Returns: The asynchronous server channel producing the initializer's output.
    /// - Throws: Any error thrown while converting the address or binding.
    @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (any Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      guard let virtualSocket = address.virtualSocket else {
        let nioAddress = try NIOCore.SocketAddress(address)
        return try await self.bind(
          to: nioAddress,
          childChannelInitializer: childChannelInitializer
        )
      }

      return try await self.bind(
        to: VsockAddress(virtualSocket),
        childChannelInitializer: childChannelInitializer
      )
    }
  }