| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- /*
- * Copyright 2024, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package import GRPCCore
- internal import NIOConcurrencyHelpers
- package import NIOCore
- package import NIOHTTP2
/// A connection to a single remote peer.
///
/// A `Connection` is 'one-shot': it makes at most one connect attempt during its lifetime. If
/// that attempt fails the `Connection` must be thrown away and a new one created in its place.
/// Once connected, however, it may be asked for any number of streams to the backend.
///
/// A `Connection` does nothing until ``run()`` is called, which should be done from a task.
/// Updates are published on ``events``:
///
/// ```swift
/// await withTaskGroup(of: Void.self) { group in
///   group.addTask { await connection.run() }
///
///   for await event in connection.events {
///     switch event {
///     case .connectSucceeded:
///       // ...
///     default:
///       // ...
///     }
///   }
/// }
/// ```
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
package struct Connection: Sendable {
  /// Events published over the lifetime of the connection.
  package enum Event: Sendable {
    /// The connection was established and is ready to use.
    case connectSucceeded
    /// The connection could not be established.
    case connectFailed(any Error)
    /// A GOAWAY was received: the connection will close soon and no new streams should be
    /// opened on it.
    case goingAway(HTTP2ErrorCode, String)
    /// The connection has closed.
    case closed(Connection.CloseReason)
  }

  /// Why the connection closed.
  package enum CloseReason: Sendable {
    /// An idle timeout fired.
    case idleTimeout
    /// A keepalive timer fired.
    case keepaliveTimeout
    /// The caller initiated shutdown and all RPCs on the connection finished.
    case initiatedLocally
    /// The remote peer initiated shutdown (i.e. it sent a GOAWAY frame).
    case remote
    /// The connection encountered an unexpected error.
    case error(any Error, wasIdle: Bool)
  }

  /// Commands handled by ``run()``.
  private enum Input: Sendable {
    case close
  }

  /// Stream (and its continuation) carrying events which have happened to the connection.
  private let event: (stream: AsyncStream<Event>, continuation: AsyncStream<Event>.Continuation)

  /// Stream (and its continuation) carrying inputs the connection must react to.
  private let input: (stream: AsyncStream<Input>, continuation: AsyncStream<Input>.Continuation)

  /// The address of the remote peer.
  private let address: SocketAddress

  /// Compression applied to requests when the call options don't specify one.
  private let defaultCompression: CompressionAlgorithm

  /// The compression algorithms which are enabled on this connection.
  private let enabledCompression: CompressionAlgorithmSet

  /// Establishes the underlying HTTP/2 connection.
  private let http2Connector: any HTTP2Connector

  /// The current state of the connection, guarded by a lock.
  private let state: NIOLockedValueBox<State>

  /// The max request message size used when the call options don't specify one: 4 MiB.
  private static var defaultMaxRequestMessageSizeBytes: Int {
    let bytesPerMebibyte = 1024 * 1024
    return 4 * bytesPerMebibyte
  }

  /// A stream of events which can happen to the connection.
  package var events: AsyncStream<Event> {
    return self.event.stream
  }

  /// Creates a connection which has not yet been started.
  ///
  /// - Parameters:
  ///   - address: The address to connect to.
  ///   - http2Connector: The connector used to establish the connection.
  ///   - defaultCompression: The compression algorithm used for requests by default.
  ///   - enabledCompression: The set of enabled compression algorithms.
  package init(
    address: SocketAddress,
    http2Connector: any HTTP2Connector,
    defaultCompression: CompressionAlgorithm,
    enabledCompression: CompressionAlgorithmSet
  ) {
    self.event = AsyncStream<Event>.makeStream()
    self.input = AsyncStream<Input>.makeStream()
    self.state = NIOLockedValueBox(State.notConnected)
    self.http2Connector = http2Connector
    self.address = address
    self.enabledCompression = enabledCompression
    self.defaultCompression = defaultCompression
  }

  /// Connect and run the connection.
  ///
  /// This function returns when the connection has closed. Events for the connection can be
  /// observed by consuming the ``events`` sequence.
  package func run() async {
    let connection: HTTP2Connection
    do {
      connection = try await self.http2Connector.establishConnection(to: self.address)
    } catch {
      // The connect attempt failed so this connection will never be usable.
      self.state.withLockedValue { $0.closed() }
      self.finishStreams(withEvent: .connectFailed(error))
      return
    }

    // The connect attempt succeeded: record the connection so that streams can be created on it.
    self.state.withLockedValue { $0.connected(connection) }

    await withDiscardingTaskGroup { group in
      // Run the connection in a child task, consuming its events as they arrive.
      group.addTask {
        try? await connection.channel.executeThenClose { inbound, _ in
          await self.consumeConnectionEvents(inbound)
        }
      }

      // In the meantime handle inputs. The input stream is finished once the connection has
      // closed, which ends this loop.
      for await command in self.input.stream {
        switch command {
        case .close:
          // 'beginClosing()' only hands back a channel when there's a connection to close.
          let closable = self.state.withLockedValue { $0.beginClosing() }
          closable?.channel.triggerUserOutboundEvent(
            ClientConnectionHandler.OutboundEvent.closeGracefully,
            promise: nil
          )
        }
      }
    }
  }

  /// Gracefully close the connection.
  ///
  /// The request is queued as an input and acted on by ``run()``.
  package func close() {
    let inputs = self.input.continuation
    inputs.yield(.close)
  }

  /// Make a stream using the connection if it's connected.
  ///
  /// - Parameters:
  ///   - descriptor: A descriptor of the method to create a stream for.
  ///   - options: Options for the call. The compression and maximum request message size are
  ///       read from here, falling back to the connection defaults when unset.
  /// - Returns: The open stream.
  /// - Throws: An `RPCError` with code `unavailable` if the connection isn't in the connected
  ///     state or if the stream couldn't be opened.
  package func makeStream(
    descriptor: MethodDescriptor,
    options: CallOptions
  ) async throws -> Stream {
    let (multiplexer, scheme) = try self.state.withLockedValue { state in
      guard case .connected(let connected) = state else {
        throw RPCError(code: .unavailable, message: "subchannel isn't ready")
      }
      return (connected.multiplexer, connected.scheme)
    }

    // A requested algorithm which isn't enabled falls back to no compression at all; the
    // default is only used when nothing was requested.
    let requested = options.compression
    let outboundEncoding: CompressionAlgorithm
    if let requested, !self.enabledCompression.contains(requested) {
      outboundEncoding = .none
    } else {
      outboundEncoding = requested ?? self.defaultCompression
    }

    let maxRequestSize = options.maxRequestMessageBytes ?? Self.defaultMaxRequestMessageSizeBytes

    let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
    do {
      http2Stream = try await multiplexer.openStream { channel in
        channel.eventLoop.makeCompletedFuture {
          let handler = GRPCClientStreamHandler(
            methodDescriptor: descriptor,
            scheme: scheme,
            outboundEncoding: outboundEncoding,
            acceptedEncodings: self.enabledCompression,
            maximumPayloadSize: maxRequestSize
          )
          try channel.pipeline.syncOperations.addHandler(handler)

          let configuration = NIOAsyncChannel.Configuration(
            isOutboundHalfClosureEnabled: true,
            inboundType: RPCResponsePart.self,
            outboundType: RPCRequestPart.self
          )
          return try NIOAsyncChannel(
            wrappingChannelSynchronously: channel,
            configuration: configuration
          )
        }
      }
    } catch {
      throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error)
    }

    return Stream(wrapping: http2Stream, descriptor: descriptor)
  }

  /// Consumes events from the connection channel until it closes, then publishes the final
  /// event and finishes the event and input streams.
  ///
  /// - Parameter connectionEvents: The inbound events of the connection channel.
  private func consumeConnectionEvents(
    _ connectionEvents: NIOAsyncChannelInboundStream<ClientConnectionEvent>
  ) async {
    // A connection is only 'ready' once the initial HTTP/2 SETTINGS frame has been received.
    // An established TCP connection isn't enough: the TLS handshake may never complete, or the
    // server might not be configured for gRPC or HTTP/2.
    //
    // Readiness is tracked so that if the events finish without the connection ever becoming
    // ready then it is reported as a failed connect attempt rather than a close.
    var becameReady = false
    // The close reason with the highest precedence seen so far.
    var strongestReason: ClientConnectionEvent.CloseReason?
    // Set if consuming the connection events threw.
    var streamError: (any Error)?

    do {
      for try await connectionEvent in connectionEvents {
        switch connectionEvent {
        case .ready:
          becameReady = true
          self.event.continuation.yield(.connectSucceeded)

        case .closing(let reason):
          self.state.withLockedValue { $0.closing() }

          // A GOAWAY is announced because the close might not be imminent and this could
          // result in address resolution. For every other reason the connection is about to
          // close anyway so there's nothing to publish yet.
          if case .goAway(let errorCode, let message) = reason {
            self.event.continuation.yield(.goingAway(errorCode, message))
          }

          // Keep whichever reason has the highest precedence. A GOAWAY may be superseded by
          // the user closing the connection, for example.
          if let current = strongestReason {
            if reason.precedence > current.precedence {
              strongestReason = reason
            }
          } else {
            strongestReason = reason
          }
        }
      }
    } catch {
      streamError = error
    }

    let finalEvent: Event
    if !becameReady {
      // Never became ready: this counts as a failed connect attempt.
      finalEvent = .connectFailed(Self.makeNeverReadyError(cause: streamError))
    } else if let streamError {
      // The error can only have come from consuming the inbound channel so the connection
      // must be broken: wrap it up and close.
      let rpcError = RPCError(code: .unavailable, message: "connection closed", cause: streamError)
      finalEvent = .closed(.error(rpcError, wasIdle: true))
    } else {
      finalEvent = .closed(Self.makeCloseReason(from: strongestReason))
    }

    // The connection events have finished: the connection is now closed.
    self.state.withLockedValue { $0.closed() }
    self.finishStreams(withEvent: finalEvent)
  }

  /// Makes the error reported when the connection closes before ever becoming ready.
  private static func makeNeverReadyError(cause: (any Error)?) -> RPCError {
    return RPCError(
      code: .unavailable,
      message: """
        The server accepted the TCP connection but closed the connection before completing \
        the HTTP/2 connection preface.
        """,
      cause: cause
    )
  }

  /// Translates the close reason reported by the connection channel, if there was one, into
  /// the reason published to consumers of ``events``.
  private static func makeCloseReason(
    from channelCloseReason: ClientConnectionEvent.CloseReason?
  ) -> CloseReason {
    guard let channelCloseReason else {
      // The channel closed without ever saying why.
      let error = RPCError(
        code: .unavailable,
        message: "The TCP connection was dropped unexpectedly.",
        cause: nil
      )
      return .error(error, wasIdle: true)
    }

    switch channelCloseReason {
    case .idle:
      // Connection became idle, that's fine.
      return .idleTimeout

    case .keepaliveExpired:
      return .keepaliveTimeout

    case .goAway:
      // Remote peer told us to GOAWAY.
      return .remote

    case .initiatedLocally:
      // Shutdown was initiated locally.
      return .initiatedLocally

    case .unexpected(let cause, let isIdle):
      let error = RPCError(
        code: .unavailable,
        message: "The TCP connection was dropped unexpectedly.",
        cause: cause
      )
      return .error(error, wasIdle: isIdle)
    }
  }

  /// Publishes `event` as the last event and then finishes both the event and input streams.
  private func finishStreams(withEvent event: Event) {
    let events = self.event.continuation
    events.yield(event)
    events.finish()
    self.input.continuation.finish()
  }
}
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
extension Connection {
  /// A stream for a single RPC, backed by an HTTP/2 stream channel.
  package struct Stream {
    /// The response parts received on the stream.
    package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>

    /// A writer for the request parts sent on the stream.
    package struct Outbound: ClosableRPCWriterProtocol {
      package typealias Element = RPCRequestPart

      /// The writer for the stream channel.
      private let requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>
      /// The stream channel, kept so that errors can be fired into its pipeline.
      private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>

      fileprivate init(
        requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>,
        http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
      ) {
        self.http2Stream = http2Stream
        self.requestWriter = requestWriter
      }

      /// Writes a single request part.
      package func write(_ element: RPCRequestPart) async throws {
        let writer = self.requestWriter
        try await writer.write(element)
      }

      /// Writes a sequence of request parts.
      package func write(contentsOf elements: some Sequence<Self.Element>) async throws {
        let writer = self.requestWriter
        try await writer.write(contentsOf: elements)
      }

      /// Finishes the underlying request writer.
      package func finish() {
        self.requestWriter.finish()
      }

      /// Finishes the writer with an error.
      ///
      /// - Parameter error: The error to fire through the stream channel's pipeline.
      package func finish(throwing error: any Error) {
        // The error is reported by firing it through the stream channel's pipeline as a
        // caught error rather than via the request writer.
        let pipeline = self.http2Stream.channel.pipeline
        pipeline.fireErrorCaught(error)
      }
    }

    /// A descriptor of the method the stream was created for.
    let descriptor: MethodDescriptor

    /// The HTTP/2 stream channel carrying the RPC.
    private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>

    /// Wraps an open HTTP/2 stream channel.
    ///
    /// - Parameters:
    ///   - stream: The stream channel to wrap.
    ///   - descriptor: A descriptor of the method the stream is for.
    init(
      wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
      descriptor: MethodDescriptor
    ) {
      self.descriptor = descriptor
      self.http2Stream = stream
    }

    /// Runs `closure` with the inbound and outbound halves of the stream, closing the stream
    /// afterwards.
    ///
    /// - Parameter closure: The work to do with the stream.
    /// - Returns: The value returned by `closure`.
    package func execute<T>(
      _ closure: (_ inbound: Inbound, _ outbound: Outbound) async throws -> T
    ) async throws -> T where T: Sendable {
      let channel = self.http2Stream
      return try await channel.executeThenClose { inbound, requestWriter in
        let outbound = Outbound(requestWriter: requestWriter, http2Stream: channel)
        return try await closure(inbound, outbound)
      }
    }
  }
}
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
extension Connection {
  /// The lifecycle of a connection: `notConnected` → `connected` → `closing` → `closed`.
  private enum State: Sendable {
    /// No connection has been established yet: the connection is idle or connecting.
    case notConnected
    /// A TCP connection to the remote peer has been established, although it may not be ready
    /// to use yet.
    case connected(Connected)
    /// The connection has begun closing, either at the request of the caller or the remote.
    case closing
    /// The connection has closed. This state is terminal.
    case closed

    /// Values held while connected.
    struct Connected: Sendable {
      /// The connection channel.
      var channel: NIOAsyncChannel<ClientConnectionEvent, Void>
      /// Multiplexer for creating HTTP/2 streams.
      var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
      /// The scheme of the connection: `http` if it is plaintext, `https` otherwise.
      var scheme: Scheme

      init(_ connection: HTTP2Connection) {
        if connection.isPlaintext {
          self.scheme = .http
        } else {
          self.scheme = .https
        }
        self.multiplexer = connection.multiplexer
        self.channel = connection.channel
      }
    }

    /// Moves from `notConnected` to `connected`. Traps in any other state.
    mutating func connected(_ connection: HTTP2Connection) {
      guard case .notConnected = self else {
        fatalError("Invalid state: 'run()' must only be called once")
      }
      self = .connected(State.Connected(connection))
    }

    /// Starts closing the connection in response to a local request.
    ///
    /// - Returns: The connection channel if the state was `connected`, or `nil` if the
    ///   connection was already closing or closed. Traps if not yet connected.
    mutating func beginClosing() -> NIOAsyncChannel<ClientConnectionEvent, Void>? {
      switch self {
      case .connected(let connected):
        self = .closing
        return connected.channel

      case .closing, .closed:
        return nil

      case .notConnected:
        fatalError("Invalid state: 'run()' must be called first")
      }
    }

    /// Records that the connection has started to close. Has no effect if the connection is
    /// already closing or closed.
    mutating func closing() {
      switch self {
      case .connected:
        self = .closing

      case .closing, .closed:
        break

      case .notConnected:
        // Unreachable: this is driven by a connection event which can only be delivered once
        // the connection has started, i.e. from the 'connected' state onwards.
        fatalError("Invalid state")
      }
    }

    /// Moves to the `closed` state regardless of the current state.
    mutating func closed() {
      self = .closed
    }
  }
}
extension ClientConnectionEvent.CloseReason {
  /// The rank of this reason relative to the others. When more than one close reason is seen
  /// for a connection, the one with the greater value is the one which is kept.
  fileprivate var precedence: Int {
    switch self {
    case .initiatedLocally: 3
    case .keepaliveExpired: 2
    case .idle: 1
    case .goAway: 0
    case .unexpected: -1
    }
  }
}
|