HTTP2ServerTransport+TransportServices.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(Network)
  17. import GRPCCore
  18. @_spi(Package) import GRPCHTTP2Core
  19. import NIOCore
  20. import NIOExtras
  21. import NIOTransportServices
  extension HTTP2ServerTransport {
    /// A server transport implemented on top of NIO Transport Services.
    ///
    /// The transport binds a `NIOTSListenerBootstrap` to the configured address when
    /// ``listen(_:)`` is called, and exposes the bound address through ``listeningAddress``.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    public struct TransportServices: ServerTransport {
      /// The address this transport will try to bind to.
      private let address: GRPCHTTP2Core.SocketAddress
      /// Compression, connection, HTTP/2 and RPC settings applied to each accepted connection.
      private let config: Config
      /// The group providing the event loops used by the listener.
      private let eventLoopGroup: NIOTSEventLoopGroup
      /// Helper used by `stopListening()` to initiate shutdown of the server channel.
      private let serverQuiescingHelper: ServerQuiescingHelper

      /// The lifecycle of the listening address.
      ///
      /// The mutating functions only compute the next state and *describe* what should be done
      /// with the promise (`OnBound` / `OnClose`); the caller completes the promise itself, after
      /// the state has been updated.
      private enum State {
        /// `listen` hasn't bound an address yet; the promise is still pending.
        case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
        /// An address was bound; the future is the one vended by the original promise.
        case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
        /// The transport closed, or the bound address couldn't be understood.
        case closedOrInvalidAddress(RuntimeError)

        /// A future for the listening address.
        ///
        /// - Throws: The stored `RuntimeError` if the state is `closedOrInvalidAddress`.
        var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
          get throws {
            switch self {
            case .closedOrInvalidAddress(let error):
              throw error
            case .idle(let promise):
              return promise.futureResult
            case .listening(let future):
              return future
            }
          }
        }

        /// What the caller must do after `addressBound(_:)` returns.
        enum OnBound {
          case succeedPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            address: GRPCHTTP2Core.SocketAddress
          )
          case failPromise(
            _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
        }

        /// Records the outcome of binding the server channel.
        ///
        /// Must be called exactly once, while in the `idle` state; any other state traps.
        ///
        /// - Parameter address: The local address reported by the bound channel, if any.
        /// - Returns: Whether the pending promise should be succeeded or failed.
        mutating func addressBound(_ address: NIOCore.SocketAddress?) -> OnBound {
          guard case .idle(let promise) = self else {
            fatalError(
              "Invalid state: addressBound should only be called once and when in idle state"
            )
          }

          guard let address else {
            // Trap in debug builds; in release builds surface the problem as a transport error.
            assertionFailure("Unknown address type")
            let error = RuntimeError(
              code: .transportError,
              message: "Unknown address type returned by transport."
            )
            self = .closedOrInvalidAddress(error)
            return .failPromise(promise, error: error)
          }

          self = .listening(promise.futureResult)
          return .succeedPromise(promise, address: GRPCHTTP2Core.SocketAddress(address))
        }

        /// What the caller must do after `close()` returns.
        enum OnClose {
          case failPromise(
            EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
            error: RuntimeError
          )
          case doNothing
        }

        /// Moves to `closedOrInvalidAddress`, unless already there.
        ///
        /// - Returns: `.failPromise` if the address promise was still pending (state was `idle`),
        ///   `.doNothing` otherwise.
        mutating func close() -> OnClose {
          let stoppedError = RuntimeError(
            code: .serverIsStopped,
            message: """
              There is no listening address bound for this server: there may have been \
              an error which caused the transport to close, or it may have shut down.
              """
          )

          let action: OnClose
          switch self {
          case .closedOrInvalidAddress:
            // Already closed: keep the error that was stored first.
            return .doNothing
          case .idle(let promise):
            action = .failPromise(promise, error: stoppedError)
          case .listening:
            action = .doNothing
          }

          self = .closedOrInvalidAddress(stoppedError)
          return action
        }
      }

      /// Lock-protected storage for the listening address state.
      private let listeningAddressState: _LockedValueBox<State>

      /// The address this server transport is listening on.
      ///
      /// This is an `async` property: it only returns once an address has been bound.
      ///
      /// - Throws: A runtime error if the address could not be bound, if the transport is no
      ///   longer listening, or if the transport reported an address of an unknown type.
      public var listeningAddress: GRPCHTTP2Core.SocketAddress {
        get async throws {
          // Only grab the future while holding the lock; await it afterwards.
          let future = try self.listeningAddressState.withLockedValue { state in
            try state.listeningAddressFuture
          }
          return try await future.get()
        }
      }

      /// Creates a new `TransportServices` transport.
      ///
      /// - Parameters:
      ///   - address: The address the server should bind to.
      ///   - config: The transport configuration.
      ///   - eventLoopGroup: The group from which event loops are taken to run this transport.
      public init(
        address: GRPCHTTP2Core.SocketAddress,
        config: Config = .defaults,
        eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
      ) {
        self.address = address
        self.config = config
        self.eventLoopGroup = eventLoopGroup
        self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)

        // The promise is completed by `listen`: on bind, or when `listen` exits.
        let addressPromise = eventLoopGroup.any().makePromise(of: GRPCHTTP2Core.SocketAddress.self)
        self.listeningAddressState = _LockedValueBox(.idle(addressPromise))
      }

      /// Binds the server, then accepts connections and hands every accepted RPC stream to
      /// `streamHandler` until the server channel's inbound sequence finishes.
      ///
      /// - Parameter streamHandler: Invoked once per RPC stream whose method descriptor could be
      ///   resolved.
      /// - Throws: Any error thrown while binding or while running the server channel.
      public func listen(
        _ streamHandler: @escaping (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        defer {
          // However we leave, mark the transport as closed and fail the address promise if
          // nobody completed it.
          let onClose = self.listeningAddressState.withLockedValue { $0.close() }
          if case .failPromise(let promise, let error) = onClose {
            promise.fail(error)
          }
        }

        let bootstrap = NIOTSListenerBootstrap(group: self.eventLoopGroup)
          .serverChannelInitializer { channel in
            channel.pipeline.addHandler(
              self.serverQuiescingHelper.makeServerChannelHandler(channel: channel)
            )
          }

        let listener = try await bootstrap.bind(to: self.address) { channel in
          channel.eventLoop.makeCompletedFuture {
            try channel.pipeline.syncOperations.configureGRPCServerPipeline(
              channel: channel,
              compressionConfig: self.config.compression,
              connectionConfig: self.config.connection,
              http2Config: self.config.http2,
              rpcConfig: self.config.rpc,
              useTLS: false
            )
          }
        }

        let onBound = self.listeningAddressState.withLockedValue { state in
          state.addressBound(listener.channel.localAddress)
        }

        // Complete the promise outside of the lock.
        switch onBound {
        case .succeedPromise(let promise, let boundAddress):
          promise.succeed(boundAddress)
        case .failPromise(let promise, let error):
          promise.fail(error)
        }

        try await listener.executeThenClose { acceptedConnections in
          try await withThrowingDiscardingTaskGroup { serverGroup in
            for try await (connection, multiplexer) in acceptedConnections {
              // One child task per accepted connection.
              serverGroup.addTask {
                try await connection.executeThenClose { connectionInbound, _ in
                  await withDiscardingTaskGroup { connectionGroup in
                    // Drain the connection-level inbound sequence.
                    connectionGroup.addTask {
                      do {
                        for try await _ in connectionInbound {}
                      } catch {
                        // Swallowed on purpose: one failing connection must not close the
                        // channel.
                      }
                    }

                    // Accept the HTTP/2 streams opened on this connection.
                    connectionGroup.addTask {
                      await withDiscardingTaskGroup { streamGroup in
                        do {
                          for try await (http2Stream, methodDescriptor) in multiplexer.inbound {
                            streamGroup.addTask {
                              // Errors here are deliberately dropped:
                              // - if closing the http2Stream failed, nothing more can be done;
                              // - the closure below only throws if `methodDescriptor.get()`
                              //   throws, meaning the RPC metadata never arrived, so killing
                              //   the stream is the only option anyway.
                              try? await http2Stream.executeThenClose { inbound, outbound in
                                guard let descriptor = try? await methodDescriptor.get() else {
                                  return
                                }

                                let responseWriter = ServerConnection.Stream.Outbound(
                                  responseWriter: outbound,
                                  http2Stream: http2Stream
                                )
                                let rpcStream = RPCStream(
                                  descriptor: descriptor,
                                  inbound: RPCAsyncSequence(wrapping: inbound),
                                  outbound: RPCWriter.Closable(wrapping: responseWriter)
                                )
                                await streamHandler(rpcStream)
                              }
                            }
                          }
                        } catch {
                          // Swallowed on purpose: one failing stream must not close the whole
                          // connection.
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }

      /// Asks the quiescing helper to initiate shutdown.
      ///
      /// No promise is passed, so completion of the shutdown is not reported to the caller.
      public func stopListening() {
        self.serverQuiescingHelper.initiateShutdown(promise: nil)
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension HTTP2ServerTransport.TransportServices {
    /// Settings for the ``GRPCHTTP2TransportNIOTransportServices/GRPCHTTP2Core/HTTP2ServerTransport/TransportServices``
    /// transport, grouped by area.
    public struct Config: Sendable {
      /// Settings controlling compression.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Settings controlling connections.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// Settings controlling HTTP/2.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// Settings controlling RPCs.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// Creates a `Config` from its four parts.
      ///
      /// - Parameters:
      ///   - compression: Compression configuration.
      ///   - connection: Connection configuration.
      ///   - http2: HTTP2 configuration.
      ///   - rpc: RPC configuration.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC
      ) {
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
      }

      /// A configuration in which every part uses its own `defaults`.
      public static var defaults: Self {
        .init(
          compression: .defaults,
          connection: .defaults,
          http2: .defaults,
          rpc: .defaults
        )
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a gRPC socket address into its NIO counterpart.
    ///
    /// IPv4, IPv6 and Unix domain socket addresses are tried, in that order.
    ///
    /// - Parameter socketAddress: The gRPC address to convert.
    /// - Throws: Whatever the per-kind conversion throws, or an `RPCError` with code
    ///   `.internalError` when the address is none of the three supported kinds.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let v4 = socketAddress.ipv4 {
        self = try Self(v4)
        return
      }

      if let v6 = socketAddress.ipv6 {
        self = try Self(v6)
        return
      }

      if let uds = socketAddress.unixDomainSocket {
        self = try Self(uds)
        return
      }

      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension NIOTSListenerBootstrap {
    /// Binds the listener to a gRPC socket address.
    ///
    /// - Parameters:
    ///   - address: The gRPC address to bind to. Virtual sockets are rejected.
    ///   - childChannelInitializer: Initializer run for each accepted child channel; its result
    ///     becomes the element type of the returned channel's inbound stream.
    /// - Returns: The bound server channel.
    /// - Throws: A `RuntimeError` with code `.transportError` for virtual socket addresses, or
    ///   any error thrown while converting the address or binding.
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      guard address.virtualSocket == nil else {
        throw RuntimeError(
          code: .transportError,
          message: """
            Virtual sockets are not supported by 'HTTP2ServerTransport.TransportServices'. \
            Please use the 'HTTP2ServerTransport.Posix' transport.
            """
        )
      }

      // Convert first, then defer to the bootstrap's own `bind` taking a NIO address.
      let nioAddress: NIOCore.SocketAddress = try NIOCore.SocketAddress(address)
      return try await self.bind(
        to: nioAddress,
        childChannelInitializer: childChannelInitializer
      )
    }
  }
  320. #endif