HTTP2ServerTransport+TransportServices.swift 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if canImport(Network)
  17. import GRPCCore
  18. @_spi(Package) import GRPCHTTP2Core
  19. import NIOCore
  20. import NIOExtras
  21. import NIOTransportServices
  extension HTTP2ServerTransport {
    /// A NIO Transport Services-backed implementation of a server transport.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    public struct TransportServices: ServerTransport {
      private let address: GRPCHTTP2Core.SocketAddress
      private let config: Config
      private let eventLoopGroup: NIOTSEventLoopGroup
      private let serverQuiescingHelper: ServerQuiescingHelper

      /// Create a new `TransportServices` transport.
      ///
      /// A `ServerQuiescingHelper` is created on the same event loop group; it is the object
      /// that `stopListening()` later asks to initiate shutdown.
      ///
      /// - Parameters:
      ///   - address: The address to which the server should be bound.
      ///   - config: The transport configuration.
      ///   - eventLoopGroup: The ELG from which to get ELs to run this transport.
      public init(
        address: GRPCHTTP2Core.SocketAddress,
        config: Config = .defaults,
        eventLoopGroup: NIOTSEventLoopGroup = .singletonNIOTSEventLoopGroup
      ) {
        self.address = address
        self.config = config
        self.eventLoopGroup = eventLoopGroup
        self.serverQuiescingHelper = ServerQuiescingHelper(group: eventLoopGroup)
      }

      /// Binds a listener to the configured address and serves accepted connections.
      ///
      /// The work is organised as three nested levels of child tasks:
      /// 1. one task per accepted connection,
      /// 2. within a connection, one task draining the connection's inbound sequence and one
      ///    task iterating the inbound HTTP/2 streams of its multiplexer,
      /// 3. one task per HTTP/2 stream, which builds an `RPCStream` and hands it to
      ///    `streamHandler`.
      ///
      /// Connections are configured with `useTLS: false`.
      ///
      /// - Parameter streamHandler: Called once for every stream whose method descriptor could
      ///   be resolved.
      /// - Throws: Errors from binding the listener, from iterating the accepted connections,
      ///   or from `executeThenClose` on the listener or on a connection channel. Errors raised
      ///   while iterating a connection's inbound sequence or its streams, and errors from an
      ///   individual stream, are swallowed.
      public func listen(
        _ streamHandler: @escaping (RPCStream<Inbound, Outbound>) async -> Void
      ) async throws {
        // The quiescing handler is installed on the listening channel itself.
        let bootstrap = NIOTSListenerBootstrap(group: self.eventLoopGroup)
          .serverChannelInitializer { listener in
            listener.pipeline.addHandler(
              self.serverQuiescingHelper.makeServerChannelHandler(channel: listener)
            )
          }

        // Every accepted connection gets the gRPC server pipeline, configured synchronously
        // on the connection's own event loop.
        let listeningChannel = try await bootstrap.bind(to: self.address) { accepted in
          accepted.eventLoop.makeCompletedFuture {
            try accepted.pipeline.syncOperations.configureGRPCServerPipeline(
              channel: accepted,
              compressionConfig: self.config.compression,
              connectionConfig: self.config.connection,
              http2Config: self.config.http2,
              rpcConfig: self.config.rpc,
              useTLS: false
            )
          }
        }

        try await listeningChannel.executeThenClose { acceptedConnections in
          try await withThrowingDiscardingTaskGroup { serverGroup in
            for try await (connection, multiplexer) in acceptedConnections {
              serverGroup.addTask {
                try await connection.executeThenClose { connectionFrames, _ in
                  await withDiscardingTaskGroup { connectionGroup in
                    // Drain the connection-level inbound sequence; its elements are ignored.
                    connectionGroup.addTask {
                      do {
                        for try await _ in connectionFrames {}
                      } catch {
                        // We don't want to close the channel if one connection throws.
                      }
                    }

                    // Serve every HTTP/2 stream opened on this connection.
                    connectionGroup.addTask {
                      await withDiscardingTaskGroup { streamGroup in
                        do {
                          for try await (stream, pendingDescriptor) in multiplexer.inbound {
                            streamGroup.addTask {
                              // Errors are deliberately ignored here:
                              // - If the stream failed to close, there is nothing more we can do.
                              // - The body below does not throw, so no error can come from it.
                              //   If the method descriptor cannot be resolved we never got the
                              //   RPC metadata, so the body returns early and the stream is
                              //   simply dropped.
                              try? await stream.executeThenClose { requestParts, responseParts in
                                guard let descriptor = try? await pendingDescriptor.get() else {
                                  return
                                }

                                let responseWriter = ServerConnection.Stream.Outbound(
                                  responseWriter: responseParts,
                                  http2Stream: stream
                                )
                                let rpcStream = RPCStream(
                                  descriptor: descriptor,
                                  inbound: RPCAsyncSequence(wrapping: requestParts),
                                  outbound: RPCWriter.Closable(wrapping: responseWriter)
                                )
                                await streamHandler(rpcStream)
                              }
                            }
                          }
                        } catch {
                          // We don't want to close the whole connection if one stream throws.
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }

      /// Asks the quiescing helper to initiate shutdown, without a completion promise.
      public func stopListening() {
        self.serverQuiescingHelper.initiateShutdown(promise: nil)
      }
    }
  }
  @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  extension HTTP2ServerTransport.TransportServices {
    /// Configuration for the ``GRPCHTTP2TransportNIOTransportServices/GRPCHTTP2Core/HTTP2ServerTransport/TransportServices``.
    ///
    /// Groups the four sub-configurations that the transport passes on when it configures the
    /// gRPC server pipeline of each accepted connection.
    public struct Config: Sendable {
      /// Compression configuration.
      public var compression: HTTP2ServerTransport.Config.Compression

      /// Connection configuration.
      public var connection: HTTP2ServerTransport.Config.Connection

      /// HTTP2 configuration.
      public var http2: HTTP2ServerTransport.Config.HTTP2

      /// RPC configuration.
      public var rpc: HTTP2ServerTransport.Config.RPC

      /// Construct a new `Config` from its four parts.
      ///
      /// - Parameters:
      ///   - compression: Compression configuration.
      ///   - connection: Connection configuration.
      ///   - http2: HTTP2 configuration.
      ///   - rpc: RPC configuration.
      public init(
        compression: HTTP2ServerTransport.Config.Compression,
        connection: HTTP2ServerTransport.Config.Connection,
        http2: HTTP2ServerTransport.Config.HTTP2,
        rpc: HTTP2ServerTransport.Config.RPC
      ) {
        self.compression = compression
        self.connection = connection
        self.http2 = http2
        self.rpc = rpc
      }

      /// A configuration in which every part is set to its own `defaults` value.
      public static var defaults: Self {
        .init(compression: .defaults, connection: .defaults, http2: .defaults, rpc: .defaults)
      }
    }
  }
  extension NIOCore.SocketAddress {
    /// Converts a `GRPCHTTP2Core.SocketAddress` into a `NIOCore.SocketAddress`.
    ///
    /// The IPv4, IPv6 and Unix domain socket views of `socketAddress` are tried in that order;
    /// the first one that is present is converted.
    ///
    /// - Parameter socketAddress: The address to convert.
    /// - Throws: Any error thrown by the conversion of the matching view, or an `RPCError` with
    ///   code `.internalError` when none of the three views is present.
    fileprivate init(_ socketAddress: GRPCHTTP2Core.SocketAddress) throws {
      if let ipv4 = socketAddress.ipv4 {
        self = try Self(ipv4)
        return
      }

      if let ipv6 = socketAddress.ipv6 {
        self = try Self(ipv6)
        return
      }

      if let unixDomainSocket = socketAddress.unixDomainSocket {
        self = try Self(unixDomainSocket)
        return
      }

      // None of the supported address kinds matched.
      throw RPCError(
        code: .internalError,
        message:
          "Unsupported mapping to NIOCore/SocketAddress for GRPCHTTP2Core/SocketAddress: \(socketAddress)."
      )
    }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension NIOTSListenerBootstrap {
    /// Binds this bootstrap to a `GRPCHTTP2Core.SocketAddress`.
    ///
    /// The address is converted to a `NIOCore.SocketAddress` and forwarded to the bootstrap's
    /// own `bind(to:childChannelInitializer:)`. Virtual socket addresses are rejected before
    /// any conversion is attempted.
    ///
    /// - Parameters:
    ///   - address: The address to bind to.
    ///   - childChannelInitializer: Initializer run for each accepted child channel.
    /// - Returns: The bound listening channel.
    /// - Throws: A `RuntimeError` with code `.transportError` if `address` is a virtual socket,
    ///   or any error thrown while converting the address or binding.
    fileprivate func bind<Output: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
    ) async throws -> NIOAsyncChannel<Output, Never> {
      // Only the presence of a virtual socket matters, so compare against `nil` rather than
      // binding a value that is never read.
      guard address.virtualSocket == nil else {
        throw RuntimeError(
          code: .transportError,
          message: """
            Virtual sockets are not supported by 'HTTP2ServerTransport.TransportServices'. \
            Please use the 'HTTP2ServerTransport.Posix' transport.
            """
        )
      }

      return try await self.bind(
        to: NIOCore.SocketAddress(address),
        childChannelInitializer: childChannelInitializer
      )
    }
  }
  209. #endif