// HTTP2ServerTransport+TransportServices.swift
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. #if canImport(Network)
  17. public import GRPCCore
  18. public import NIOTransportServices // has to be public because of default argument value in init
  19. public import GRPCHTTP2Core
  20. internal import NIOCore
  21. internal import NIOExtras
  22. internal import NIOHTTP2
  23. private import Synchronization
  24. extension HTTP2ServerTransport {
  25. /// A NIO Transport Services-backed implementation of a server transport.
  26. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  27. public final class TransportServices: ServerTransport, ListeningServerTransport {
  28. private let address: GRPCHTTP2Core.SocketAddress
  29. private let config: Config
  30. private let eventLoopGroup: NIOTSEventLoopGroup
  31. private let serverQuiescingHelper: ServerQuiescingHelper
  32. private enum State {
  33. case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
  34. case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
  35. case closedOrInvalidAddress(RuntimeError)
  36. var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
  37. get throws {
  38. switch self {
  39. case .idle(let eventLoopPromise):
  40. return eventLoopPromise.futureResult
  41. case .listening(let eventLoopFuture):
  42. return eventLoopFuture
  43. case .closedOrInvalidAddress(let runtimeError):
  44. throw runtimeError
  45. }
  46. }
  47. }
  48. enum OnBound {
  49. case succeedPromise(
  50. _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  51. address: GRPCHTTP2Core.SocketAddress
  52. )
  53. case failPromise(
  54. _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  55. error: RuntimeError
  56. )
  57. }
  58. mutating func addressBound(_ address: NIOCore.SocketAddress?) -> OnBound {
  59. switch self {
  60. case .idle(let listeningAddressPromise):
  61. if let address {
  62. self = .listening(listeningAddressPromise.futureResult)
  63. return .succeedPromise(
  64. listeningAddressPromise,
  65. address: GRPCHTTP2Core.SocketAddress(address)
  66. )
  67. } else {
  68. assertionFailure("Unknown address type")
  69. let invalidAddressError = RuntimeError(
  70. code: .transportError,
  71. message: "Unknown address type returned by transport."
  72. )
  73. self = .closedOrInvalidAddress(invalidAddressError)
  74. return .failPromise(listeningAddressPromise, error: invalidAddressError)
  75. }
  76. case .listening, .closedOrInvalidAddress:
  77. fatalError(
  78. "Invalid state: addressBound should only be called once and when in idle state"
  79. )
  80. }
  81. }
  82. enum OnClose {
  83. case failPromise(
  84. EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  85. error: RuntimeError
  86. )
  87. case doNothing
  88. }
  89. mutating func close() -> OnClose {
  90. let serverStoppedError = RuntimeError(
  91. code: .serverIsStopped,
  92. message: """
  93. There is no listening address bound for this server: there may have been \
  94. an error which caused the transport to close, or it may have shut down.
  95. """
  96. )
  97. switch self {
  98. case .idle(let listeningAddressPromise):
  99. self = .closedOrInvalidAddress(serverStoppedError)
  100. return .failPromise(listeningAddressPromise, error: serverStoppedError)
  101. case .listening:
  102. self = .closedOrInvalidAddress(serverStoppedError)
  103. return .doNothing
  104. case .closedOrInvalidAddress:
  105. return .doNothing
  106. }
  107. }
  108. }
  109. private let listeningAddressState: Mutex<State>
  110. /// The listening address for this server transport.
  111. ///
  112. /// It is an `async` property because it will only return once the address has been successfully bound.
  113. ///
  114. /// - Throws: A runtime error will be thrown if the address could not be bound or is not bound any
  115. /// longer, because the transport isn't listening anymore. It can also throw if the transport returned an
  116. /// invalid address.
  117. public var listeningAddress: GRPCHTTP2Core.SocketAddress {
  118. get async throws {
  119. try await self.listeningAddressState
  120. .withLock { try $0.listeningAddressFuture }
  121. .get()
  122. }
  123. }
  124. /// Create a new `TransportServices` transport.
  125. ///
  126. /// - Parameters:
  127. /// - address: The address to which the server should be bound.
  128. /// - config: The transport configuration.
  129. /// - eventLoopGroup: The ELG from which to get ELs to run this transport.
  130. public init(
  131. address: GRPCHTTP2Core.SocketAddress,
  132. config: Config,
  133. eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
  134. ) {
  135. self.address = address
  136. self.config = config
  137. self.eventLoopGroup = eventLoopGroup
  138. self.serverQuiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup)
  139. let eventLoop = eventLoopGroup.any()
  140. self.listeningAddressState = Mutex(.idle(eventLoop.makePromise()))
  141. }
  142. public func listen(
  143. _ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  144. ) async throws {
  145. defer {
  146. switch self.listeningAddressState.withLock({ $0.close() }) {
  147. case .failPromise(let promise, let error):
  148. promise.fail(error)
  149. case .doNothing:
  150. ()
  151. }
  152. }
  153. let serverChannel = try await NIOTSListenerBootstrap(group: self.eventLoopGroup)
  154. .serverChannelOption(
  155. ChannelOptions.socketOption(.so_reuseaddr),
  156. value: 1
  157. )
  158. .serverChannelInitializer { channel in
  159. let quiescingHandler = self.serverQuiescingHelper.makeServerChannelHandler(
  160. channel: channel
  161. )
  162. return channel.pipeline.addHandler(quiescingHandler)
  163. }
  164. .bind(to: self.address) { channel in
  165. channel.eventLoop.makeCompletedFuture {
  166. return try channel.pipeline.syncOperations.configureGRPCServerPipeline(
  167. channel: channel,
  168. compressionConfig: self.config.compression,
  169. connectionConfig: self.config.connection,
  170. http2Config: self.config.http2,
  171. rpcConfig: self.config.rpc,
  172. requireALPN: false,
  173. scheme: .http
  174. )
  175. }
  176. }
  177. let action = self.listeningAddressState.withLock {
  178. $0.addressBound(serverChannel.channel.localAddress)
  179. }
  180. switch action {
  181. case .succeedPromise(let promise, let address):
  182. promise.succeed(address)
  183. case .failPromise(let promise, let error):
  184. promise.fail(error)
  185. }
  186. try await serverChannel.executeThenClose { inbound in
  187. try await withThrowingDiscardingTaskGroup { group in
  188. for try await (connectionChannel, streamMultiplexer) in inbound {
  189. group.addTask {
  190. try await self.handleConnection(
  191. connectionChannel,
  192. multiplexer: streamMultiplexer,
  193. streamHandler: streamHandler
  194. )
  195. }
  196. }
  197. }
  198. }
  199. }
  200. private func handleConnection(
  201. _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
  202. multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
  203. streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  204. ) async throws {
  205. try await connection.executeThenClose { inbound, _ in
  206. await withDiscardingTaskGroup { group in
  207. group.addTask {
  208. do {
  209. for try await _ in inbound {}
  210. } catch {
  211. // We don't want to close the channel if one connection throws.
  212. return
  213. }
  214. }
  215. do {
  216. for try await (stream, descriptor) in multiplexer.inbound {
  217. group.addTask {
  218. await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
  219. }
  220. }
  221. } catch {
  222. return
  223. }
  224. }
  225. }
  226. }
  227. private func handleStream(
  228. _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
  229. handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
  230. descriptor: EventLoopFuture<MethodDescriptor>
  231. ) async {
  232. // It's okay to ignore these errors:
  233. // - If we get an error because the http2Stream failed to close, then there's nothing we can do
  234. // - If we get an error because the inner closure threw, then the only possible scenario in which
  235. // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
  236. // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
  237. try? await stream.executeThenClose { inbound, outbound in
  238. guard let descriptor = try? await descriptor.get() else {
  239. return
  240. }
  241. let rpcStream = RPCStream(
  242. descriptor: descriptor,
  243. inbound: RPCAsyncSequence(wrapping: inbound),
  244. outbound: RPCWriter.Closable(
  245. wrapping: ServerConnection.Stream.Outbound(
  246. responseWriter: outbound,
  247. http2Stream: stream
  248. )
  249. )
  250. )
  251. await streamHandler(rpcStream)
  252. }
  253. }
  254. public func beginGracefulShutdown() {
  255. self.serverQuiescingHelper.initiateShutdown(promise: nil)
  256. }
  257. }
  258. }
  259. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  260. extension HTTP2ServerTransport.TransportServices {
  261. /// Configuration for the `TransportServices` transport.
  262. public struct Config: Sendable {
  263. /// Compression configuration.
  264. public var compression: HTTP2ServerTransport.Config.Compression
  265. /// Connection configuration.
  266. public var connection: HTTP2ServerTransport.Config.Connection
  267. /// HTTP2 configuration.
  268. public var http2: HTTP2ServerTransport.Config.HTTP2
  269. /// RPC configuration.
  270. public var rpc: HTTP2ServerTransport.Config.RPC
  271. /// Construct a new `Config`.
  272. /// - Parameters:
  273. /// - compression: Compression configuration.
  274. /// - connection: Connection configuration.
  275. /// - http2: HTTP2 configuration.
  276. /// - rpc: RPC configuration.
  277. public init(
  278. compression: HTTP2ServerTransport.Config.Compression,
  279. connection: HTTP2ServerTransport.Config.Connection,
  280. http2: HTTP2ServerTransport.Config.HTTP2,
  281. rpc: HTTP2ServerTransport.Config.RPC
  282. ) {
  283. self.compression = compression
  284. self.connection = connection
  285. self.http2 = http2
  286. self.rpc = rpc
  287. }
  288. /// Default values for the different configurations.
  289. ///
  290. /// - Parameter configure: A closure which allows you to modify the defaults before
  291. /// returning them.
  292. public static func defaults(configure: (_ config: inout Self) -> Void = { _ in }) -> Self {
  293. var config = Self(
  294. compression: .defaults,
  295. connection: .defaults,
  296. http2: .defaults,
  297. rpc: .defaults
  298. )
  299. configure(&config)
  300. return config
  301. }
  302. }
  303. }
  304. extension NIOCore.SocketAddress {
  305. fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
  306. if let ipv4 = socketAddress.ipv4 {
  307. self = try Self(ipv4)
  308. } else if let ipv6 = socketAddress.ipv6 {
  309. self = try Self(ipv6)
  310. } else if let unixDomainSocket = socketAddress.unixDomainSocket {
  311. self = try Self(unixDomainSocket)
  312. } else {
  313. throw RPCError(
  314. code: .internalError,
  315. message:
  316. "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
  317. )
  318. }
  319. }
  320. }
  321. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  322. extension NIOTSListenerBootstrap {
  323. fileprivate func bind<Output: Sendable>(
  324. to address: GRPCHTTP2Core.SocketAddress,
  325. childChannelInitializer: @escaping @Sendable (any Channel) -> EventLoopFuture<Output>
  326. ) async throws -> NIOAsyncChannel<Output, Never> {
  327. if address.virtualSocket != nil {
  328. throw RuntimeError(
  329. code: .transportError,
  330. message: """
  331. Virtual sockets are not supported by 'HTTP2ServerTransport.TransportServices'. \
  332. Please use the 'HTTP2ServerTransport.Posix' transport.
  333. """
  334. )
  335. } else {
  336. return try await self.bind(
  337. to: NIOCore.SocketAddress(address),
  338. childChannelInitializer: childChannelInitializer
  339. )
  340. }
  341. }
  342. }
  343. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  344. extension ServerTransport where Self == HTTP2ServerTransport.TransportServices {
  345. /// Create a new `TransportServices` based HTTP/2 server transport.
  346. ///
  347. /// - Parameters:
  348. /// - address: The address to which the server should be bound.
  349. /// - config: The transport configuration.
  350. /// - eventLoopGroup: The underlying NIO `EventLoopGroup` to the server on. This must
  351. /// be a `NIOTSEventLoopGroup` or an `EventLoop` from a `NIOTSEventLoopGroup`.
  352. public static func http2NIOTS(
  353. address: GRPCHTTP2Core.SocketAddress,
  354. config: HTTP2ServerTransport.TransportServices.Config,
  355. eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
  356. ) -> Self {
  357. return HTTP2ServerTransport.TransportServices(
  358. address: address,
  359. config: config,
  360. eventLoopGroup: eventLoopGroup
  361. )
  362. }
  363. }
  364. #endif