HTTP2ServerTransport+TransportServices.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(Network)
  17. import GRPCCore
  18. @_spi(Package) import GRPCHTTP2Core
  19. import NIOCore
  20. import NIOExtras
  21. import NIOTransportServices
  22. extension HTTP2ServerTransport {
  23. /// A NIO Transport Services-backed implementation of a server transport.
  24. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  25. public struct TransportServices: ServerTransport {
  26. private let address: GRPCHTTP2Core.SocketAddress
  27. private let config: Config
  28. private let eventLoopGroup: NIOTSEventLoopGroup
  29. private let serverQuiescingHelper: ServerQuiescingHelper
  /// Tracks what is currently known about the transport's listening address.
  private enum State {
    /// No address has been bound yet; the promise is resolved on bind or on close.
    case idle(EventLoopPromise<GRPCHTTP2Core.SocketAddress>)
    /// An address has been bound; the future is the one vended by the idle promise.
    case listening(EventLoopFuture<GRPCHTTP2Core.SocketAddress>)
    /// The transport has closed, or the bound address could not be represented.
    case closedOrInvalidAddress(RuntimeError)

    /// The future callers should wait on to learn the listening address.
    ///
    /// - Throws: The stored `RuntimeError` when in `closedOrInvalidAddress`.
    var listeningAddressFuture: EventLoopFuture<GRPCHTTP2Core.SocketAddress> {
      get throws {
        switch self {
        case .closedOrInvalidAddress(let storedError):
          throw storedError
        case .listening(let addressFuture):
          return addressFuture
        case .idle(let addressPromise):
          return addressPromise.futureResult
        }
      }
    }

    /// What the caller must do, outside of the lock, after `addressBound(_:)`.
    enum OnBound {
      case succeedPromise(
        _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
        address: GRPCHTTP2Core.SocketAddress
      )
      case failPromise(
        _ promise: EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
        error: RuntimeError
      )
    }

    /// Records the outcome of binding the server channel.
    ///
    /// Must be called exactly once, and only while `idle`; any other state traps.
    ///
    /// - Parameter address: The bound local address, or `nil` if none is available.
    /// - Returns: The promise to complete and the value or error to complete it with.
    mutating func addressBound(_ address: NIOCore.SocketAddress?) -> OnBound {
      switch self {
      case .listening, .closedOrInvalidAddress:
        fatalError(
          "Invalid state: addressBound should only be called once and when in idle state"
        )

      case .idle(let pendingPromise):
        guard let boundAddress = address else {
          // Debug builds stop here; release builds carry on and fail the promise.
          assertionFailure("Unknown address type")
          let unknownAddressError = RuntimeError(
            code: .transportError,
            message: "Unknown address type returned by transport."
          )
          self = .closedOrInvalidAddress(unknownAddressError)
          return .failPromise(pendingPromise, error: unknownAddressError)
        }

        self = .listening(pendingPromise.futureResult)
        return .succeedPromise(
          pendingPromise,
          address: GRPCHTTP2Core.SocketAddress(boundAddress)
        )
      }
    }

    /// What the caller must do, outside of the lock, after `close()`.
    enum OnClose {
      case failPromise(
        EventLoopPromise<GRPCHTTP2Core.SocketAddress>,
        error: RuntimeError
      )
      case doNothing
    }

    /// Moves to the closed state unless already there.
    ///
    /// - Returns: `.failPromise` if the address promise was never completed (the state was
    ///   `idle`), otherwise `.doNothing`.
    mutating func close() -> OnClose {
      let stoppedError = RuntimeError(
        code: .serverIsStopped,
        message: """
          There is no listening address bound for this server: there may have been \
          an error which caused the transport to close, or it may have shut down.
          """
      )

      switch self {
      case .closedOrInvalidAddress:
        // Already terminal: keep whichever error was stored first.
        return .doNothing

      case .listening:
        self = .closedOrInvalidAddress(stoppedError)
        return .doNothing

      case .idle(let pendingPromise):
        self = .closedOrInvalidAddress(stoppedError)
        return .failPromise(pendingPromise, error: stoppedError)
      }
    }
  }
  107. private let listeningAddressState: _LockedValueBox<State>
  /// The address this server transport is listening on.
  ///
  /// The property is `async` because it suspends until the address has been bound.
  ///
  /// - Throws: A runtime error if the address could not be bound, if the transport is no longer
  ///   listening, or if the transport reported an address that could not be represented.
  public var listeningAddress: GRPCHTTP2Core.SocketAddress {
    get async throws {
      // Only hold the lock long enough to pick up the future; wait on it outside the lock.
      let addressFuture = try self.listeningAddressState.withLockedValue { state in
        try state.listeningAddressFuture
      }
      return try await addressFuture.get()
    }
  }
  /// Creates a `TransportServices` server transport.
  ///
  /// - Parameters:
  ///   - address: The address the server should bind to.
  ///   - config: The transport configuration; `.defaults` if not supplied.
  ///   - eventLoopGroup: The group providing the event loops this transport runs on.
  public init(
    address: GRPCHTTP2Core.SocketAddress,
    config: Config = .defaults,
    eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
  ) {
    self.address = address
    self.config = config
    self.eventLoopGroup = eventLoopGroup
    self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)

    // The transport starts idle, holding a promise for the address that `listen` will bind.
    let addressPromise: EventLoopPromise<GRPCHTTP2Core.SocketAddress> =
      eventLoopGroup.any().makePromise()
    self.listeningAddressState = _LockedValueBox(.idle(addressPromise))
  }
  /// Binds the server to the configured address and serves connections until the listener
  /// finishes.
  ///
  /// - Parameter streamHandler: Called once for every RPC stream whose method descriptor could
  ///   be resolved.
  /// - Throws: Any error thrown while binding or while running the server channel.
  public func listen(
    _ streamHandler: @escaping (RPCStream<Inbound, Outbound>) async -> Void
  ) async throws {
    // However this function exits, the listening address stops being valid.
    defer {
      let onClose = self.listeningAddressState.withLockedValue { state in
        state.close()
      }
      switch onClose {
      case .doNothing:
        break
      case .failPromise(let addressPromise, let closeError):
        addressPromise.fail(closeError)
      }
    }

    let bootstrap = NIOTSListenerBootstrap(group: self.eventLoopGroup)
      .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
      .serverChannelInitializer { listener in
        let handler = self.serverQuiescingHelper.makeServerChannelHandler(channel: listener)
        return listener.pipeline.addHandler(handler)
      }

    let serverChannel = try await bootstrap.bind(to: self.address) { accepted in
      accepted.eventLoop.makeCompletedFuture {
        try accepted.pipeline.syncOperations.configureGRPCServerPipeline(
          channel: accepted,
          compressionConfig: self.config.compression,
          connectionConfig: self.config.connection,
          http2Config: self.config.http2,
          rpcConfig: self.config.rpc,
          useTLS: false
        )
      }
    }

    // Publish the bound address (or the failure) to anyone awaiting `listeningAddress`.
    let onBound = self.listeningAddressState.withLockedValue { state in
      state.addressBound(serverChannel.channel.localAddress)
    }
    switch onBound {
    case .failPromise(let addressPromise, let bindError):
      addressPromise.fail(bindError)
    case .succeedPromise(let addressPromise, let boundAddress):
      addressPromise.succeed(boundAddress)
    }

    try await serverChannel.executeThenClose { acceptedConnections in
      try await withThrowingDiscardingTaskGroup { serverGroup in
        for try await (connection, multiplexer) in acceptedConnections {
          // One child task per accepted connection.
          serverGroup.addTask {
            try await connection.executeThenClose { connectionFrames, _ in
              await withDiscardingTaskGroup { connectionGroup in
                // Consume connection-level inbound elements; their values are not used here.
                connectionGroup.addTask {
                  do {
                    for try await _ in connectionFrames {}
                  } catch {
                    // An error on one connection must not close the channel.
                    return
                  }
                }

                // Run every stream opened on this connection in its own child task.
                connectionGroup.addTask {
                  await withDiscardingTaskGroup { streamGroup in
                    do {
                      for try await (stream, pendingDescriptor) in multiplexer.inbound {
                        streamGroup.addTask {
                          // Errors are deliberately dropped:
                          // - if closing the stream failed there is nothing left to do;
                          // - the closure itself can only fail while resolving the method
                          //   descriptor, meaning the RPC metadata never arrived, so the
                          //   stream can simply be abandoned.
                          try? await stream.executeThenClose { requestParts, responseParts in
                            guard let descriptor = try? await pendingDescriptor.get() else {
                              return
                            }

                            let responseWriter = ServerConnection.Stream.Outbound(
                              responseWriter: responseParts,
                              http2Stream: stream
                            )
                            let rpcStream = RPCStream(
                              descriptor: descriptor,
                              inbound: RPCAsyncSequence(wrapping: requestParts),
                              outbound: RPCWriter.Closable(wrapping: responseWriter)
                            )
                            await streamHandler(rpcStream)
                          }
                        }
                      }
                    } catch {
                      // A failing stream must not close the whole connection.
                      return
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
  /// Asks the server quiescing helper to initiate shutdown.
  ///
  /// No promise is passed, so completion of the shutdown is not reported through this call.
  public func stopListening() {
    let noCompletionPromise: EventLoopPromise<Void>? = nil
    serverQuiescingHelper.initiateShutdown(promise: noCompletionPromise)
  }
  243. }
  244. }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension HTTP2ServerTransport.TransportServices {
    /// Configuration for the ``GRPCHTTP2TransportNIOTransportServices/GRPCHTTP2Core/HTTP2ServerTransport/TransportServices``.
    ///
    /// Groups the compression, connection, HTTP/2 and RPC settings for the transport.
    public struct Config: Sendable {
      /// Compression settings.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Connection settings.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// HTTP/2 settings.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// RPC settings.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// Creates a `Config` from its four constituent configurations.
      ///
      /// - Parameters:
      ///   - compression: Compression settings.
      ///   - connection: Connection settings.
      ///   - http2: HTTP/2 settings.
      ///   - rpc: RPC settings.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC
      ) {
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
      }

      /// A configuration in which every component uses its own `defaults`.
      public static var defaults: Self {
        return Config(
          compression: .defaults,
          connection: .defaults,
          http2: .defaults,
          rpc: .defaults
        )
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a gRPC socket address into its NIO equivalent.
    ///
    /// IPv4, IPv6 and Unix domain socket addresses are tried in that order.
    ///
    /// - Parameter socketAddress: The gRPC address to convert.
    /// - Throws: Whatever the underlying conversion throws, or an `RPCError` with code
    ///   `.internalError` when the address is none of the three supported kinds.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let ipv4Address = socketAddress.ipv4 {
        self = try Self(ipv4Address)
        return
      }

      if let ipv6Address = socketAddress.ipv6 {
        self = try Self(ipv6Address)
        return
      }

      if let udsAddress = socketAddress.unixDomainSocket {
        self = try Self(udsAddress)
        return
      }

      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension NIOTSListenerBootstrap {
    /// Binds the listener to a gRPC socket address.
    ///
    /// - Parameters:
    ///   - address: The gRPC address to bind to; virtual socket addresses are rejected.
    ///   - childChannelInitializer: Configures each accepted child channel and produces its
    ///     output value.
    /// - Returns: The asynchronous server channel yielding one `Output` per accepted connection.
    /// - Throws: A `RuntimeError` with code `.transportError` for virtual socket addresses, or any
    ///   error thrown while converting the address or binding.
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      guard address.virtualSocket == nil else {
        throw RuntimeError(
          code: .transportError,
          message: """
            Virtual sockets are not supported by 'HTTP2ServerTransport.TransportServices'. \
            Please use the 'HTTP2ServerTransport.Posix' transport.
            """
        )
      }

      let nioAddress = try NIOCore.SocketAddress(address)
      return try await self.bind(
        to: nioAddress,
        childChannelInitializer: childChannelInitializer
      )
    }
  }
  324. #endif