ClientConnection.swift 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if os(Linux)
  17. @preconcurrency import Foundation
  18. #else
  19. import Foundation
  20. #endif
  21. import Logging
  22. import NIOCore
  23. import NIOHPACK
  24. import NIOHTTP2
  25. import NIOPosix
  26. #if canImport(NIOSSL)
  27. import NIOSSL
  28. #endif
  29. import NIOTLS
  30. import NIOTransportServices
  31. import SwiftProtobuf
  32. /// Provides a single, managed connection to a server which is guaranteed to always use the same
  33. /// `EventLoop`.
  34. ///
  35. /// The connection to the server is provided by a single channel which will attempt to reconnect to
  36. /// the server if the connection is dropped. When either the client or server detects that the
  37. /// connection has become idle -- that is, there are no outstanding RPCs and the idle timeout has
  38. /// passed (5 minutes, by default) -- the underlying channel will be closed. The client will not
  39. /// idle the connection if any RPC exists, even if there has been no activity on the RPC for the
  40. /// idle timeout. Long-lived, low activity RPCs may benefit from configuring keepalive (see
  41. /// ``ClientConnectionKeepalive``) which periodically pings the server to ensure that the connection
  42. /// is not dropped. If the connection is idle a new channel will be created on-demand when the next
  43. /// RPC is made.
  44. ///
  45. /// The state of the connection can be observed using a ``ConnectivityStateDelegate``.
  46. ///
  47. /// Since the connection is managed, and may potentially spend long periods of time waiting for a
  48. /// connection to come up (cellular connections, for example), different behaviors may be used when
  49. /// starting a call. The different behaviors are detailed in the ``CallStartBehavior`` documentation.
  50. ///
  51. /// ### Channel Pipeline
  52. ///
  53. /// The `NIO.ChannelPipeline` for the connection is configured as such:
  54. ///
  55. /// ┌──────────────────────────┐
  56. /// │ DelegatingErrorHandler │
  57. /// └──────────▲───────────────┘
  58. /// HTTP2Frame│
  59. /// │
  60. /// │
  61. /// │
  62. /// │
  63. /// │
  64. /// HTTP2Frame│ ⠇ ⠇ ⠇ ⠇ ⠇
  65. /// ┌─┴──────────────────▼─┐ ┌┴─▼┐ ┌┴─▼┐
  66. /// │ GRPCIdleHandler │ │ | │ | HTTP/2 streams
  67. /// └─▲──────────────────┬─┘ └▲─┬┘ └▲─┬┘
  68. /// HTTP2Frame│ │ │ │ │ │ HTTP2Frame
  69. /// ┌─┴──────────────────▼────────┴─▼───┴─▼┐
  70. /// │ NIOHTTP2Handler │
  71. /// └─▲──────────────────────────────────┬─┘
  72. /// ByteBuffer│ │ByteBuffer
  73. /// ┌─┴──────────────────────────────────▼─┐
  74. /// │ NIOSSLHandler │
  75. /// └─▲──────────────────────────────────┬─┘
  76. /// ByteBuffer│ │ByteBuffer
  77. /// │ ▼
  78. ///
  79. /// The 'GRPCIdleHandler' intercepts HTTP/2 frames and various events and is responsible for
  80. /// informing and controlling the state of the connection (idling and keepalive). The HTTP/2 streams
  81. /// are used to handle individual RPCs.
  82. public final class ClientConnection: Sendable {
  83. private let connectionManager: ConnectionManager
  84. /// HTTP multiplexer from the underlying channel handling gRPC calls.
  85. internal func getMultiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
  86. return self.connectionManager.getHTTP2Multiplexer()
  87. }
  88. /// The configuration for this client.
  89. internal let configuration: Configuration
  90. /// The scheme of the URI for each RPC, i.e. 'http' or 'https'.
  91. internal let scheme: String
  92. /// The authority of the URI for each RPC.
  93. internal let authority: String
  94. /// A monitor for the connectivity state.
  95. public let connectivity: ConnectivityStateMonitor
  96. /// The `EventLoop` this connection is using.
  97. public var eventLoop: EventLoop {
  98. return self.connectionManager.eventLoop
  99. }
  100. /// Creates a new connection from the given configuration. Prefer using
  101. /// ``ClientConnection/secure(group:)`` to build a connection secured with TLS or
  102. /// ``ClientConnection/insecure(group:)`` to build a plaintext connection.
  103. ///
  104. /// - Important: Users should prefer using ``ClientConnection/secure(group:)`` to build a connection
  105. /// with TLS, or ``ClientConnection/insecure(group:)`` to build a connection without TLS.
  106. public init(configuration: Configuration) {
  107. self.configuration = configuration
  108. self.scheme = configuration.tlsConfiguration == nil ? "http" : "https"
  109. self.authority = configuration.tlsConfiguration?.hostnameOverride ?? configuration.target.host
  110. let monitor = ConnectivityStateMonitor(
  111. delegate: configuration.connectivityStateDelegate,
  112. queue: configuration.connectivityStateDelegateQueue
  113. )
  114. self.connectivity = monitor
  115. self.connectionManager = ConnectionManager(
  116. configuration: configuration,
  117. connectivityDelegate: monitor,
  118. logger: configuration.backgroundActivityLogger
  119. )
  120. }
  121. /// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
  122. ///
  123. /// - Returns: Returns a future which will be resolved when shutdown has completed.
  124. public func close() -> EventLoopFuture<Void> {
  125. let promise = self.eventLoop.makePromise(of: Void.self)
  126. self.close(promise: promise)
  127. return promise.futureResult
  128. }
  129. /// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
  130. ///
  131. /// - Parameter promise: A promise which will be completed when shutdown has completed.
  132. public func close(promise: EventLoopPromise<Void>) {
  133. self.connectionManager.shutdown(mode: .forceful, promise: promise)
  134. }
  135. /// Attempt to gracefully shutdown the channel. New RPCs will be failed immediately and existing
  136. /// RPCs may continue to run until they complete.
  137. ///
  138. /// - Parameters:
  139. /// - deadline: A point in time by which the graceful shutdown must have completed. If the
  140. /// deadline passes and RPCs are still active then the connection will be closed forcefully
  141. /// and any remaining in-flight RPCs may be failed.
  142. /// - promise: A promise which will be completed when shutdown has completed.
  143. public func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) {
  144. return self.connectionManager.shutdown(mode: .graceful(deadline), promise: promise)
  145. }
  146. /// Populates the logger in `options` and appends a request ID header to the metadata, if
  147. /// configured.
  148. /// - Parameter options: The options containing the logger to populate.
  149. private func populateLogger(in options: inout CallOptions) {
  150. // Get connection metadata.
  151. self.connectionManager.appendMetadata(to: &options.logger)
  152. // Attach a request ID.
  153. let requestID = options.requestIDProvider.requestID()
  154. if let requestID = requestID {
  155. options.logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
  156. // Add the request ID header too.
  157. if let requestIDHeader = options.requestIDHeader {
  158. options.customMetadata.add(name: requestIDHeader, value: requestID)
  159. }
  160. }
  161. }
  162. }
  163. extension ClientConnection: GRPCChannel {
  164. public func makeCall<Request: Message, Response: Message>(
  165. path: String,
  166. type: GRPCCallType,
  167. callOptions: CallOptions,
  168. interceptors: [ClientInterceptor<Request, Response>]
  169. ) -> Call<Request, Response> {
  170. var options = callOptions
  171. self.populateLogger(in: &options)
  172. let multiplexer = self.getMultiplexer()
  173. let eventLoop = callOptions.eventLoopPreference.exact ?? multiplexer.eventLoop
  174. // This should be on the same event loop as the multiplexer (i.e. the event loop of the
  175. // underlying `Channel`.
  176. let channel = multiplexer.eventLoop.makePromise(of: Channel.self)
  177. multiplexer.whenComplete {
  178. ClientConnection.makeStreamChannel(using: $0, promise: channel)
  179. }
  180. return Call(
  181. path: path,
  182. type: type,
  183. eventLoop: eventLoop,
  184. options: options,
  185. interceptors: interceptors,
  186. transportFactory: .http2(
  187. channel: channel.futureResult,
  188. authority: self.authority,
  189. scheme: self.scheme,
  190. maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
  191. errorDelegate: self.configuration.errorDelegate
  192. )
  193. )
  194. }
  195. public func makeCall<Request: GRPCPayload, Response: GRPCPayload>(
  196. path: String,
  197. type: GRPCCallType,
  198. callOptions: CallOptions,
  199. interceptors: [ClientInterceptor<Request, Response>]
  200. ) -> Call<Request, Response> {
  201. var options = callOptions
  202. self.populateLogger(in: &options)
  203. let multiplexer = self.getMultiplexer()
  204. let eventLoop = callOptions.eventLoopPreference.exact ?? multiplexer.eventLoop
  205. // This should be on the same event loop as the multiplexer (i.e. the event loop of the
  206. // underlying `Channel`.
  207. let channel = multiplexer.eventLoop.makePromise(of: Channel.self)
  208. multiplexer.whenComplete {
  209. ClientConnection.makeStreamChannel(using: $0, promise: channel)
  210. }
  211. return Call(
  212. path: path,
  213. type: type,
  214. eventLoop: eventLoop,
  215. options: options,
  216. interceptors: interceptors,
  217. transportFactory: .http2(
  218. channel: channel.futureResult,
  219. authority: self.authority,
  220. scheme: self.scheme,
  221. maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
  222. errorDelegate: self.configuration.errorDelegate
  223. )
  224. )
  225. }
  226. private static func makeStreamChannel(
  227. using result: Result<NIOHTTP2Handler.StreamMultiplexer, Error>,
  228. promise: EventLoopPromise<Channel>
  229. ) {
  230. switch result {
  231. case let .success(multiplexer):
  232. multiplexer.createStreamChannel(promise: promise) {
  233. $0.eventLoop.makeSucceededVoidFuture()
  234. }
  235. case let .failure(error):
  236. promise.fail(error)
  237. }
  238. }
  239. }
  240. // MARK: - Configuration structures
  241. /// A target to connect to.
  242. public struct ConnectionTarget: Sendable {
  243. internal enum Wrapped {
  244. case hostAndPort(String, Int)
  245. case unixDomainSocket(String)
  246. case socketAddress(SocketAddress)
  247. case connectedSocket(NIOBSDSocket.Handle)
  248. case vsockAddress(VsockAddress)
  249. }
  250. internal var wrapped: Wrapped
  251. private init(_ wrapped: Wrapped) {
  252. self.wrapped = wrapped
  253. }
  254. /// The host and port. The port is 443 by default.
  255. public static func host(_ host: String, port: Int = 443) -> ConnectionTarget {
  256. return ConnectionTarget(.hostAndPort(host, port))
  257. }
  258. /// The host and port.
  259. public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
  260. return ConnectionTarget(.hostAndPort(host, port))
  261. }
  262. /// The path of a Unix domain socket.
  263. public static func unixDomainSocket(_ path: String) -> ConnectionTarget {
  264. return ConnectionTarget(.unixDomainSocket(path))
  265. }
  266. /// A NIO socket address.
  267. public static func socketAddress(_ address: SocketAddress) -> ConnectionTarget {
  268. return ConnectionTarget(.socketAddress(address))
  269. }
  270. /// A connected NIO socket.
  271. public static func connectedSocket(_ socket: NIOBSDSocket.Handle) -> ConnectionTarget {
  272. return ConnectionTarget(.connectedSocket(socket))
  273. }
  274. /// A vsock socket.
  275. public static func vsockAddress(_ vsockAddress: VsockAddress) -> ConnectionTarget {
  276. return ConnectionTarget(.vsockAddress(vsockAddress))
  277. }
  278. @usableFromInline
  279. var host: String {
  280. switch self.wrapped {
  281. case let .hostAndPort(host, _):
  282. return host
  283. case let .socketAddress(.v4(address)):
  284. return address.host
  285. case let .socketAddress(.v6(address)):
  286. return address.host
  287. case .unixDomainSocket, .socketAddress(.unixDomainSocket), .connectedSocket:
  288. return "localhost"
  289. case let .vsockAddress(address):
  290. return "vsock://\(address.cid)"
  291. }
  292. }
  293. }
  294. /// The connectivity behavior to use when starting an RPC.
  295. public struct CallStartBehavior: Hashable, Sendable {
  296. internal enum Behavior: Hashable, Sendable {
  297. case waitsForConnectivity
  298. case fastFailure
  299. }
  300. internal var wrapped: Behavior
  301. private init(_ wrapped: Behavior) {
  302. self.wrapped = wrapped
  303. }
  304. /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
  305. /// an RPC. Doing so may involve multiple connection attempts.
  306. ///
  307. /// This is the preferred, and default, behaviour.
  308. public static let waitsForConnectivity = CallStartBehavior(.waitsForConnectivity)
  309. /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
  310. /// quickly rather than waiting for an active connection. The behaviour depends on the current
  311. /// connectivity state:
  312. ///
  313. /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
  314. /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
  315. /// fails.
  316. /// - Ready: a connection is already active: the RPC will be started using that connection.
  317. /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
  318. /// connect again. The RPC will fail immediately.
  319. /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
  320. public static let fastFailure = CallStartBehavior(.fastFailure)
  321. }
  322. extension ClientConnection {
  323. /// Configuration for a ``ClientConnection``. Users should prefer using one of the
  324. /// ``ClientConnection`` builders: ``ClientConnection/secure(group:)`` or ``ClientConnection/insecure(group:)``.
  325. public struct Configuration: Sendable {
  326. /// The target to connect to.
  327. public var target: ConnectionTarget
  328. /// The event loop group to run the connection on.
  329. public var eventLoopGroup: EventLoopGroup
  330. /// An error delegate which is called when errors are caught. Provided delegates **must not
  331. /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
  332. /// cycle. Defaults to ``LoggingClientErrorDelegate``.
  333. public var errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate.shared
  334. /// A delegate which is called when the connectivity state is changed. Defaults to `nil`.
  335. public var connectivityStateDelegate: ConnectivityStateDelegate?
  336. /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
  337. /// provided but the queue is `nil` then one will be created by gRPC. Defaults to `nil`.
  338. public var connectivityStateDelegateQueue: DispatchQueue?
  339. #if canImport(NIOSSL)
  340. /// TLS configuration for this connection. `nil` if TLS is not desired.
  341. ///
  342. /// - Important: `tls` is deprecated; use ``tlsConfiguration`` or one of
  343. /// the ``ClientConnection/usingTLS(with:on:)`` builder functions.
  344. @available(*, deprecated, renamed: "tlsConfiguration")
  345. public var tls: TLS? {
  346. get {
  347. return self.tlsConfiguration?.asDeprecatedClientConfiguration
  348. }
  349. set {
  350. self.tlsConfiguration = newValue.map { .init(transforming: $0) }
  351. }
  352. }
  353. #endif // canImport(NIOSSL)
  354. /// TLS configuration for this connection. `nil` if TLS is not desired.
  355. public var tlsConfiguration: GRPCTLSConfiguration?
  356. /// The connection backoff configuration. If no connection retrying is required then this should
  357. /// be `nil`.
  358. public var connectionBackoff: ConnectionBackoff? = ConnectionBackoff()
  359. /// The connection keepalive configuration.
  360. public var connectionKeepalive = ClientConnectionKeepalive()
  361. /// The amount of time to wait before closing the connection. The idle timeout will start only
  362. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  363. ///
  364. /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
  365. ///
  366. /// Defaults to 30 minutes.
  367. public var connectionIdleTimeout: TimeAmount = .minutes(30)
  368. /// The behavior used to determine when an RPC should start. That is, whether it should wait for
  369. /// an active connection or fail quickly if no connection is currently available.
  370. ///
  371. /// Defaults to ``CallStartBehavior/waitsForConnectivity``.
  372. public var callStartBehavior: CallStartBehavior = .waitsForConnectivity
  373. /// The HTTP/2 flow control target window size. Defaults to 8MB. Values are clamped between
  374. /// 1 and 2^31-1 inclusive.
  375. public var httpTargetWindowSize = 8 * 1024 * 1024 {
  376. didSet {
  377. self.httpTargetWindowSize = self.httpTargetWindowSize.clamped(to: 1 ... Int(Int32.max))
  378. }
  379. }
  380. /// The HTTP/2 max frame size. Defaults to 16384. Value is clamped between 2^14 and 2^24-1
  381. /// octets inclusive (the minimum and maximum allowable values - HTTP/2 RFC 7540 4.2).
  382. public var httpMaxFrameSize: Int = 16384 {
  383. didSet {
  384. self.httpMaxFrameSize = self.httpMaxFrameSize.clamped(to: 16384 ... 16_777_215)
  385. }
  386. }
  387. /// The HTTP protocol used for this connection.
  388. public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol {
  389. return self.tlsConfiguration == nil ? .http : .https
  390. }
  391. /// The maximum size in bytes of a message which may be received from a server. Defaults to 4MB.
  392. public var maximumReceiveMessageLength: Int = 4 * 1024 * 1024 {
  393. willSet {
  394. precondition(newValue >= 0, "maximumReceiveMessageLength must be positive")
  395. }
  396. }
  397. /// A logger for background information (such as connectivity state). A separate logger for
  398. /// requests may be provided in the `CallOptions`.
  399. ///
  400. /// Defaults to a no-op logger.
  401. public var backgroundActivityLogger = Logger(
  402. label: "io.grpc",
  403. factory: { _ in SwiftLogNoOpLogHandler() }
  404. )
  405. /// A channel initializer which will be run after gRPC has initialized each channel. This may be
  406. /// used to add additional handlers to the pipeline and is intended for debugging.
  407. ///
  408. /// - Warning: The initializer closure may be invoked *multiple times*.
  409. @preconcurrency
  410. public var debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
  411. #if canImport(NIOSSL)
  412. /// Create a `Configuration` with some pre-defined defaults. Prefer using
  413. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  414. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  415. ///
  416. /// - Parameter target: The target to connect to.
  417. /// - Parameter eventLoopGroup: The event loop group to run the connection on.
  418. /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
  419. /// on debug builds.
  420. /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
  421. /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
  422. /// `connectivityStateDelegate`.
  423. /// - Parameter tls: TLS configuration, defaulting to `nil`.
  424. /// - Parameter connectionBackoff: The connection backoff configuration to use.
  425. /// - Parameter connectionKeepalive: The keepalive configuration to use.
  426. /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the connection, defaulting to 30 minutes.
  427. /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
  428. /// relation to its underlying connection. Defaults to `waitsForConnectivity`.
  429. /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size.
  430. /// - Parameter backgroundActivityLogger: A logger for background information (such as
  431. /// connectivity state). Defaults to a no-op logger.
  432. /// - Parameter debugChannelInitializer: A channel initializer will be called after gRPC has
  433. /// initialized the channel. Defaults to `nil`.
  434. @available(*, deprecated, renamed: "default(target:eventLoopGroup:)")
  435. @preconcurrency
  436. public init(
  437. target: ConnectionTarget,
  438. eventLoopGroup: EventLoopGroup,
  439. errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
  440. connectivityStateDelegate: ConnectivityStateDelegate? = nil,
  441. connectivityStateDelegateQueue: DispatchQueue? = nil,
  442. tls: Configuration.TLS? = nil,
  443. connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
  444. connectionKeepalive: ClientConnectionKeepalive = ClientConnectionKeepalive(),
  445. connectionIdleTimeout: TimeAmount = .minutes(30),
  446. callStartBehavior: CallStartBehavior = .waitsForConnectivity,
  447. httpTargetWindowSize: Int = 8 * 1024 * 1024,
  448. backgroundActivityLogger: Logger = Logger(
  449. label: "io.grpc",
  450. factory: { _ in SwiftLogNoOpLogHandler() }
  451. ),
  452. debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
  453. ) {
  454. self.target = target
  455. self.eventLoopGroup = eventLoopGroup
  456. self.errorDelegate = errorDelegate
  457. self.connectivityStateDelegate = connectivityStateDelegate
  458. self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
  459. self.tlsConfiguration = tls.map { GRPCTLSConfiguration(transforming: $0) }
  460. self.connectionBackoff = connectionBackoff
  461. self.connectionKeepalive = connectionKeepalive
  462. self.connectionIdleTimeout = connectionIdleTimeout
  463. self.callStartBehavior = callStartBehavior
  464. self.httpTargetWindowSize = httpTargetWindowSize
  465. self.backgroundActivityLogger = backgroundActivityLogger
  466. self.debugChannelInitializer = debugChannelInitializer
  467. }
  468. #endif // canImport(NIOSSL)
  469. private init(eventLoopGroup: EventLoopGroup, target: ConnectionTarget) {
  470. self.eventLoopGroup = eventLoopGroup
  471. self.target = target
  472. }
  473. /// Make a new configuration using default values.
  474. ///
  475. /// - Parameters:
  476. /// - target: The target to connect to.
  477. /// - eventLoopGroup: The `EventLoopGroup` providing an `EventLoop` for the connection to
  478. /// run on.
  479. /// - Returns: A configuration with default values set.
  480. public static func `default`(
  481. target: ConnectionTarget,
  482. eventLoopGroup: EventLoopGroup
  483. ) -> Configuration {
  484. return .init(eventLoopGroup: eventLoopGroup, target: target)
  485. }
  486. }
  487. }
  488. // MARK: - Configuration helpers/extensions
  489. extension ClientBootstrapProtocol {
  490. /// Connect to the given connection target.
  491. ///
  492. /// - Parameter target: The target to connect to.
  493. func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
  494. switch target.wrapped {
  495. case let .hostAndPort(host, port):
  496. return self.connect(host: host, port: port)
  497. case let .unixDomainSocket(path):
  498. return self.connect(unixDomainSocketPath: path)
  499. case let .socketAddress(address):
  500. return self.connect(to: address)
  501. case let .connectedSocket(socket):
  502. return self.withConnectedSocket(socket)
  503. case let .vsockAddress(address):
  504. return self.connect(to: address)
  505. }
  506. }
  507. }
  508. #if canImport(NIOSSL)
  509. extension ChannelPipeline.SynchronousOperations {
  510. internal func configureNIOSSLForGRPCClient(
  511. sslContext: Result<NIOSSLContext, Error>,
  512. serverHostname: String?,
  513. customVerificationCallback: NIOSSLCustomVerificationCallback?,
  514. logger: Logger
  515. ) throws {
  516. let sslContext = try sslContext.get()
  517. let sslClientHandler: NIOSSLClientHandler
  518. if let customVerificationCallback = customVerificationCallback {
  519. sslClientHandler = try NIOSSLClientHandler(
  520. context: sslContext,
  521. serverHostname: serverHostname,
  522. customVerificationCallback: customVerificationCallback
  523. )
  524. } else {
  525. sslClientHandler = try NIOSSLClientHandler(
  526. context: sslContext,
  527. serverHostname: serverHostname
  528. )
  529. }
  530. try self.addHandler(sslClientHandler)
  531. try self.addHandler(TLSVerificationHandler(logger: logger))
  532. }
  533. }
  534. #endif // canImport(NIOSSL)
  535. extension ChannelPipeline.SynchronousOperations {
  536. internal func configureHTTP2AndGRPCHandlersForGRPCClient(
  537. channel: Channel,
  538. connectionManager: ConnectionManager,
  539. connectionKeepalive: ClientConnectionKeepalive,
  540. connectionIdleTimeout: TimeAmount,
  541. httpTargetWindowSize: Int,
  542. httpMaxFrameSize: Int,
  543. errorDelegate: ClientErrorDelegate?,
  544. logger: Logger
  545. ) throws {
  546. let initialSettings = [
  547. // As per the default settings for swift-nio-http2:
  548. HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
  549. // We never expect (or allow) server initiated streams.
  550. HTTP2Setting(parameter: .maxConcurrentStreams, value: 0),
  551. // As configured by the user.
  552. HTTP2Setting(parameter: .maxFrameSize, value: httpMaxFrameSize),
  553. HTTP2Setting(parameter: .initialWindowSize, value: httpTargetWindowSize),
  554. ]
  555. let grpcIdleHandler = GRPCIdleHandler(
  556. connectionManager: connectionManager,
  557. idleTimeout: connectionIdleTimeout,
  558. keepalive: connectionKeepalive,
  559. logger: logger
  560. )
  561. var connectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration()
  562. connectionConfiguration.initialSettings = initialSettings
  563. var streamConfiguration = NIOHTTP2Handler.StreamConfiguration()
  564. streamConfiguration.targetWindowSize = httpTargetWindowSize
  565. let h2Handler = NIOHTTP2Handler(
  566. mode: .client,
  567. eventLoop: channel.eventLoop,
  568. connectionConfiguration: connectionConfiguration,
  569. streamConfiguration: streamConfiguration,
  570. streamDelegate: grpcIdleHandler
  571. ) { channel in
  572. channel.close()
  573. }
  574. try self.addHandler(h2Handler)
  575. grpcIdleHandler.setMultiplexer(try h2Handler.syncMultiplexer())
  576. try self.addHandler(grpcIdleHandler)
  577. try self.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
  578. }
  579. }
  580. extension Channel {
  581. func configureGRPCClient(
  582. errorDelegate: ClientErrorDelegate?,
  583. logger: Logger
  584. ) -> EventLoopFuture<Void> {
  585. return self.configureHTTP2Pipeline(
  586. mode: .client,
  587. connectionConfiguration: .init(),
  588. streamConfiguration: .init()
  589. ) { channel in
  590. channel.eventLoop.makeSucceededVoidFuture()
  591. }.flatMap { _ in
  592. self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
  593. }
  594. }
  595. }
  596. extension TimeAmount {
  597. /// Creates a new `TimeAmount` from the given time interval in seconds.
  598. ///
  599. /// - Parameter timeInterval: The amount of time in seconds
  600. static func seconds(timeInterval: TimeInterval) -> TimeAmount {
  601. return .nanoseconds(Int64(timeInterval * 1_000_000_000))
  602. }
  603. }
  604. extension String {
  605. var isIPAddress: Bool {
  606. // We need some scratch space to let inet_pton write into.
  607. var ipv4Addr = in_addr()
  608. var ipv6Addr = in6_addr()
  609. return self.withCString { ptr in
  610. inet_pton(AF_INET, ptr, &ipv4Addr) == 1 ||
  611. inet_pton(AF_INET6, ptr, &ipv6Addr) == 1
  612. }
  613. }
  614. }