HTTP2ServerTransport+Posix.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. @_spi(Package) import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOExtras
  20. import NIOPosix
  21. extension HTTP2ServerTransport {
  22. /// A NIOPosix-backed implementation of a server transport.
  23. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  24. public struct Posix: ServerTransport {
  25. private let address: GRPCHTTP2Core.SocketAddress
  26. private let config: Config
  27. private let eventLoopGroup: MultiThreadedEventLoopGroup
  28. private let serverQuiescingHelper: ServerQuiescingHelper
  29. private enum State {
  30. case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
  31. case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
  32. case closedOrInvalidAddress(RuntimeError)
  33. var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
  34. get throws {
  35. switch self {
  36. case .idle(let eventLoopPromise):
  37. return eventLoopPromise.futureResult
  38. case .listening(let eventLoopFuture):
  39. return eventLoopFuture
  40. case .closedOrInvalidAddress(let runtimeError):
  41. throw runtimeError
  42. }
  43. }
  44. }
  45. enum OnBound {
  46. case succeedPromise(
  47. _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  48. address: GRPCHTTP2Core.SocketAddress
  49. )
  50. case failPromise(
  51. _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  52. error: RuntimeError
  53. )
  54. }
  55. mutating func addressBound(
  56. _ address: NIOCore.SocketAddress?,
  57. userProvidedAddress: GRPCHTTP2Core.SocketAddress
  58. ) -> OnBound {
  59. switch self {
  60. case .idle(let listeningAddressPromise):
  61. if let address {
  62. self = .listening(listeningAddressPromise.futureResult)
  63. return .succeedPromise(
  64. listeningAddressPromise,
  65. address: GRPCHTTP2Core.SocketAddress(address)
  66. )
  67. } else if userProvidedAddress.virtualSocket != nil {
  68. self = .listening(listeningAddressPromise.futureResult)
  69. return .succeedPromise(listeningAddressPromise, address: userProvidedAddress)
  70. } else {
  71. assertionFailure("Unknown address type")
  72. let invalidAddressError = RuntimeError(
  73. code: .transportError,
  74. message: "Unknown address type returned by transport."
  75. )
  76. self = .closedOrInvalidAddress(invalidAddressError)
  77. return .failPromise(listeningAddressPromise, error: invalidAddressError)
  78. }
  79. case .listening, .closedOrInvalidAddress:
  80. fatalError(
  81. "Invalid state: addressBound should only be called once and when in idle state"
  82. )
  83. }
  84. }
  85. enum OnClose {
  86. case failPromise(
  87. EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  88. error: RuntimeError
  89. )
  90. case doNothing
  91. }
  92. mutating func close() -> OnClose {
  93. let serverStoppedError = RuntimeError(
  94. code: .serverIsStopped,
  95. message: """
  96. There is no listening address bound for this server: there may have been \
  97. an error which caused the transport to close, or it may have shut down.
  98. """
  99. )
  100. switch self {
  101. case .idle(let listeningAddressPromise):
  102. self = .closedOrInvalidAddress(serverStoppedError)
  103. return .failPromise(listeningAddressPromise, error: serverStoppedError)
  104. case .listening:
  105. self = .closedOrInvalidAddress(serverStoppedError)
  106. return .doNothing
  107. case .closedOrInvalidAddress:
  108. return .doNothing
  109. }
  110. }
  111. }
  112. private let listeningAddressState: _LockedValueBox<State>
  113. /// The listening address for this server transport.
  114. ///
  115. /// It is an `async` property because it will only return once the address has been successfully bound.
  116. ///
  117. /// - Throws: A runtime error will be thrown if the address could not be bound or is not bound any
  118. /// longer, because the transport isn't listening anymore. It can also throw if the transport returned an
  119. /// invalid address.
  120. public var listeningAddress: GRPCHTTP2Core.SocketAddress {
  121. get async throws {
  122. try await self.listeningAddressState
  123. .withLockedValue { try $0.listeningAddressFuture }
  124. .get()
  125. }
  126. }
  127. /// Create a new `Posix` transport.
  128. ///
  129. /// - Parameters:
  130. /// - address: The address to which the server should be bound.
  131. /// - config: The transport configuration.
  132. /// - eventLoopGroup: The ELG from which to get ELs to run this transport.
  133. public init(
  134. address: GRPCHTTP2Core.SocketAddress,
  135. config: Config = .defaults,
  136. eventLoopGroup: MultiThreadedEventLoopGroup = .singletonMultiThreadedEventLoopGroup
  137. ) {
  138. self.address = address
  139. self.config = config
  140. self.eventLoopGroup = eventLoopGroup
  141. self.serverQuiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup)
  142. let eventLoop = eventLoopGroup.any()
  143. self.listeningAddressState = _LockedValueBox(.idle(eventLoop.makePromise()))
  144. }
  145. public func listen(
  146. _ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  147. ) async throws {
  148. defer {
  149. switch self.listeningAddressState.withLockedValue({ $0.close() }) {
  150. case .failPromise(let promise, let error):
  151. promise.fail(error)
  152. case .doNothing:
  153. ()
  154. }
  155. }
  156. let serverChannel = try await ServerBootstrap(group: self.eventLoopGroup)
  157. .serverChannelOption(
  158. ChannelOptions.socketOption(.so_reuseaddr),
  159. value: 1
  160. )
  161. .serverChannelInitializer { channel in
  162. let quiescingHandler = self.serverQuiescingHelper.makeServerChannelHandler(
  163. channel: channel
  164. )
  165. return channel.pipeline.addHandler(quiescingHandler)
  166. }
  167. .bind(to: self.address) { channel in
  168. channel.eventLoop.makeCompletedFuture {
  169. return try channel.pipeline.syncOperations.configureGRPCServerPipeline(
  170. channel: channel,
  171. compressionConfig: self.config.compression,
  172. connectionConfig: self.config.connection,
  173. http2Config: self.config.http2,
  174. rpcConfig: self.config.rpc,
  175. useTLS: false
  176. )
  177. }
  178. }
  179. let action = self.listeningAddressState.withLockedValue {
  180. $0.addressBound(
  181. serverChannel.channel.localAddress,
  182. userProvidedAddress: self.address
  183. )
  184. }
  185. switch action {
  186. case .succeedPromise(let promise, let address):
  187. promise.succeed(address)
  188. case .failPromise(let promise, let error):
  189. promise.fail(error)
  190. }
  191. try await serverChannel.executeThenClose { inbound in
  192. try await withThrowingDiscardingTaskGroup { serverTaskGroup in
  193. for try await (connectionChannel, streamMultiplexer) in inbound {
  194. serverTaskGroup.addTask {
  195. try await connectionChannel
  196. .executeThenClose { connectionInbound, connectionOutbound in
  197. await withDiscardingTaskGroup { connectionTaskGroup in
  198. connectionTaskGroup.addTask {
  199. do {
  200. for try await _ in connectionInbound {}
  201. } catch {
  202. // We don't want to close the channel if one connection throws.
  203. return
  204. }
  205. }
  206. connectionTaskGroup.addTask {
  207. await withDiscardingTaskGroup { streamTaskGroup in
  208. do {
  209. for try await (http2Stream, methodDescriptor) in streamMultiplexer.inbound
  210. {
  211. streamTaskGroup.addTask {
  212. // It's okay to ignore these errors:
  213. // - If we get an error because the http2Stream failed to close, then there's nothing we can do
  214. // - If we get an error because the inner closure threw, then the only possible scenario in which
  215. // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
  216. // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
  217. try? await http2Stream.executeThenClose { inbound, outbound in
  218. guard let descriptor = try? await methodDescriptor.get() else {
  219. return
  220. }
  221. let rpcStream = RPCStream(
  222. descriptor: descriptor,
  223. inbound: RPCAsyncSequence(wrapping: inbound),
  224. outbound: RPCWriter.Closable(
  225. wrapping: ServerConnection.Stream.Outbound(
  226. responseWriter: outbound,
  227. http2Stream: http2Stream
  228. )
  229. )
  230. )
  231. await streamHandler(rpcStream)
  232. }
  233. }
  234. }
  235. } catch {
  236. // We don't want to close the whole connection if one stream throws.
  237. return
  238. }
  239. }
  240. }
  241. }
  242. }
  243. }
  244. }
  245. }
  246. }
  247. }
  248. public func stopListening() {
  249. self.serverQuiescingHelper.initiateShutdown(promise: nil)
  250. }
  251. }
  252. }
  253. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  254. extension HTTP2ServerTransport.Posix {
  255. /// Configuration for the ``GRPCHTTP2TransportNIOPosix/GRPCHTTP2Core/HTTP2ServerTransport/Posix``.
  256. public struct Config: Sendable {
  257. /// Compression configuration.
  258. public var compression: HTTP2ServerTransport.Config.Compression
  259. /// Connection configuration.
  260. public var connection: HTTP2ServerTransport.Config.Connection
  261. /// HTTP2 configuration.
  262. public var http2: HTTP2ServerTransport.Config.HTTP2
  263. /// RPC configuration.
  264. public var rpc: HTTP2ServerTransport.Config.RPC
  265. /// Construct a new `Config`.
  266. /// - Parameters:
  267. /// - compression: Compression configuration.
  268. /// - connection: Connection configuration.
  269. /// - http2: HTTP2 configuration.
  270. /// - rpc: RPC configuration.
  271. public init(
  272. compression: HTTP2ServerTransport.Config.Compression,
  273. connection: HTTP2ServerTransport.Config.Connection,
  274. http2: HTTP2ServerTransport.Config.HTTP2,
  275. rpc: HTTP2ServerTransport.Config.RPC
  276. ) {
  277. self.compression = compression
  278. self.connection = connection
  279. self.http2 = http2
  280. self.rpc = rpc
  281. }
  282. /// Default values for the different configurations.
  283. public static var defaults: Self {
  284. Self(
  285. compression: .defaults,
  286. connection: .defaults,
  287. http2: .defaults,
  288. rpc: .defaults
  289. )
  290. }
  291. }
  292. }
  293. extension NIOCore.SocketAddress {
  294. fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
  295. if let ipv4 = socketAddress.ipv4 {
  296. self = try Self(ipv4)
  297. } else if let ipv6 = socketAddress.ipv6 {
  298. self = try Self(ipv6)
  299. } else if let unixDomainSocket = socketAddress.unixDomainSocket {
  300. self = try Self(unixDomainSocket)
  301. } else {
  302. throw RPCError(
  303. code: .internalError,
  304. message:
  305. "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
  306. )
  307. }
  308. }
  309. }
  310. extension ServerBootstrap {
  311. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  312. fileprivate func bind<Output: Sendable>(
  313. to address: GRPCHTTP2Core.SocketAddress,
  314. childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
  315. ) async throws -> NIOAsyncChannel<Output, Never> {
  316. if let virtualSocket = address.virtualSocket {
  317. return try await self.bind(
  318. to: VsockAddress(virtualSocket),
  319. childChannelInitializer: childChannelInitializer
  320. )
  321. } else {
  322. return try await self.bind(
  323. to: NIOCore.SocketAddress(address),
  324. childChannelInitializer: childChannelInitializer
  325. )
  326. }
  327. }
  328. }