Connection.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOConcurrencyHelpers
  18. import NIOCore
  19. import NIOHTTP2
  20. /// A `Connection` provides communication to a single remote peer.
  21. ///
  22. /// Each `Connection` object is 'one-shot': it may only be used for a single connection over
  23. /// its lifetime. If a connect attempt fails then the `Connection` must be discarded and a new one
  24. /// must be created. However, an active connection may be used multiple times to provide streams
  25. /// to the backend.
  26. ///
  27. /// To use the `Connection` you must run it in a task. You can consume event updates by listening
  28. /// to `events`:
  29. ///
  30. /// ```swift
  31. /// await withTaskGroup(of: Void.self) { group in
  32. /// group.addTask { await connection.run() }
  33. ///
  34. /// for await event in connection.events {
  35. /// switch event {
  36. /// case .connectSucceeded:
  37. /// // ...
  38. /// default:
  39. /// // ...
  40. /// }
  41. /// }
  42. /// }
  43. /// ```
  44. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  45. struct Connection: Sendable {
  46. /// Events which can happen over the lifetime of the connection.
  47. enum Event: Sendable {
  48. /// The connect attempt succeeded and the connection is ready to use.
  49. case connectSucceeded
  50. /// The connect attempt failed.
  51. case connectFailed(any Error)
  52. /// The connection received a GOAWAY and will close soon. No new streams
  53. /// should be opened on this connection.
  54. case goingAway(HTTP2ErrorCode, String)
  55. /// The connection is closed.
  56. case closed(Connection.CloseReason)
  57. }
  58. /// The reason the connection closed.
  59. enum CloseReason: Sendable {
  60. /// Closed because an idle timeout fired.
  61. case idleTimeout
  62. /// Closed because a keepalive timer fired.
  63. case keepaliveTimeout
  64. /// Closed because the caller initiated shutdown and all RPCs on the connection finished.
  65. case initiatedLocally
  66. /// Closed because the remote peer initiate shutdown (i.e. sent a GOAWAY frame).
  67. case remote
  68. /// Closed because the connection encountered an unexpected error.
  69. case error(Error)
  70. }
  71. /// Inputs to the 'run' method.
  72. private enum Input: Sendable {
  73. case close
  74. }
  75. /// Events which have happened to the connection.
  76. private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)
  77. /// Events which the connection must react to.
  78. private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)
  79. /// The address to connect to.
  80. private let address: SocketAddress
  81. /// The default compression algorithm used for requests.
  82. private let defaultCompression: CompressionAlgorithm
  83. /// The set of enabled compression algorithms.
  84. private let enabledCompression: CompressionAlgorithmSet
  85. /// A connector used to establish a connection.
  86. private let http2Connector: any HTTP2Connector
  87. /// The state of the connection.
  88. private let state: NIOLockedValueBox<State>
  89. /// The default max request message size in bytes, 4 MiB.
  90. private static var defaultMaxRequestMessageSizeBytes: Int {
  91. 4 * 1024 * 1024
  92. }
  93. /// A stream of events which can happen to the connection.
  94. var events: AsyncStream<Event> {
  95. self.event.stream
  96. }
  97. init(
  98. address: SocketAddress,
  99. http2Connector: any HTTP2Connector,
  100. defaultCompression: CompressionAlgorithm,
  101. enabledCompression: CompressionAlgorithmSet
  102. ) {
  103. self.address = address
  104. self.defaultCompression = defaultCompression
  105. self.enabledCompression = enabledCompression
  106. self.http2Connector = http2Connector
  107. self.event = AsyncStream.makeStream(of: Event.self)
  108. self.input = AsyncStream.makeStream(of: Input.self)
  109. self.state = NIOLockedValueBox(.notConnected)
  110. }
  111. /// Connect and run the connection.
  112. ///
  113. /// This function returns when the connection has closed. You can observe connection events
  114. /// by consuming the ``events`` sequence.
  115. func run() async {
  116. let connectResult = await Result {
  117. try await self.http2Connector.establishConnection(to: self.address)
  118. }
  119. switch connectResult {
  120. case .success(let connected):
  121. // Connected successfully, update state and report the event.
  122. self.state.withLockedValue { state in
  123. state.connected(connected)
  124. }
  125. self.event.continuation.yield(.connectSucceeded)
  126. await withDiscardingTaskGroup { group in
  127. // Add a task to run the connection and consume events.
  128. group.addTask {
  129. try? await connected.channel.executeThenClose { inbound, outbound in
  130. await self.consumeConnectionEvents(inbound)
  131. }
  132. }
  133. // Meanwhile, consume input events. This sequence will end when the connection has closed.
  134. for await input in self.input.stream {
  135. switch input {
  136. case .close:
  137. let asyncChannel = self.state.withLockedValue { $0.beginClosing() }
  138. if let channel = asyncChannel?.channel {
  139. let event = ClientConnectionHandler.OutboundEvent.closeGracefully
  140. channel.triggerUserOutboundEvent(event, promise: nil)
  141. }
  142. }
  143. }
  144. }
  145. case .failure(let error):
  146. // Connect failed, this connection is no longer useful.
  147. self.state.withLockedValue { $0.closed() }
  148. self.finishStreams(withEvent: .connectFailed(error))
  149. }
  150. }
  151. /// Gracefully close the connection.
  152. func close() {
  153. self.input.continuation.yield(.close)
  154. }
  155. /// Make a stream using the connection if it's connected.
  156. ///
  157. /// - Parameter descriptor: A descriptor of the method to create a stream for.
  158. /// - Returns: The open stream.
  159. func makeStream(descriptor: MethodDescriptor, options: CallOptions) async throws -> Stream {
  160. let (multiplexer, scheme) = try self.state.withLockedValue { state in
  161. switch state {
  162. case .connected(let connected):
  163. return (connected.multiplexer, connected.scheme)
  164. case .notConnected, .closing, .closed:
  165. throw RPCError(code: .unavailable, message: "subchannel isn't ready")
  166. }
  167. }
  168. let compression: CompressionAlgorithm
  169. if let override = options.compression {
  170. compression = self.enabledCompression.contains(override) ? override : .none
  171. } else {
  172. compression = self.defaultCompression
  173. }
  174. let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes
  175. do {
  176. let stream = try await multiplexer.openStream { channel in
  177. channel.eventLoop.makeCompletedFuture {
  178. let streamHandler = GRPCClientStreamHandler(
  179. methodDescriptor: descriptor,
  180. scheme: scheme,
  181. outboundEncoding: compression,
  182. acceptedEncodings: self.enabledCompression,
  183. maximumPayloadSize: maxRequestSize
  184. )
  185. try channel.pipeline.syncOperations.addHandler(streamHandler)
  186. return try NIOAsyncChannel(
  187. wrappingChannelSynchronously: channel,
  188. configuration: NIOAsyncChannel.Configuration(
  189. isOutboundHalfClosureEnabled: true,
  190. inboundType: RPCResponsePart.self,
  191. outboundType: RPCRequestPart.self
  192. )
  193. )
  194. }
  195. }
  196. return Stream(wrapping: stream, descriptor: descriptor)
  197. } catch {
  198. throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error)
  199. }
  200. }
  201. private func consumeConnectionEvents(
  202. _ connectionEvents: NIOAsyncChannelInboundStream<ClientConnectionEvent>
  203. ) async {
  204. do {
  205. var channelCloseReason: ClientConnectionEvent.CloseReason?
  206. for try await connectionEvent in connectionEvents {
  207. switch connectionEvent {
  208. case .closing(let reason):
  209. self.state.withLockedValue { $0.closing() }
  210. switch reason {
  211. case .goAway(let errorCode, let reason):
  212. // The connection will close at some point soon, yield a notification for this
  213. // because the close might not be imminent and this could result in address resolution.
  214. self.event.continuation.yield(.goingAway(errorCode, reason))
  215. case .idle, .keepaliveExpired, .initiatedLocally:
  216. // The connection will be closed imminently in these cases there's no need to do
  217. // anything.
  218. ()
  219. }
  220. // Take the reason with the highest precedence. A GOAWAY may be superseded by user
  221. // closing, for example.
  222. if channelCloseReason.map({ reason.precedence > $0.precedence }) ?? true {
  223. channelCloseReason = reason
  224. }
  225. }
  226. }
  227. let connectionCloseReason: Self.CloseReason
  228. switch channelCloseReason {
  229. case .keepaliveExpired:
  230. connectionCloseReason = .keepaliveTimeout
  231. case .idle:
  232. // Connection became idle, that's fine.
  233. connectionCloseReason = .idleTimeout
  234. case .goAway:
  235. // Remote peer told us to GOAWAY.
  236. connectionCloseReason = .remote
  237. case .initiatedLocally, .none:
  238. // Shutdown was initiated locally.
  239. connectionCloseReason = .initiatedLocally
  240. }
  241. // The connection events sequence has finished: the connection is now closed.
  242. self.state.withLockedValue { $0.closed() }
  243. self.finishStreams(withEvent: .closed(connectionCloseReason))
  244. } catch {
  245. // Any error must come from consuming the inbound channel meaning that the connection
  246. // must be borked, wrap it up and close.
  247. let rpcError = RPCError(code: .unavailable, message: "connection closed", cause: error)
  248. self.state.withLockedValue { $0.closed() }
  249. self.finishStreams(withEvent: .closed(.error(rpcError)))
  250. }
  251. }
  252. private func finishStreams(withEvent event: Event) {
  253. self.event.continuation.yield(event)
  254. self.event.continuation.finish()
  255. self.input.continuation.finish()
  256. }
  257. }
  258. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  259. extension Connection {
  260. struct Stream {
  261. typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>
  262. struct Outbound: ClosableRPCWriterProtocol {
  263. typealias Element = RPCRequestPart
  264. private let requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>
  265. private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
  266. fileprivate init(
  267. requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>,
  268. http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
  269. ) {
  270. self.requestWriter = requestWriter
  271. self.http2Stream = http2Stream
  272. }
  273. func write(contentsOf elements: some Sequence<Self.Element>) async throws {
  274. try await self.requestWriter.write(contentsOf: elements)
  275. }
  276. func finish() {
  277. self.requestWriter.finish()
  278. }
  279. func finish(throwing error: any Error) {
  280. // Fire the error inbound; this fails the inbound writer.
  281. self.http2Stream.channel.pipeline.fireErrorCaught(error)
  282. }
  283. }
  284. let descriptor: MethodDescriptor
  285. private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
  286. init(
  287. wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
  288. descriptor: MethodDescriptor
  289. ) {
  290. self.http2Stream = stream
  291. self.descriptor = descriptor
  292. }
  293. func execute<T>(
  294. _ closure: (_ inbound: Inbound, _ outbound: Outbound) async throws -> T
  295. ) async throws -> T where T: Sendable {
  296. try await self.http2Stream.executeThenClose { inbound, outbound in
  297. return try await closure(
  298. inbound,
  299. Outbound(requestWriter: outbound, http2Stream: self.http2Stream)
  300. )
  301. }
  302. }
  303. }
  304. }
  305. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  306. extension Connection {
  307. private enum State {
  308. /// The connection is idle or connecting.
  309. case notConnected
  310. /// A connection has been established with the remote peer.
  311. case connected(Connected)
  312. /// The connection has started to close. This may be initiated locally or by the remote.
  313. case closing
  314. /// The connection has closed. This is a terminal state.
  315. case closed
  316. struct Connected {
  317. /// The connection channel.
  318. var channel: NIOAsyncChannel<ClientConnectionEvent, Void>
  319. /// Multiplexer for creating HTTP/2 streams.
  320. var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
  321. /// Whether the connection is plaintext, `false` implies TLS is being used.
  322. var scheme: Scheme
  323. init(_ connection: HTTP2Connection) {
  324. self.channel = connection.channel
  325. self.multiplexer = connection.multiplexer
  326. self.scheme = connection.isPlaintext ? .http : .https
  327. }
  328. }
  329. mutating func connected(_ channel: HTTP2Connection) {
  330. switch self {
  331. case .notConnected:
  332. self = .connected(State.Connected(channel))
  333. case .connected, .closing, .closed:
  334. fatalError("Invalid state: 'run()' must only be called once")
  335. }
  336. }
  337. mutating func beginClosing() -> NIOAsyncChannel<ClientConnectionEvent, Void>? {
  338. switch self {
  339. case .notConnected:
  340. fatalError("Invalid state: 'run()' must be called first")
  341. case .connected(let connected):
  342. self = .closing
  343. return connected.channel
  344. case .closing, .closed:
  345. return nil
  346. }
  347. }
  348. mutating func closing() {
  349. switch self {
  350. case .notConnected:
  351. // Not reachable: happens as a result of a connection event, that can only happen if
  352. // the connection has started (i.e. must be in the 'connected' state or later).
  353. fatalError("Invalid state")
  354. case .connected:
  355. self = .closing
  356. case .closing, .closed:
  357. ()
  358. }
  359. }
  360. mutating func closed() {
  361. self = .closed
  362. }
  363. }
  364. }
  365. extension ClientConnectionEvent.CloseReason {
  366. fileprivate var precedence: Int {
  367. switch self {
  368. case .goAway:
  369. return 0
  370. case .idle:
  371. return 1
  372. case .keepaliveExpired:
  373. return 2
  374. case .initiatedLocally:
  375. return 3
  376. }
  377. }
  378. }