ClientConnection.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP2
  19. import NIOSSL
  20. import NIOTLS
  21. import Logging
  22. /// Provides a single, managed connection to a server.
  23. ///
  24. /// The connection to the server is provided by a single channel which will attempt to reconnect
  25. /// to the server if the connection is dropped. This connection is guaranteed to always use the same
  26. /// event loop.
  27. ///
  28. /// The connection is initially setup with a handler to verify that TLS was established
  29. /// successfully (assuming TLS is being used).
  30. ///
  31. /// ┌──────────────────────────┐
  32. /// │ DelegatingErrorHandler │
  33. /// └──────────▲───────────────┘
  34. /// HTTP2Frame│
  35. /// ┌──────────┴───────────────┐
  36. /// │ SettingsObservingHandler │
  37. /// └──────────▲───────────────┘
  38. /// HTTP2Frame│
  39. /// │ ⠇ ⠇ ⠇ ⠇
  40. /// │ ┌┴─▼┐ ┌┴─▼┐
  41. /// │ │ | │ | HTTP/2 streams
  42. /// │ └▲─┬┘ └▲─┬┘
  43. /// │ │ │ │ │ HTTP2Frame
  44. /// ┌─┴────────────────┴─▼───┴─▼┐
  45. /// │ HTTP2StreamMultiplexer |
  46. /// └─▲───────────────────────┬─┘
  47. /// HTTP2Frame│ │HTTP2Frame
  48. /// ┌─┴───────────────────────▼─┐
  49. /// │ NIOHTTP2Handler │
  50. /// └─▲───────────────────────┬─┘
  51. /// ByteBuffer│ │ByteBuffer
  52. /// ┌─┴───────────────────────▼─┐
  53. /// │ TLSVerificationHandler │
  54. /// └─▲───────────────────────┬─┘
  55. /// ByteBuffer│ │ByteBuffer
  56. /// ┌─┴───────────────────────▼─┐
  57. /// │ NIOSSLHandler │
  58. /// └─▲───────────────────────┬─┘
  59. /// ByteBuffer│ │ByteBuffer
  60. /// │ ▼
  61. ///
  62. /// The `TLSVerificationHandler` observes the outcome of the SSL handshake and determines
  63. /// whether a `ClientConnection` should be returned to the user. In either eventuality, the
  64. /// handler removes itself from the pipeline once TLS has been verified. There is also a handler
  65. /// after the multiplexer for observing the initial settings frame, after which it determines that
  66. /// the connection state is `.ready` and removes itself from the channel. Finally there is a
  67. /// delegated error handler which uses the error delegate associated with this connection
  68. /// (see `DelegatingErrorHandler`).
  69. ///
  70. /// See `BaseClientCall` for a description of the pipelines associated with each HTTP/2 stream.
  71. public class ClientConnection {
  72. private let id: String
  73. private let logger: Logger
  74. /// The channel which will handle gRPC calls.
  75. internal var channel: EventLoopFuture<Channel> {
  76. willSet {
  77. self.willSetChannel(to: newValue)
  78. }
  79. didSet {
  80. self.didSetChannel(to: self.channel)
  81. }
  82. }
  83. /// HTTP multiplexer from the `channel` handling gRPC calls.
  84. internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  85. /// The configuration for this client.
  86. internal let configuration: Configuration
  87. internal let scheme: String
  88. internal let authority: String
  89. /// A monitor for the connectivity state.
  90. public let connectivity: ConnectivityStateMonitor
  91. /// The `EventLoop` this connection is using.
  92. public var eventLoop: EventLoop {
  93. return self.channel.eventLoop
  94. }
  95. /// Creates a new connection from the given configuration.
  96. public init(configuration: Configuration) {
  97. self.configuration = configuration
  98. self.scheme = configuration.tls == nil ? "http" : "https"
  99. self.authority = configuration.target.host
  100. let id = String(describing: UUID())
  101. self.id = id
  102. var logger = Logger(subsystem: .clientChannel)
  103. logger[metadataKey: MetadataKey.connectionID] = "\(id)"
  104. self.logger = logger
  105. self.connectivity = ConnectivityStateMonitor(
  106. delegate: configuration.connectivityStateDelegate,
  107. logger: logger
  108. )
  109. let eventLoop = configuration.eventLoopGroup.next()
  110. self.channel = ClientConnection.makeChannel(
  111. configuration: self.configuration,
  112. eventLoop: eventLoop,
  113. connectivity: self.connectivity,
  114. backoffIterator: configuration.connectionBackoff?.makeIterator(),
  115. logger: logger
  116. )
  117. self.multiplexer = self.channel.flatMap {
  118. $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  119. }
  120. // `willSet` and `didSet` are *not* called on initialization, call them explicitly now.
  121. self.willSetChannel(to: self.channel)
  122. self.didSetChannel(to: self.channel)
  123. }
  124. /// Closes the connection to the server.
  125. public func close() -> EventLoopFuture<Void> {
  126. if self.connectivity.state == .shutdown {
  127. // We're already shutdown or in the process of shutting down.
  128. return self.channel.flatMap { $0.closeFuture }
  129. } else {
  130. self.connectivity.initiateUserShutdown()
  131. return self.channel.flatMap { $0.close() }
  132. }
  133. }
  134. }
  135. extension ClientConnection {
  136. /// Register a callback on the close future of the given `channel` to replace the channel (if
  137. /// possible) and also replace the `multiplexer` with that from the new channel.
  138. ///
  139. /// - Parameter channel: The channel that will be set.
  140. private func willSetChannel(to channel: EventLoopFuture<Channel>) {
  141. // If we're about to set the channel and the user has initiated a shutdown (i.e. while the new
  142. // channel was being created) then it is no longer needed.
  143. guard !self.connectivity.userHasInitiatedShutdown else {
  144. channel.whenSuccess { channel in
  145. self.logger.debug("user initiated shutdown during connection, closing channel")
  146. channel.close(mode: .all, promise: nil)
  147. }
  148. return
  149. }
  150. // If we get a channel and it closes then create a new one, if necessary.
  151. channel.flatMap { $0.closeFuture }.whenComplete { result in
  152. switch result {
  153. case .success:
  154. self.logger.debug("client connection shutdown successfully")
  155. case .failure(let error):
  156. self.logger.warning(
  157. "client connection shutdown failed",
  158. metadata: [MetadataKey.error: "\(error)"]
  159. )
  160. }
  161. guard self.connectivity.canAttemptReconnect else {
  162. return
  163. }
  164. // Something went wrong, but we'll try to fix it so let's update our state to reflect that.
  165. self.connectivity.state = .transientFailure
  166. self.logger.debug("client connection channel closed, creating a new one")
  167. self.channel = ClientConnection.makeChannel(
  168. configuration: self.configuration,
  169. eventLoop: channel.eventLoop,
  170. connectivity: self.connectivity,
  171. backoffIterator: self.configuration.connectionBackoff?.makeIterator(),
  172. logger: self.logger
  173. )
  174. }
  175. self.multiplexer = channel.flatMap {
  176. $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  177. }
  178. }
  179. /// Register a callback on the given `channel` to update the connectivity state.
  180. ///
  181. /// - Parameter channel: The channel that was set.
  182. private func didSetChannel(to channel: EventLoopFuture<Channel>) {
  183. channel.whenFailure { _ in
  184. self.connectivity.state = .shutdown
  185. }
  186. }
  187. }
  188. // Note: documentation is inherited.
  189. extension ClientConnection: GRPCChannel {
  190. public func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  191. path: String,
  192. request: Request,
  193. callOptions: CallOptions
  194. ) -> UnaryCall<Request, Response> where Request : GRPCPayload, Response : GRPCPayload {
  195. return UnaryCall(
  196. path: path,
  197. scheme: self.scheme,
  198. authority: self.authority,
  199. callOptions: callOptions,
  200. eventLoop: self.eventLoop,
  201. multiplexer: self.multiplexer,
  202. errorDelegate: self.configuration.errorDelegate,
  203. logger: self.logger,
  204. request: request
  205. )
  206. }
  207. public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  208. path: String,
  209. callOptions: CallOptions
  210. ) -> ClientStreamingCall<Request, Response> {
  211. return ClientStreamingCall(
  212. path: path,
  213. scheme: self.scheme,
  214. authority: self.authority,
  215. callOptions: callOptions,
  216. eventLoop: self.eventLoop,
  217. multiplexer: self.multiplexer,
  218. errorDelegate: self.configuration.errorDelegate,
  219. logger: self.logger
  220. )
  221. }
  222. public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  223. path: String,
  224. request: Request,
  225. callOptions: CallOptions,
  226. handler: @escaping (Response) -> Void
  227. ) -> ServerStreamingCall<Request, Response> {
  228. return ServerStreamingCall(
  229. path: path,
  230. scheme: self.scheme,
  231. authority: self.authority,
  232. callOptions: callOptions,
  233. eventLoop: self.eventLoop,
  234. multiplexer: self.multiplexer,
  235. errorDelegate: self.configuration.errorDelegate,
  236. logger: self.logger,
  237. request: request,
  238. handler: handler
  239. )
  240. }
  241. public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  242. path: String,
  243. callOptions: CallOptions,
  244. handler: @escaping (Response) -> Void
  245. ) -> BidirectionalStreamingCall<Request, Response> {
  246. return BidirectionalStreamingCall(
  247. path: path,
  248. scheme: self.scheme,
  249. authority: self.authority,
  250. callOptions: callOptions,
  251. eventLoop: self.eventLoop,
  252. multiplexer: self.multiplexer,
  253. errorDelegate: self.configuration.errorDelegate,
  254. logger: self.logger,
  255. handler: handler
  256. )
  257. }
  258. }
  259. extension ClientConnection {
  260. /// Attempts to create a new `Channel` using the given configuration.
  261. ///
  262. /// This involves: creating a `ClientBootstrapProtocol`, connecting to a target and verifying that
  263. /// the TLS handshake was successful (if TLS was configured). We _may_ additiionally set a
  264. /// connection timeout and schedule a retry attempt (should the connection fail) if a
  265. /// `ConnectionBackoffIterator` is provided.
  266. ///
  267. /// - Parameter configuration: The configuration to start the connection with.
  268. /// - Parameter eventLoop: The event loop to use for this connection.
  269. /// - Parameter connectivity: A connectivity state monitor.
  270. /// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
  271. /// connection timeouts and backoff to use when attempting to create a connection.
  272. private class func makeChannel(
  273. configuration: Configuration,
  274. eventLoop: EventLoop,
  275. connectivity: ConnectivityStateMonitor,
  276. backoffIterator: ConnectionBackoffIterator?,
  277. logger: Logger
  278. ) -> EventLoopFuture<Channel> {
  279. guard connectivity.state == .idle || connectivity.state == .transientFailure else {
  280. return configuration.eventLoopGroup.next().makeFailedFuture(GRPCStatus.processingError)
  281. }
  282. logger.debug("attempting to connect", metadata: ["target": "\(configuration.target)", "event_loop": "\(eventLoop)"])
  283. connectivity.state = .connecting
  284. let timeoutAndBackoff = backoffIterator?.next()
  285. let bootstrap = self.makeBootstrap(
  286. configuration: configuration,
  287. eventLoop: eventLoop,
  288. timeout: timeoutAndBackoff?.timeout,
  289. connectivityMonitor: connectivity,
  290. logger: logger
  291. )
  292. let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
  293. if configuration.tls != nil {
  294. return channel.verifyTLS().map { channel }
  295. } else {
  296. return channel.eventLoop.makeSucceededFuture(channel)
  297. }
  298. }
  299. // If we don't have backoff then we can't retry, just return the `channel` no matter what
  300. // state we are in.
  301. guard let backoff = timeoutAndBackoff?.backoff else {
  302. logger.debug("backoff exhausted, no more connection attempts will be made")
  303. return channel
  304. }
  305. // If our connection attempt was unsuccessful, schedule another attempt in some time.
  306. return channel.flatMapError { error in
  307. logger.notice("connection attempt failed", metadata: [MetadataKey.error: "\(error)"])
  308. // We will try to connect again: the failure is transient.
  309. connectivity.state = .transientFailure
  310. return ClientConnection.scheduleReconnectAttempt(
  311. in: backoff,
  312. on: channel.eventLoop,
  313. configuration: configuration,
  314. connectivity: connectivity,
  315. backoffIterator: backoffIterator,
  316. logger: logger
  317. )
  318. }
  319. }
  320. /// Schedule an attempt to make a channel in `timeout` seconds on the given `eventLoop`.
  321. private class func scheduleReconnectAttempt(
  322. in timeout: TimeInterval,
  323. on eventLoop: EventLoop,
  324. configuration: Configuration,
  325. connectivity: ConnectivityStateMonitor,
  326. backoffIterator: ConnectionBackoffIterator?,
  327. logger: Logger
  328. ) -> EventLoopFuture<Channel> {
  329. logger.debug("scheduling connection attempt", metadata: ["delay_seconds": "\(timeout)"])
  330. // The `futureResult` of the scheduled task is of type
  331. // `EventLoopFuture<EventLoopFuture<Channel>>`, so we need to `flatMap` it to
  332. // remove a level of indirection.
  333. return eventLoop.scheduleTask(in: .seconds(timeInterval: timeout)) {
  334. ClientConnection.makeChannel(
  335. configuration: configuration,
  336. eventLoop: eventLoop,
  337. connectivity: connectivity,
  338. backoffIterator: backoffIterator,
  339. logger: logger
  340. )
  341. }.futureResult.flatMap { channel in
  342. channel
  343. }
  344. }
  345. /// Makes and configures a `ClientBootstrap` using the provided configuration.
  346. ///
  347. /// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
  348. /// handlers detailed in the documentation for `ClientConnection`.
  349. ///
  350. /// - Parameter configuration: The configuration to prepare the bootstrap with.
  351. /// - Parameter eventLoop: The `EventLoop` to use for the bootstrap.
  352. /// - Parameter timeout: The connection timeout in seconds.
  353. /// - Parameter connectivityMonitor: The connectivity state monitor for the created channel.
  354. private class func makeBootstrap(
  355. configuration: Configuration,
  356. eventLoop: EventLoop,
  357. timeout: TimeInterval?,
  358. connectivityMonitor: ConnectivityStateMonitor,
  359. logger: Logger
  360. ) -> ClientBootstrapProtocol {
  361. // Provide a server hostname if we're using TLS. Prefer the override.
  362. var serverHostname: String? = configuration.tls.map {
  363. if let hostnameOverride = $0.hostnameOverride {
  364. logger.debug("using hostname override for TLS", metadata: ["server-hostname": "\(hostnameOverride)"])
  365. return hostnameOverride
  366. } else {
  367. let host = configuration.target.host
  368. logger.debug("using host from connection target for TLS", metadata: ["server-hostname": "\(host)"])
  369. return host
  370. }
  371. }
  372. if let hostname = serverHostname, hostname.isIPAddress {
  373. logger.debug("IP address cannot be used for TLS SNI extension. No host used", metadata: ["server-hostname": "\(hostname)"])
  374. serverHostname = nil
  375. }
  376. let bootstrap = PlatformSupport.makeClientBootstrap(group: eventLoop, logger: logger)
  377. // Enable SO_REUSEADDR and TCP_NODELAY.
  378. .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
  379. .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
  380. .channelInitializer { channel in
  381. channel.configureGRPCClient(
  382. tlsConfiguration: configuration.tls?.configuration,
  383. tlsServerHostname: serverHostname,
  384. connectivityMonitor: connectivityMonitor,
  385. errorDelegate: configuration.errorDelegate,
  386. logger: logger
  387. )
  388. }
  389. if let timeout = timeout {
  390. logger.debug("setting connect timeout", metadata: ["timeout_seconds" : "\(timeout)"])
  391. return bootstrap.connectTimeout(.seconds(timeInterval: timeout))
  392. } else {
  393. logger.debug("no connect timeout provided")
  394. return bootstrap
  395. }
  396. }
  397. }
  398. // MARK: - Configuration structures
  399. /// A target to connect to.
  400. public enum ConnectionTarget {
  401. /// The host and port.
  402. case hostAndPort(String, Int)
  403. /// The path of a Unix domain socket.
  404. case unixDomainSocket(String)
  405. /// A NIO socket address.
  406. case socketAddress(SocketAddress)
  407. var host: String {
  408. switch self {
  409. case .hostAndPort(let host, _):
  410. return host
  411. case .socketAddress(.v4(let address)):
  412. return address.host
  413. case .socketAddress(.v6(let address)):
  414. return address.host
  415. case .unixDomainSocket, .socketAddress(.unixDomainSocket):
  416. return "localhost"
  417. }
  418. }
  419. }
  420. extension ClientConnection {
  421. /// The configuration for a connection.
  422. public struct Configuration {
  423. /// The target to connect to.
  424. public var target: ConnectionTarget
  425. /// The event loop group to run the connection on.
  426. public var eventLoopGroup: EventLoopGroup
  427. /// An error delegate which is called when errors are caught. Provided delegates **must not
  428. /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
  429. /// cycle.
  430. public var errorDelegate: ClientErrorDelegate?
  431. /// A delegate which is called when the connectivity state is changed.
  432. public var connectivityStateDelegate: ConnectivityStateDelegate?
  433. /// TLS configuration for this connection. `nil` if TLS is not desired.
  434. public var tls: TLS?
  435. /// The connection backoff configuration. If no connection retrying is required then this should
  436. /// be `nil`.
  437. public var connectionBackoff: ConnectionBackoff?
  438. /// The HTTP protocol used for this connection.
  439. public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
  440. return self.tls == nil ? .http : .https
  441. }
  442. /// Create a `Configuration` with some pre-defined defaults.
  443. ///
  444. /// - Parameter target: The target to connect to.
  445. /// - Parameter eventLoopGroup: The event loop group to run the connection on.
  446. /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
  447. /// on debug builds.
  448. /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
  449. /// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
  450. /// - Parameter connectionBackoff: The connection backoff configuration to use.
  451. /// - Parameter messageEncoding: Message compression configuration, defaults to no compression.
  452. public init(
  453. target: ConnectionTarget,
  454. eventLoopGroup: EventLoopGroup,
  455. errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
  456. connectivityStateDelegate: ConnectivityStateDelegate? = nil,
  457. tls: Configuration.TLS? = nil,
  458. connectionBackoff: ConnectionBackoff? = ConnectionBackoff()
  459. ) {
  460. self.target = target
  461. self.eventLoopGroup = eventLoopGroup
  462. self.errorDelegate = errorDelegate
  463. self.connectivityStateDelegate = connectivityStateDelegate
  464. self.tls = tls
  465. self.connectionBackoff = connectionBackoff
  466. }
  467. }
  468. }
  469. // MARK: - Configuration helpers/extensions
  470. fileprivate extension ClientBootstrapProtocol {
  471. /// Connect to the given connection target.
  472. ///
  473. /// - Parameter target: The target to connect to.
  474. func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
  475. switch target {
  476. case .hostAndPort(let host, let port):
  477. return self.connect(host: host, port: port)
  478. case .unixDomainSocket(let path):
  479. return self.connect(unixDomainSocketPath: path)
  480. case .socketAddress(let address):
  481. return self.connect(to: address)
  482. }
  483. }
  484. }
  485. extension Channel {
  486. /// Configure the channel with TLS.
  487. ///
  488. /// This function adds two handlers to the pipeline: the `NIOSSLClientHandler` to handle TLS, and
  489. /// the `TLSVerificationHandler` which verifies that a successful handshake was completed.
  490. ///
  491. /// - Parameter configuration: The configuration to configure the channel with.
  492. /// - Parameter serverHostname: The server hostname to use if the hostname should be verified.
  493. /// - Parameter errorDelegate: The error delegate to use for the TLS verification handler.
  494. func configureTLS(
  495. _ configuration: TLSConfiguration,
  496. serverHostname: String?,
  497. errorDelegate: ClientErrorDelegate?,
  498. logger: Logger
  499. ) -> EventLoopFuture<Void> {
  500. do {
  501. let sslClientHandler = try NIOSSLClientHandler(
  502. context: try NIOSSLContext(configuration: configuration),
  503. serverHostname: serverHostname
  504. )
  505. return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler(logger: logger))
  506. } catch {
  507. return self.eventLoop.makeFailedFuture(error)
  508. }
  509. }
  510. /// Returns the `verification` future from the `TLSVerificationHandler` in this channels pipeline.
  511. func verifyTLS() -> EventLoopFuture<Void> {
  512. return self.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
  513. $0.verification
  514. }
  515. }
  516. func configureGRPCClient(
  517. tlsConfiguration: TLSConfiguration?,
  518. tlsServerHostname: String?,
  519. connectivityMonitor: ConnectivityStateMonitor,
  520. errorDelegate: ClientErrorDelegate?,
  521. logger: Logger
  522. ) -> EventLoopFuture<Void> {
  523. let tlsConfigured = tlsConfiguration.map {
  524. self.configureTLS($0, serverHostname: tlsServerHostname, errorDelegate: errorDelegate, logger: logger)
  525. }
  526. return (tlsConfigured ?? self.eventLoop.makeSucceededFuture(())).flatMap {
  527. self.configureHTTP2Pipeline(mode: .client)
  528. }.flatMap { _ in
  529. let settingsObserver = InitialSettingsObservingHandler(
  530. connectivityStateMonitor: connectivityMonitor,
  531. logger: logger
  532. )
  533. let errorHandler = DelegatingErrorHandler(
  534. logger: logger,
  535. delegate: errorDelegate
  536. )
  537. return self.pipeline.addHandlers(settingsObserver, errorHandler)
  538. }
  539. }
  540. func configureGRPCClient(
  541. errorDelegate: ClientErrorDelegate?,
  542. logger: Logger
  543. ) -> EventLoopFuture<Void> {
  544. return self.configureHTTP2Pipeline(mode: .client).flatMap { _ in
  545. self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
  546. }
  547. }
  548. }
  549. fileprivate extension TimeAmount {
  550. /// Creates a new `TimeAmount` from the given time interval in seconds.
  551. ///
  552. /// - Parameter timeInterval: The amount of time in seconds
  553. static func seconds(timeInterval: TimeInterval) -> TimeAmount {
  554. return .nanoseconds(Int64(timeInterval * 1_000_000_000))
  555. }
  556. }
  557. fileprivate extension String {
  558. var isIPAddress: Bool {
  559. // We need some scratch space to let inet_pton write into.
  560. var ipv4Addr = in_addr()
  561. var ipv6Addr = in6_addr()
  562. return self.withCString { ptr in
  563. return inet_pton(AF_INET, ptr, &ipv4Addr) == 1 ||
  564. inet_pton(AF_INET6, ptr, &ipv6Addr) == 1
  565. }
  566. }
  567. }