HTTP2ServerTransport+TransportServices.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(Network)
  17. public import GRPCCore
  18. public import NIOTransportServices // has to be public because of default argument value in init
  19. public import GRPCHTTP2Core
  20. internal import NIOCore
  21. internal import NIOExtras
  22. internal import NIOHTTP2
  extension HTTP2ServerTransport {
    /// A server transport implemented on top of NIO Transport Services.
    @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
    public struct TransportServices: ServerTransport {
      /// The address handed to the listener bootstrap when `listen` is called.
      private let address: GRPCHTTP2Core.SocketAddress
      /// Configuration forwarded to the gRPC pipeline of every accepted connection.
      private let config: Config
      /// The event loop group used by the listener bootstrap and the quiescing helper.
      private let eventLoopGroup: NIOTSEventLoopGroup
      /// Helper used by `stopListening()` to initiate shutdown.
      private let serverQuiescingHelper: ServerQuiescingHelper
      /// Lock-protected state machine backing `listeningAddress`.
      private let listeningAddressState: LockedValueBox<State>

      /// Tracks whether a listening address is pending, available, or will never be available.
      private enum State {
        /// Nothing has been bound yet; holds the promise to complete once binding finishes.
        case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
        /// An address was bound; holds the future of the promise that was in `idle`.
        case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
        /// Terminal state; holds the error to throw to anyone asking for the address.
        case closedOrInvalidAddress(RuntimeError)

        /// The future resolving to the listening address.
        ///
        /// - Throws: The stored `RuntimeError` when in `closedOrInvalidAddress`.
        var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
          get throws {
            switch self {
            case .closedOrInvalidAddress(let storedError):
              throw storedError
            case .listening(let addressFuture):
              return addressFuture
            case .idle(let pendingPromise):
              return pendingPromise.futureResult
            }
          }
        }

        /// What the caller must do after `addressBound(_:)` has updated the state.
        enum OnBound {
          case succeedPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            address: GRPCHTTP2Core.SocketAddress
          )
          case failPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
        }

        /// Records the outcome of binding the listener.
        ///
        /// Must be called while in `idle`; calling it in any other state is a fatal error.
        ///
        /// - Parameter address: The local address reported by the bound channel, or `nil` if
        ///   the channel did not report one.
        /// - Returns: The action to perform on the promise that was held in `idle`.
        mutating func addressBound(_ address: NIOCore.SocketAddress?) -> OnBound {
          guard case .idle(let pendingPromise) = self else {
            fatalError(
              "Invalid state: addressBound should only be called once and when in idle state"
            )
          }

          guard let address else {
            assertionFailure("Unknown address type")
            let unknownAddressError = RuntimeError(
              code: .transportError,
              message: "Unknown address type returned by transport."
            )
            self = .closedOrInvalidAddress(unknownAddressError)
            return .failPromise(pendingPromise, error: unknownAddressError)
          }

          self = .listening(pendingPromise.futureResult)
          return .succeedPromise(
            pendingPromise,
            address: GRPCHTTP2Core.SocketAddress(address)
          )
        }

        /// What the caller must do after `close()` has updated the state.
        enum OnClose {
          case failPromise(
            EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
          case doNothing
        }

        /// Moves to the terminal state unless already there.
        ///
        /// - Returns: `.failPromise` when the address promise was still pending (state was
        ///   `idle`), otherwise `.doNothing`.
        mutating func close() -> OnClose {
          let stoppedError = RuntimeError(
            code: .serverIsStopped,
            message: """
              There is no listening address bound for this server: there may have been \
              an error which caused the transport to close, or it may have shut down.
              """
          )

          switch self {
          case .closedOrInvalidAddress:
            // Already terminal: the previously stored error is kept.
            return .doNothing

          case .listening:
            self = .closedOrInvalidAddress(stoppedError)
            return .doNothing

          case .idle(let pendingPromise):
            self = .closedOrInvalidAddress(stoppedError)
            return .failPromise(pendingPromise, error: stoppedError)
          }
        }
      }

      /// The listening address for this server transport.
      ///
      /// This is an `async` property: it only returns once the address future has completed,
      /// i.e. once the address has been bound.
      ///
      /// - Throws: A runtime error if the address could not be bound, if the transport is no
      ///   longer listening, or if the transport returned an invalid address.
      public var listeningAddress: GRPCHTTP2Core.SocketAddress {
        get async throws {
          let addressFuture = try self.listeningAddressState.withLockedValue { state in
            try state.listeningAddressFuture
          }
          return try await addressFuture.get()
        }
      }

      /// Create a new `TransportServices` transport.
      ///
      /// - Parameters:
      ///   - address: The address to which the server should be bound.
      ///   - config: The transport configuration.
      ///   - eventLoopGroup: The ELG from which to get ELs to run this transport.
      public init(
        address: GRPCHTTP2Core.SocketAddress,
        config: Config = .defaults,
        eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
      ) {
        self.address = address
        self.config = config
        self.eventLoopGroup = eventLoopGroup
        self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)

        // The address promise is created up front so `listeningAddress` can be awaited before
        // `listen` has bound anything.
        let addressPromise: EventLoopPromise<GRPCHTTP2Core.SocketAddress> =
          eventLoopGroup.any().makePromise()
        self.listeningAddressState = LockedValueBox(.idle(addressPromise))
      }

      /// Binds the listener to the configured address and handles accepted connections.
      ///
      /// Each accepted connection is handled in its own child task. When this function exits,
      /// normally or by throwing, the listening-address state is closed and a still-pending
      /// address promise is failed.
      ///
      /// - Parameter streamHandler: Closure invoked with every RPC stream accepted on any
      ///   connection.
      /// - Throws: Errors thrown while binding, while accepting connections, or by a
      ///   connection-handling task.
      public func listen(
        _ streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        defer {
          let onClose = self.listeningAddressState.withLockedValue { state in state.close() }
          if case .failPromise(let promise, let error) = onClose {
            promise.fail(error)
          }
        }

        let bootstrap = NIOTSListenerBootstrap(group: self.eventLoopGroup)
          // Sets the SO_REUSEADDR socket option on the listening channel.
          .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
          .serverChannelInitializer { listener in
            let quiescing = self.serverQuiescingHelper.makeServerChannelHandler(
              channel: listener
            )
            return listener.pipeline.addHandler(quiescing)
          }

        let serverChannel = try await bootstrap.bind(to: self.address) { accepted in
          accepted.eventLoop.makeCompletedFuture {
            try accepted.pipeline.syncOperations.configureGRPCServerPipeline(
              channel: accepted,
              compressionConfig: self.config.compression,
              connectionConfig: self.config.connection,
              http2Config: self.config.http2,
              rpcConfig: self.config.rpc,
              useTLS: false
            )
          }
        }

        // Publish the bound address (or the failure to obtain one) to `listeningAddress` waiters.
        let onBound = self.listeningAddressState.withLockedValue { state in
          state.addressBound(serverChannel.channel.localAddress)
        }
        switch onBound {
        case .failPromise(let promise, let error):
          promise.fail(error)
        case .succeedPromise(let promise, let boundAddress):
          promise.succeed(boundAddress)
        }

        try await serverChannel.executeThenClose { acceptedConnections in
          try await withThrowingDiscardingTaskGroup { connectionTasks in
            for try await (channel, multiplexer) in acceptedConnections {
              connectionTasks.addTask {
                try await self.handleConnection(
                  channel,
                  multiplexer: multiplexer,
                  streamHandler: streamHandler
                )
              }
            }
          }
        }
      }

      /// Handles one accepted connection until its inbound streams are exhausted.
      ///
      /// One child task drains the connection-level inbound frames; every stream produced by
      /// the multiplexer is handled in a child task of its own.
      ///
      /// - Parameters:
      ///   - connection: The async channel wrapping the accepted connection.
      ///   - multiplexer: The HTTP/2 stream multiplexer yielding inbound streams and the future
      ///     of their method descriptors.
      ///   - streamHandler: Closure invoked with every RPC stream.
      private func handleConnection(
        _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
        multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
        streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        try await connection.executeThenClose { connectionFrames, _ in
          await withDiscardingTaskGroup { streamTasks in
            streamTasks.addTask {
              do {
                for try await _ in connectionFrames {}
              } catch {
                // We don't want to close the channel if one connection throws.
              }
            }

            do {
              for try await (http2Stream, methodDescriptor) in multiplexer.inbound {
                streamTasks.addTask {
                  await self.handleStream(
                    http2Stream,
                    handler: streamHandler,
                    descriptor: methodDescriptor
                  )
                }
              }
            } catch {
              // An error from the multiplexer ends stream acceptance for this connection.
            }
          }
        }
      }

      /// Runs `streamHandler` for a single HTTP/2 stream, then closes the stream.
      ///
      /// - Parameters:
      ///   - stream: The async channel carrying the RPC request and response parts.
      ///   - streamHandler: Closure invoked with the assembled `RPCStream`.
      ///   - descriptor: Future resolving to the method descriptor of the RPC.
      private func handleStream(
        _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
        handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
        descriptor: EventLoopFuture<MethodDescriptor>
      ) async {
        // Errors are deliberately ignored here:
        // - If closing the HTTP/2 stream failed, there is nothing left for us to do.
        // - The closure body itself swallows a failing descriptor future: without the RPC
        //   metadata we cannot do anything either, so it is okay to just kill the stream.
        try? await stream.executeThenClose { requestParts, responseParts in
          guard let methodDescriptor = try? await descriptor.get() else {
            return
          }

          let rpc = RPCStream(
            descriptor: methodDescriptor,
            inbound: RPCAsyncSequence(wrapping: requestParts),
            outbound: RPCWriter.Closable(
              wrapping: ServerConnection.Stream.Outbound(
                responseWriter: responseParts,
                http2Stream: stream
              )
            )
          )
          await streamHandler(rpc)
        }
      }

      /// Asks the quiescing helper to initiate shutdown; no completion promise is supplied.
      public func stopListening() {
        self.serverQuiescingHelper.initiateShutdown(promise: nil)
      }
    }
  }
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  extension HTTP2ServerTransport.TransportServices {
    /// Configuration for the ``GRPCHTTP2TransportNIOTransportServices/GRPCHTTP2Core/HTTP2ServerTransport/TransportServices``.
    ///
    /// Groups the compression, connection, HTTP/2 and RPC settings of the transport.
    public struct Config: Sendable {
      /// Compression configuration.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Connection configuration.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// HTTP2 configuration.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// RPC configuration.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// Construct a new `Config` from its four parts.
      ///
      /// - Parameters:
      ///   - compression: Compression configuration.
      ///   - connection: Connection configuration.
      ///   - http2: HTTP2 configuration.
      ///   - rpc: RPC configuration.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC
      ) {
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
      }

      /// A `Config` in which every part is set to its own `.defaults` value.
      public static var defaults: Self {
        return Self(compression: .defaults, connection: .defaults, http2: .defaults, rpc: .defaults)
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a `GRPCHTTP2Core.SocketAddress` into a `NIOCore.SocketAddress`.
    ///
    /// The IPv4, IPv6 and Unix domain socket representations are tried in that order.
    ///
    /// - Parameter socketAddress: The gRPC address to convert.
    /// - Throws: Whatever the per-kind conversion throws, or an `RPCError` with code
    ///   `.internalError` when the address is none of the three supported kinds.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let ipv4Address = socketAddress.ipv4 {
        self = try Self(ipv4Address)
        return
      }

      if let ipv6Address = socketAddress.ipv6 {
        self = try Self(ipv6Address)
        return
      }

      if let udsAddress = socketAddress.unixDomainSocket {
        self = try Self(udsAddress)
        return
      }

      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension NIOTSListenerBootstrap {
    /// Binds the listener to a `GRPCHTTP2Core.SocketAddress`.
    ///
    /// - Parameters:
    ///   - address: The gRPC address to bind to. Virtual socket addresses are rejected.
    ///   - childChannelInitializer: Initializer run for each accepted child channel; the value
    ///     of its future becomes the element type of the returned async channel.
    /// - Returns: The async channel produced by the underlying `bind`.
    /// - Throws: A `RuntimeError` with code `.transportError` for virtual socket addresses, or
    ///   any error thrown while converting the address or binding.
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (any Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      guard address.virtualSocket == nil else {
        throw RuntimeError(
          code: .transportError,
          message: """
            Virtual sockets are not supported by 'HTTP2ServerTransport.TransportServices'. \
            Please use the 'HTTP2ServerTransport.Posix' transport.
            """
        )
      }

      let nioAddress = try NIOCore.SocketAddress(address)
      return try await self.bind(
        to: nioAddress,
        childChannelInitializer: childChannelInitializer
      )
    }
  }
  336. #endif