HTTP2ServerTransport+Posix.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. @_spi(Package) import GRPCHTTP2Core
  18. import NIOCore
  19. import NIOExtras
  20. import NIOPosix
  extension HTTP2ServerTransport {
    /// A server transport implemented on top of NIOPosix.
    ///
    /// The configured address is bound when `listen(_:)` runs; the address that was actually bound
    /// is published through `listeningAddress`.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    public struct Posix: ServerTransport {
      private let address: GRPCHTTP2Core.SocketAddress
      private let config: Config
      private let eventLoopGroup: MultiThreadedEventLoopGroup
      private let serverQuiescingHelper: ServerQuiescingHelper

      /// Lifecycle of the listening address.
      ///
      /// The state starts as `idle` (holding the promise handed out to early readers), becomes
      /// `listening` once an address has been bound, and ends as `closedOrInvalidAddress`.
      private enum State {
        /// Nothing is bound yet; the promise is completed by `addressBound` or `close`.
        case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
        /// An address has been bound; the future is the one belonging to the idle promise.
        case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
        /// The transport was closed or the bound address was unusable; holds the error to throw.
        case closedOrInvalidAddress(RuntimeError)

        /// The future that resolves to the listening address.
        ///
        /// - Throws: The stored `RuntimeError` when the state is `closedOrInvalidAddress`.
        var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
          get throws {
            switch self {
            case .closedOrInvalidAddress(let storedError):
              throw storedError
            case .listening(let addressFuture):
              return addressFuture
            case .idle(let addressPromise):
              return addressPromise.futureResult
            }
          }
        }

        /// What the caller must do, outside of the lock, after `addressBound` returns.
        enum OnBound {
          case succeedPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            address: GRPCHTTP2Core.SocketAddress
          )
          case failPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
        }

        /// Records the outcome of binding the server channel.
        ///
        /// - Parameters:
        ///   - address: The local address reported by the bound channel, if there is one.
        ///   - userProvidedAddress: The address the transport was created with; it is used as the
        ///     listening address when `address` is `nil` and this is a virtual socket address.
        /// - Returns: The action to apply to the promise that was held by the `idle` state.
        /// - Precondition: The state is `idle`; any other state traps.
        mutating func addressBound(
          _ address: NIOCore.SocketAddress?,
          userProvidedAddress: GRPCHTTP2Core.SocketAddress
        ) -> OnBound {
          guard case .idle(let pendingPromise) = self else {
            fatalError(
              "Invalid state: addressBound should only be called once and when in idle state"
            )
          }

          // Work out which address, if any, we are listening on.
          let boundAddress: GRPCHTTP2Core.SocketAddress?
          if let address {
            boundAddress = GRPCHTTP2Core.SocketAddress(address)
          } else if userProvidedAddress.virtualSocket != nil {
            boundAddress = userProvidedAddress
          } else {
            boundAddress = nil
          }

          guard let boundAddress else {
            assertionFailure("Unknown address type")
            let unknownAddressError = RuntimeError(
              code: .transportError,
              message: "Unknown address type returned by transport."
            )
            self = .closedOrInvalidAddress(unknownAddressError)
            return .failPromise(pendingPromise, error: unknownAddressError)
          }

          self = .listening(pendingPromise.futureResult)
          return .succeedPromise(pendingPromise, address: boundAddress)
        }

        /// What the caller must do, outside of the lock, after `close` returns.
        enum OnClose {
          case failPromise(
            EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
          case doNothing
        }

        /// Moves the state to `closedOrInvalidAddress`.
        ///
        /// - Returns: `.failPromise` when the state was still `idle`, so that the pending promise
        ///   gets failed by the caller; `.doNothing` otherwise. An already closed state keeps the
        ///   error it was closed with.
        mutating func close() -> OnClose {
          let stoppedError = RuntimeError(
            code: .serverIsStopped,
            message: """
              There is no listening address bound for this server: there may have been \
              an error which caused the transport to close, or it may have shut down.
              """
          )

          switch self {
          case .closedOrInvalidAddress:
            return .doNothing

          case .listening:
            self = .closedOrInvalidAddress(stoppedError)
            return .doNothing

          case .idle(let pendingPromise):
            self = .closedOrInvalidAddress(stoppedError)
            return .failPromise(pendingPromise, error: stoppedError)
          }
        }
      }

      private let listeningAddressState: _LockedValueBox<State>

      /// The address this transport is listening on.
      ///
      /// The property is `async` because it only produces a value once the address has been bound.
      ///
      /// - Throws: A runtime error if the address could not be bound, if the transport is no longer
      ///   listening, or if the transport reported an invalid address.
      public var listeningAddress: GRPCHTTP2Core.SocketAddress {
        get async throws {
          // Grab the future under the lock, but wait for it outside of the lock.
          let addressFuture = try self.listeningAddressState.withLockedValue { state in
            try state.listeningAddressFuture
          }
          return try await addressFuture.get()
        }
      }

      /// Creates a new `Posix` transport.
      ///
      /// - Parameters:
      ///   - address: The address the server should bind to.
      ///   - config: The configuration of the transport.
      ///   - eventLoopGroup: The group providing the event loops the transport runs on.
      public init(
        address: GRPCHTTP2Core.SocketAddress,
        config: Config = .defaults,
        eventLoopGroup: MultiThreadedEventLoopGroup = .singletonMultiThreadedEventLoopGroup
      ) {
        self.address = address
        self.config = config
        self.eventLoopGroup = eventLoopGroup
        self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)

        // The promise backing `listeningAddress` lives on an arbitrary loop of the group.
        let addressPromise = eventLoopGroup.any().makePromise(of: GRPCHTTP2Core.SocketAddress.self)
        self.listeningAddressState = _LockedValueBox(.idle(addressPromise))
      }

      /// Binds the configured address and serves incoming RPC streams.
      ///
      /// Each accepted connection runs in its own child task, and so does each HTTP/2 stream
      /// within a connection. The listening-address state is closed whenever this function exits.
      ///
      /// - Parameter streamHandler: Invoked once for every stream whose method descriptor could
      ///   be resolved.
      /// - Throws: Any error thrown while binding, while iterating accepted connections, or by a
      ///   connection's `executeThenClose`.
      public func listen(
        _ streamHandler: @escaping (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        defer {
          // However we leave, nobody should be left waiting on the listening address.
          let onClose = self.listeningAddressState.withLockedValue { $0.close() }
          if case .failPromise(let promise, let error) = onClose {
            promise.fail(error)
          }
        }

        let bootstrap = ServerBootstrap(group: self.eventLoopGroup)
          .serverChannelInitializer { channel in
            let quiescingHandler = self.serverQuiescingHelper.makeServerChannelHandler(
              channel: channel
            )
            return channel.pipeline.addHandler(quiescingHandler)
          }

        let serverChannel = try await bootstrap.bind(to: self.address) { channel in
          channel.eventLoop.makeCompletedFuture {
            return try channel.pipeline.syncOperations.configureGRPCServerPipeline(
              channel: channel,
              compressionConfig: self.config.compression,
              connectionConfig: self.config.connection,
              http2Config: self.config.http2,
              rpcConfig: self.config.rpc,
              useTLS: false
            )
          }
        }

        // Publish the outcome of the bind; the promise is completed outside of the lock.
        let onBound = self.listeningAddressState.withLockedValue { state in
          state.addressBound(
            serverChannel.channel.localAddress,
            userProvidedAddress: self.address
          )
        }

        switch onBound {
        case .failPromise(let promise, let error):
          promise.fail(error)
        case .succeedPromise(let promise, let boundAddress):
          promise.succeed(boundAddress)
        }

        try await serverChannel.executeThenClose { acceptedConnections in
          try await withThrowingDiscardingTaskGroup { serverGroup in
            for try await (connection, multiplexer) in acceptedConnections {
              serverGroup.addTask {
                try await connection.executeThenClose { connectionInbound, connectionOutbound in
                  await withDiscardingTaskGroup { connectionGroup in
                    // Drain the connection-level inbound sequence.
                    connectionGroup.addTask {
                      do {
                        for try await _ in connectionInbound {}
                      } catch {
                        // A failure on a single connection must not close the channel.
                        return
                      }
                    }

                    // Serve every stream opened on this connection.
                    connectionGroup.addTask {
                      await withDiscardingTaskGroup { streamGroup in
                        do {
                          for try await (stream, pendingDescriptor) in multiplexer.inbound {
                            streamGroup.addTask {
                              // Errors are deliberately dropped here:
                              // - if closing the stream failed there is nothing left to do;
                              // - if the inner closure threw, the only possible source is
                              //   `pendingDescriptor.get()`, meaning the RPC metadata never
                              //   arrived, so killing the stream is all that can be done.
                              try? await stream.executeThenClose { inbound, outbound in
                                guard let descriptor = try? await pendingDescriptor.get() else {
                                  return
                                }

                                let writer = ServerConnection.Stream.Outbound(
                                  responseWriter: outbound,
                                  http2Stream: stream
                                )
                                let rpcStream = RPCStream(
                                  descriptor: descriptor,
                                  inbound: RPCAsyncSequence(wrapping: inbound),
                                  outbound: RPCWriter.Closable(wrapping: writer)
                                )
                                await streamHandler(rpcStream)
                              }
                            }
                          }
                        } catch {
                          // A failure on a single stream must not close the whole connection.
                          return
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }

      /// Asks the quiescing helper to initiate shutdown of the server; completion is not observed.
      public func stopListening() {
        self.serverQuiescingHelper.initiateShutdown(promise: nil)
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension HTTP2ServerTransport.Posix {
    /// Settings for the ``GRPCHTTP2TransportNIOPosix/GRPCHTTP2Core/HTTP2ServerTransport/Posix``
    /// transport, grouped by area.
    public struct Config: Sendable {
      /// Settings controlling compression.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Settings controlling connections.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// Settings controlling HTTP/2.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// Settings controlling RPCs.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// Creates a `Config` from its four parts.
      ///
      /// - Parameters:
      ///   - compression: Settings controlling compression.
      ///   - connection: Settings controlling connections.
      ///   - http2: Settings controlling HTTP/2.
      ///   - rpc: Settings controlling RPCs.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC
      ) {
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
      }

      /// A configuration in which every part uses its own `defaults`.
      public static var defaults: Self {
        return Self(
          compression: .defaults,
          connection: .defaults,
          http2: .defaults,
          rpc: .defaults
        )
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a gRPC socket address into a NIO socket address.
    ///
    /// The IPv4, IPv6 and Unix domain socket representations are tried in that order.
    ///
    /// - Parameter socketAddress: The gRPC address to convert.
    /// - Throws: An `RPCError` with code `.internalError` when the address has none of the
    ///   supported representations, or any error thrown while converting the representation found.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let ipv4Address = socketAddress.ipv4 {
        self = try Self(ipv4Address)
        return
      }

      if let ipv6Address = socketAddress.ipv6 {
        self = try Self(ipv6Address)
        return
      }

      if let unixSocketAddress = socketAddress.unixDomainSocket {
        self = try Self(unixSocketAddress)
        return
      }

      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  extension ServerBootstrap {
    /// Binds the bootstrap to a gRPC socket address.
    ///
    /// Virtual socket addresses are bound as a `VsockAddress`; every other address is first
    /// converted to a `NIOCore.SocketAddress`.
    ///
    /// - Parameters:
    ///   - address: The address to bind to.
    ///   - childChannelInitializer: Initializer run for each accepted child channel.
    /// - Returns: The async channel wrapping the bound server channel.
    /// - Throws: Any error thrown while converting the address or while binding.
    @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      guard let virtualSocket = address.virtualSocket else {
        // Not a vsock address: go through the regular socket address conversion.
        let socketAddress = try NIOCore.SocketAddress(address)
        return try await self.bind(
          to: socketAddress,
          childChannelInitializer: childChannelInitializer
        )
      }

      return try await self.bind(
        to: VsockAddress(virtualSocket),
        childChannelInitializer: childChannelInitializer
      )
    }
  }