HTTP2ServerTransport+TransportServices.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(Network)
  17. public import GRPCCore
  18. public import NIOTransportServices // has to be public because of default argument value in init
  19. public import GRPCHTTP2Core
  20. internal import NIOCore
  21. internal import NIOExtras
  22. internal import NIOHTTP2
  23. extension HTTP2ServerTransport {
  24. /// A NIO Transport Services-backed implementation of a server transport.
  25. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  26. public struct TransportServices: ServerTransport, ListeningServerTransport {
  27. private let address: GRPCHTTP2Core.SocketAddress
  28. private let config: Config
  29. private let eventLoopGroup: NIOTSEventLoopGroup
  30. private let serverQuiescingHelper: ServerQuiescingHelper
  31. private enum State {
  32. case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
  33. case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
  34. case closedOrInvalidAddress(RuntimeError)
  35. var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
  36. get throws {
  37. switch self {
  38. case .idle(let eventLoopPromise):
  39. return eventLoopPromise.futureResult
  40. case .listening(let eventLoopFuture):
  41. return eventLoopFuture
  42. case .closedOrInvalidAddress(let runtimeError):
  43. throw runtimeError
  44. }
  45. }
  46. }
  47. enum OnBound {
  48. case succeedPromise(
  49. _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  50. address: GRPCHTTP2Core.SocketAddress
  51. )
  52. case failPromise(
  53. _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  54. error: RuntimeError
  55. )
  56. }
  57. mutating func addressBound(_ address: NIOCore.SocketAddress?) -> OnBound {
  58. switch self {
  59. case .idle(let listeningAddressPromise):
  60. if let address {
  61. self = .listening(listeningAddressPromise.futureResult)
  62. return .succeedPromise(
  63. listeningAddressPromise,
  64. address: GRPCHTTP2Core.SocketAddress(address)
  65. )
  66. } else {
  67. assertionFailure("Unknown address type")
  68. let invalidAddressError = RuntimeError(
  69. code: .transportError,
  70. message: "Unknown address type returned by transport."
  71. )
  72. self = .closedOrInvalidAddress(invalidAddressError)
  73. return .failPromise(listeningAddressPromise, error: invalidAddressError)
  74. }
  75. case .listening, .closedOrInvalidAddress:
  76. fatalError(
  77. "Invalid state: addressBound should only be called once and when in idle state"
  78. )
  79. }
  80. }
  81. enum OnClose {
  82. case failPromise(
  83. EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
  84. error: RuntimeError
  85. )
  86. case doNothing
  87. }
  88. mutating func close() -> OnClose {
  89. let serverStoppedError = RuntimeError(
  90. code: .serverIsStopped,
  91. message: """
  92. There is no listening address bound for this server: there may have been \
  93. an error which caused the transport to close, or it may have shut down.
  94. """
  95. )
  96. switch self {
  97. case .idle(let listeningAddressPromise):
  98. self = .closedOrInvalidAddress(serverStoppedError)
  99. return .failPromise(listeningAddressPromise, error: serverStoppedError)
  100. case .listening:
  101. self = .closedOrInvalidAddress(serverStoppedError)
  102. return .doNothing
  103. case .closedOrInvalidAddress:
  104. return .doNothing
  105. }
  106. }
  107. }
  108. private let listeningAddressState: LockedValueBox<State>
  109. /// The listening address for this server transport.
  110. ///
  111. /// It is an `async` property because it will only return once the address has been successfully bound.
  112. ///
  113. /// - Throws: A runtime error will be thrown if the address could not be bound or is not bound any
  114. /// longer, because the transport isn't listening anymore. It can also throw if the transport returned an
  115. /// invalid address.
  116. public var listeningAddress: GRPCHTTP2Core.SocketAddress {
  117. get async throws {
  118. try await self.listeningAddressState
  119. .withLockedValue { try $0.listeningAddressFuture }
  120. .get()
  121. }
  122. }
  123. /// Create a new `TransportServices` transport.
  124. ///
  125. /// - Parameters:
  126. /// - address: The address to which the server should be bound.
  127. /// - config: The transport configuration.
  128. /// - eventLoopGroup: The ELG from which to get ELs to run this transport.
  129. public init(
  130. address: GRPCHTTP2Core.SocketAddress,
  131. config: Config,
  132. eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
  133. ) {
  134. self.address = address
  135. self.config = config
  136. self.eventLoopGroup = eventLoopGroup
  137. self.serverQuiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup)
  138. let eventLoop = eventLoopGroup.any()
  139. self.listeningAddressState = LockedValueBox(.idle(eventLoop.makePromise()))
  140. }
  141. public func listen(
  142. _ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  143. ) async throws {
  144. defer {
  145. switch self.listeningAddressState.withLockedValue({ $0.close() }) {
  146. case .failPromise(let promise, let error):
  147. promise.fail(error)
  148. case .doNothing:
  149. ()
  150. }
  151. }
  152. let serverChannel = try await NIOTSListenerBootstrap(group: self.eventLoopGroup)
  153. .serverChannelOption(
  154. ChannelOptions.socketOption(.so_reuseaddr),
  155. value: 1
  156. )
  157. .serverChannelInitializer { channel in
  158. let quiescingHandler = self.serverQuiescingHelper.makeServerChannelHandler(
  159. channel: channel
  160. )
  161. return channel.pipeline.addHandler(quiescingHandler)
  162. }
  163. .bind(to: self.address) { channel in
  164. channel.eventLoop.makeCompletedFuture {
  165. return try channel.pipeline.syncOperations.configureGRPCServerPipeline(
  166. channel: channel,
  167. compressionConfig: self.config.compression,
  168. connectionConfig: self.config.connection,
  169. http2Config: self.config.http2,
  170. rpcConfig: self.config.rpc,
  171. requireALPN: false,
  172. scheme: .http
  173. )
  174. }
  175. }
  176. let action = self.listeningAddressState.withLockedValue {
  177. $0.addressBound(serverChannel.channel.localAddress)
  178. }
  179. switch action {
  180. case .succeedPromise(let promise, let address):
  181. promise.succeed(address)
  182. case .failPromise(let promise, let error):
  183. promise.fail(error)
  184. }
  185. try await serverChannel.executeThenClose { inbound in
  186. try await withThrowingDiscardingTaskGroup { group in
  187. for try await (connectionChannel, streamMultiplexer) in inbound {
  188. group.addTask {
  189. try await self.handleConnection(
  190. connectionChannel,
  191. multiplexer: streamMultiplexer,
  192. streamHandler: streamHandler
  193. )
  194. }
  195. }
  196. }
  197. }
  198. }
  199. private func handleConnection(
  200. _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
  201. multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
  202. streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
  203. ) async throws {
  204. try await connection.executeThenClose { inbound, _ in
  205. await withDiscardingTaskGroup { group in
  206. group.addTask {
  207. do {
  208. for try await _ in inbound {}
  209. } catch {
  210. // We don't want to close the channel if one connection throws.
  211. return
  212. }
  213. }
  214. do {
  215. for try await (stream, descriptor) in multiplexer.inbound {
  216. group.addTask {
  217. await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
  218. }
  219. }
  220. } catch {
  221. return
  222. }
  223. }
  224. }
  225. }
  226. private func handleStream(
  227. _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
  228. handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
  229. descriptor: EventLoopFuture<MethodDescriptor>
  230. ) async {
  231. // It's okay to ignore these errors:
  232. // - If we get an error because the http2Stream failed to close, then there's nothing we can do
  233. // - If we get an error because the inner closure threw, then the only possible scenario in which
  234. // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
  235. // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
  236. try? await stream.executeThenClose { inbound, outbound in
  237. guard let descriptor = try? await descriptor.get() else {
  238. return
  239. }
  240. let rpcStream = RPCStream(
  241. descriptor: descriptor,
  242. inbound: RPCAsyncSequence(wrapping: inbound),
  243. outbound: RPCWriter.Closable(
  244. wrapping: ServerConnection.Stream.Outbound(
  245. responseWriter: outbound,
  246. http2Stream: stream
  247. )
  248. )
  249. )
  250. await streamHandler(rpcStream)
  251. }
  252. }
  253. public func stopListening() {
  254. self.serverQuiescingHelper.initiateShutdown(promise: nil)
  255. }
  256. }
  257. }
  258. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  259. extension HTTP2ServerTransport.TransportServices {
  260. /// Configuration for the `TransportServices` transport.
  261. public struct Config: Sendable {
  262. /// Compression configuration.
  263. public var compression: HTTP2ServerTransport.Config.Compression
  264. /// Connection configuration.
  265. public var connection: HTTP2ServerTransport.Config.Connection
  266. /// HTTP2 configuration.
  267. public var http2: HTTP2ServerTransport.Config.HTTP2
  268. /// RPC configuration.
  269. public var rpc: HTTP2ServerTransport.Config.RPC
  270. /// Construct a new `Config`.
  271. /// - Parameters:
  272. /// - compression: Compression configuration.
  273. /// - connection: Connection configuration.
  274. /// - http2: HTTP2 configuration.
  275. /// - rpc: RPC configuration.
  276. public init(
  277. compression: HTTP2ServerTransport.Config.Compression,
  278. connection: HTTP2ServerTransport.Config.Connection,
  279. http2: HTTP2ServerTransport.Config.HTTP2,
  280. rpc: HTTP2ServerTransport.Config.RPC
  281. ) {
  282. self.compression = compression
  283. self.connection = connection
  284. self.http2 = http2
  285. self.rpc = rpc
  286. }
  287. /// Default values for the different configurations.
  288. ///
  289. /// - Parameter configure: A closure which allows you to modify the defaults before
  290. /// returning them.
  291. public static func defaults(configure: (_ config: inout Self) -> Void = { _ in }) -> Self {
  292. var config = Self(
  293. compression: .defaults,
  294. connection: .defaults,
  295. http2: .defaults,
  296. rpc: .defaults
  297. )
  298. configure(&config)
  299. return config
  300. }
  301. }
  302. }
  303. extension NIOCore.SocketAddress {
  304. fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
  305. if let ipv4 = socketAddress.ipv4 {
  306. self = try Self(ipv4)
  307. } else if let ipv6 = socketAddress.ipv6 {
  308. self = try Self(ipv6)
  309. } else if let unixDomainSocket = socketAddress.unixDomainSocket {
  310. self = try Self(unixDomainSocket)
  311. } else {
  312. throw RPCError(
  313. code: .internalError,
  314. message:
  315. "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
  316. )
  317. }
  318. }
  319. }
  320. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  321. extension NIOTSListenerBootstrap {
  322. fileprivate func bind<Output: Sendable>(
  323. to address: GRPCHTTP2Core.SocketAddress,
  324. childChannelInitializer: @escaping @Sendable (any Channel) -> EventLoopFuture<Output>
  325. ) async throws -> NIOAsyncChannel<Output, Never> {
  326. if address.virtualSocket != nil {
  327. throw RuntimeError(
  328. code: .transportError,
  329. message: """
  330. Virtual sockets are not supported by 'HTTP2ServerTransport.TransportServices'. \
  331. Please use the 'HTTP2ServerTransport.Posix' transport.
  332. """
  333. )
  334. } else {
  335. return try await self.bind(
  336. to: NIOCore.SocketAddress(address),
  337. childChannelInitializer: childChannelInitializer
  338. )
  339. }
  340. }
  341. }
  342. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  343. extension ServerTransport where Self == HTTP2ServerTransport.TransportServices {
  344. /// Create a new `TransportServices` based HTTP/2 server transport.
  345. ///
  346. /// - Parameters:
  347. /// - address: The address to which the server should be bound.
  348. /// - config: The transport configuration.
  349. /// - eventLoopGroup: The underlying NIO `EventLoopGroup` to the server on. This must
  350. /// be a `NIOTSEventLoopGroup` or an `EventLoop` from a `NIOTSEventLoopGroup`.
  351. public static func http2NIOTS(
  352. address: GRPCHTTP2Core.SocketAddress,
  353. config: HTTP2ServerTransport.TransportServices.Config,
  354. eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
  355. ) -> Self {
  356. return HTTP2ServerTransport.TransportServices(
  357. address: address,
  358. config: config,
  359. eventLoopGroup: eventLoopGroup
  360. )
  361. }
  362. }
  363. #endif