ClientConnection.swift 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP2
  19. import NIOSSL
  20. import NIOTLS
  21. import Logging
  22. import SwiftProtobuf
  23. /// Provides a single, managed connection to a server.
  24. ///
  25. /// The connection to the server is provided by a single channel which will attempt to reconnect
  26. /// to the server if the connection is dropped. This connection is guaranteed to always use the same
  27. /// event loop.
  28. ///
  29. /// The connection is initially setup with a handler to verify that TLS was established
  30. /// successfully (assuming TLS is being used).
  31. ///
  ///   ┌──────────────────────────┐
  ///   │  DelegatingErrorHandler  │
  ///   └──────────▲───────────────┘
  ///    HTTP2Frame│
  ///   ┌──────────┴───────────────┐
  ///   │ SettingsObservingHandler │
  ///   └──────────▲───────────────┘
  ///    HTTP2Frame│
  ///              │                ⠇ ⠇   ⠇ ⠇
  ///              │               ┌┴─▼┐ ┌┴─▼┐
  ///              │               │   | │   | HTTP/2 streams
  ///              │               └▲─┬┘ └▲─┬┘
  ///              │                │ │   │ │ HTTP2Frame
  ///            ┌─┴────────────────┴─▼───┴─▼┐
  ///            │   HTTP2StreamMultiplexer  |
  ///            └─▲───────────────────────┬─┘
  ///    HTTP2Frame│                       │HTTP2Frame
  ///            ┌─┴───────────────────────▼─┐
  ///            │       NIOHTTP2Handler     │
  ///            └─▲───────────────────────┬─┘
  ///    ByteBuffer│                       │ByteBuffer
  ///            ┌─┴───────────────────────▼─┐
  ///            │   TLSVerificationHandler  │
  ///            └─▲───────────────────────┬─┘
  ///    ByteBuffer│                       │ByteBuffer
  ///            ┌─┴───────────────────────▼─┐
  ///            │       NIOSSLHandler       │
  ///            └─▲───────────────────────┬─┘
  ///    ByteBuffer│                       │ByteBuffer
  ///              │                       ▼
  62. ///
  63. /// The `TLSVerificationHandler` observes the outcome of the SSL handshake and determines
  64. /// whether a `ClientConnection` should be returned to the user. In either eventuality, the
  65. /// handler removes itself from the pipeline once TLS has been verified. There is also a handler
  66. /// after the multiplexer for observing the initial settings frame, after which it determines that
  67. /// the connection state is `.ready` and removes itself from the channel. Finally there is a
  68. /// delegated error handler which uses the error delegate associated with this connection
  69. /// (see `DelegatingErrorHandler`).
  70. ///
  71. /// See `BaseClientCall` for a description of the pipelines associated with each HTTP/2 stream.
  public class ClientConnection {
    /// Supplies the channel, event loop and connectivity monitor used by this connection.
    private let connectionManager: ConnectionManager

    /// The configuration for this client.
    internal let configuration: Configuration

    /// The scheme sent in request heads: `"https"` when TLS is configured, `"http"` otherwise.
    internal let scheme: String

    /// The host sent in request heads, taken from the configured connection target.
    internal let authority: String

    /// A monitor for the connectivity state.
    public var connectivity: ConnectivityStateMonitor {
      return self.connectionManager.monitor
    }

    /// The `EventLoop` this connection is using.
    public var eventLoop: EventLoop {
      return self.connectionManager.eventLoop
    }

    /// HTTP multiplexer from the `channel` handling gRPC calls.
    ///
    /// Each access asks for a channel (see `getChannel()`) and looks up the
    /// `HTTP2StreamMultiplexer` in that channel's pipeline.
    internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> {
      let channel = self.getChannel()
      return channel.flatMap { channel in
        channel.pipeline.handler(type: HTTP2StreamMultiplexer.self)
      }
    }

    /// Creates a new connection from the given configuration.
    ///
    /// - Important: Users should prefer using `ClientConnection.secure(group:)` to build a
    ///   connection with TLS, or `ClientConnection.insecure(group:)` to build a connection
    ///   without TLS.
    /// - Parameter configuration: The configuration to create the connection from.
    public init(configuration: Configuration) {
      self.configuration = configuration
      self.authority = configuration.target.host

      if configuration.tls != nil {
        self.scheme = "https"
      } else {
        self.scheme = "http"
      }

      self.connectionManager = ConnectionManager(
        configuration: configuration,
        logger: configuration.backgroundActivityLogger
      )
    }

    /// Closes the connection to the server.
    ///
    /// - Returns: The future produced by shutting down the connection manager.
    public func close() -> EventLoopFuture<Void> {
      return self.connectionManager.shutdown()
    }

    /// Asks the connection manager for a channel in the manner dictated by the configured
    /// call start behavior.
    private func getChannel() -> EventLoopFuture<Channel> {
      let manager = self.connectionManager

      switch self.configuration.callStartBehavior.wrapped {
      case .fastFailure:
        return manager.getOptimisticChannel()
      case .waitsForConnectivity:
        return manager.getChannel()
      }
    }

    /// Extracts a logger and request ID from the call options and returns them. The logger has
    /// the connection manager's metadata appended and, when the provider yields a request ID,
    /// that ID attached under `MetadataKey.requestID`.
    private func populatedLoggerAndRequestID(from callOptions: CallOptions) -> (Logger, String?) {
      var rpcLogger = callOptions.logger
      self.connectionManager.appendMetadata(to: &rpcLogger)

      // No request ID means there is nothing further to attach.
      guard let id = callOptions.requestIDProvider.requestID() else {
        return (rpcLogger, nil)
      }

      rpcLogger[metadataKey: MetadataKey.requestID] = "\(id)"
      return (rpcLogger, id)
    }

    /// Builds the request head for an RPC on this connection using its scheme and authority.
    private func makeRequestHead(path: String, options: CallOptions, requestID: String?) -> _GRPCRequestHead {
      let head = _GRPCRequestHead(
        scheme: self.scheme,
        path: path,
        host: self.authority,
        options: options,
        requestID: requestID
      )
      return head
    }
  }
  141. // MARK: - Unary
  extension ClientConnection: GRPCChannel {
    /// Starts a unary RPC using the given serializer and deserializer.
    ///
    /// The call is created on the connection's multiplexer, then the request head and the single
    /// request message are sent on it.
    private func makeUnaryCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      serializer: Serializer,
      deserializer: Deserializer,
      path: String,
      request: Serializer.Input,
      callOptions: CallOptions
    ) -> UnaryCall<Serializer.Input, Deserializer.Output> {
      typealias Call = UnaryCall<Serializer.Input, Deserializer.Output>

      let (rpcLogger, id) = self.populatedLoggerAndRequestID(from: callOptions)
      rpcLogger.debug("starting rpc", metadata: ["path": "\(path)"])

      let rpc = Call.makeOnHTTP2Stream(
        multiplexer: self.multiplexer,
        serializer: serializer,
        deserializer: deserializer,
        callOptions: callOptions,
        errorDelegate: self.configuration.errorDelegate,
        logger: rpcLogger
      )

      let head = self.makeRequestHead(path: path, options: callOptions, requestID: id)
      rpc.send(head, request: request)
      return rpc
    }

    /// A unary call using `SwiftProtobuf.Message` messages.
    public func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      path: String,
      request: Request,
      callOptions: CallOptions
    ) -> UnaryCall<Request, Response> {
      let rpc: UnaryCall<Request, Response> = self.makeUnaryCall(
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        path: path,
        request: request,
        callOptions: callOptions
      )
      return rpc
    }

    /// A unary call using `GRPCPayload` messages.
    public func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      request: Request,
      callOptions: CallOptions
    ) -> UnaryCall<Request, Response> {
      let rpc: UnaryCall<Request, Response> = self.makeUnaryCall(
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        path: path,
        request: request,
        callOptions: callOptions
      )
      return rpc
    }
  }
  192. // MARK: - Client Streaming
  extension ClientConnection {
    /// Starts a client streaming RPC using the given serializer and deserializer.
    ///
    /// The call is created on the connection's multiplexer and only the request head is sent;
    /// request messages are left to the caller.
    private func makeClientStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      serializer: Serializer,
      deserializer: Deserializer,
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Serializer.Input, Deserializer.Output> {
      typealias Call = ClientStreamingCall<Serializer.Input, Deserializer.Output>

      let (rpcLogger, id) = self.populatedLoggerAndRequestID(from: callOptions)
      rpcLogger.debug("starting rpc", metadata: ["path": "\(path)"])

      let rpc = Call.makeOnHTTP2Stream(
        multiplexer: self.multiplexer,
        serializer: serializer,
        deserializer: deserializer,
        callOptions: callOptions,
        errorDelegate: self.configuration.errorDelegate,
        logger: rpcLogger
      )

      let head = self.makeRequestHead(path: path, options: callOptions, requestID: id)
      rpc.sendHead(head)
      return rpc
    }

    /// A client streaming call using `SwiftProtobuf.Message` messages.
    public func makeClientStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Request, Response> {
      let rpc: ClientStreamingCall<Request, Response> = self.makeClientStreamingCall(
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        path: path,
        callOptions: callOptions
      )
      return rpc
    }

    /// A client streaming call using `GRPCPayload` messages.
    public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Request, Response> {
      let rpc: ClientStreamingCall<Request, Response> = self.makeClientStreamingCall(
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        path: path,
        callOptions: callOptions
      )
      return rpc
    }
  }
  238. // MARK: - Server Streaming
  extension ClientConnection {
    /// Starts a server streaming RPC using the given serializer and deserializer.
    ///
    /// The call is created on the connection's multiplexer with `handler` as its response
    /// handler, then the request head and the single request message are sent on it.
    private func makeServerStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      serializer: Serializer,
      deserializer: Deserializer,
      path: String,
      request: Serializer.Input,
      callOptions: CallOptions,
      handler: @escaping (Deserializer.Output) -> Void
    ) -> ServerStreamingCall<Serializer.Input, Deserializer.Output> {
      typealias Call = ServerStreamingCall<Serializer.Input, Deserializer.Output>

      let (rpcLogger, id) = self.populatedLoggerAndRequestID(from: callOptions)
      rpcLogger.debug("starting rpc", metadata: ["path": "\(path)"])

      let rpc = Call.makeOnHTTP2Stream(
        multiplexer: self.multiplexer,
        serializer: serializer,
        deserializer: deserializer,
        callOptions: callOptions,
        errorDelegate: self.configuration.errorDelegate,
        logger: rpcLogger,
        responseHandler: handler
      )

      let head = self.makeRequestHead(path: path, options: callOptions, requestID: id)
      rpc.send(head, request: request)
      return rpc
    }

    /// A server streaming call using `SwiftProtobuf.Message` messages.
    public func makeServerStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      path: String,
      request: Request,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> ServerStreamingCall<Request, Response> {
      let rpc: ServerStreamingCall<Request, Response> = self.makeServerStreamingCall(
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        path: path,
        request: request,
        callOptions: callOptions,
        handler: handler
      )
      return rpc
    }

    /// A server streaming call using `GRPCPayload` messages.
    public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      request: Request,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> ServerStreamingCall<Request, Response> {
      let rpc: ServerStreamingCall<Request, Response> = self.makeServerStreamingCall(
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        path: path,
        request: request,
        callOptions: callOptions,
        handler: handler
      )
      return rpc
    }
  }
  295. // MARK: - Bidirectional Streaming
  extension ClientConnection {
    /// Starts a bidirectional streaming RPC using the given serializer and deserializer.
    ///
    /// The call is created on the connection's multiplexer with `handler` as its response
    /// handler and only the request head is sent; request messages are left to the caller.
    private func makeBidirectionalStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      serializer: Serializer,
      deserializer: Deserializer,
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Deserializer.Output) -> Void
    ) -> BidirectionalStreamingCall<Serializer.Input, Deserializer.Output> {
      typealias Call = BidirectionalStreamingCall<Serializer.Input, Deserializer.Output>

      let (rpcLogger, id) = self.populatedLoggerAndRequestID(from: callOptions)
      rpcLogger.debug("starting rpc", metadata: ["path": "\(path)"])

      let rpc = Call.makeOnHTTP2Stream(
        multiplexer: self.multiplexer,
        serializer: serializer,
        deserializer: deserializer,
        callOptions: callOptions,
        errorDelegate: self.configuration.errorDelegate,
        logger: rpcLogger,
        responseHandler: handler
      )

      let head = self.makeRequestHead(path: path, options: callOptions, requestID: id)
      rpc.sendHead(head)
      return rpc
    }

    /// A bidirectional streaming call using `SwiftProtobuf.Message` messages.
    public func makeBidirectionalStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> BidirectionalStreamingCall<Request, Response> {
      let rpc: BidirectionalStreamingCall<Request, Response> = self.makeBidirectionalStreamingCall(
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        path: path,
        callOptions: callOptions,
        handler: handler
      )
      return rpc
    }

    /// A bidirectional streaming call using `GRPCPayload` messages.
    public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> BidirectionalStreamingCall<Request, Response> {
      let rpc: BidirectionalStreamingCall<Request, Response> = self.makeBidirectionalStreamingCall(
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        path: path,
        callOptions: callOptions,
        handler: handler
      )
      return rpc
    }
  }
  347. // MARK: - Configuration structures
  /// A target to connect to.
  ///
  /// Values are created through the static factory functions; the underlying representation is
  /// not part of the public API.
  public struct ConnectionTarget {
    /// The concrete forms a target may take.
    internal enum Wrapped {
      case hostAndPort(String, Int)
      case unixDomainSocket(String)
      case socketAddress(SocketAddress)
    }

    /// The wrapped representation of this target.
    internal var wrapped: Wrapped

    private init(_ wrapped: Wrapped) {
      self.wrapped = wrapped
    }

    /// The host and port.
    public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
      return .init(.hostAndPort(host, port))
    }

    /// The path of a Unix domain socket.
    public static func unixDomainSocket(_ path: String) -> ConnectionTarget {
      return .init(.unixDomainSocket(path))
    }

    /// A NIO socket address.
    public static func socketAddress(_ address: SocketAddress) -> ConnectionTarget {
      return .init(.socketAddress(address))
    }

    /// The host associated with this target: the given host for host-and-port targets, the
    /// `host` of the address for IPv4 and IPv6 socket addresses, and `"localhost"` for any
    /// Unix domain socket.
    var host: String {
      switch self.wrapped {
      case let .hostAndPort(name, _):
        return name

      case .unixDomainSocket:
        return "localhost"

      case let .socketAddress(address):
        switch address {
        case let .v4(ipv4):
          return ipv4.host
        case let .v6(ipv6):
          return ipv6.host
        case .unixDomainSocket:
          return "localhost"
        }
      }
    }
  }
  /// The connectivity behavior to use when starting an RPC.
  ///
  /// Only the two static values below can be obtained; the initializer is private.
  public struct CallStartBehavior: Hashable {
    /// The available behaviors.
    internal enum Behavior: Hashable {
      case waitsForConnectivity
      case fastFailure
    }

    /// The behavior this value stands for.
    internal var wrapped: Behavior

    private init(_ wrapped: Behavior) {
      self.wrapped = wrapped
    }

    /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
    /// an RPC. Doing so may involve multiple connection attempts.
    ///
    /// This is the preferred, and default, behaviour.
    public static let waitsForConnectivity: CallStartBehavior =
      CallStartBehavior(Behavior.waitsForConnectivity)

    /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
    /// quickly rather than waiting for an active connection. The behaviour depends on the current
    /// connectivity state:
    ///
    /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
    /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
    ///   fails.
    /// - Ready: a connection is already active: the RPC will be started using that connection.
    /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
    ///   connect again. The RPC will fail immediately.
    /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
    public static let fastFailure: CallStartBehavior =
      CallStartBehavior(Behavior.fastFailure)
  }
  extension ClientConnection {
    /// The configuration for a connection.
    public struct Configuration {
      /// The target to connect to.
      public var target: ConnectionTarget

      /// The event loop group to run the connection on.
      public var eventLoopGroup: EventLoopGroup

      /// An error delegate which is called when errors are caught. Provided delegates **must not
      /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
      /// cycle.
      public var errorDelegate: ClientErrorDelegate?

      /// A delegate which is called when the connectivity state is changed.
      public var connectivityStateDelegate: ConnectivityStateDelegate?

      /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
      /// provided but the queue is `nil` then one will be created by gRPC.
      public var connectivityStateDelegateQueue: DispatchQueue?

      /// TLS configuration for this connection. `nil` if TLS is not desired.
      public var tls: TLS?

      /// The connection backoff configuration. If no connection retrying is required then this
      /// should be `nil`.
      public var connectionBackoff: ConnectionBackoff?

      /// The connection keepalive configuration.
      public var connectionKeepalive: ClientConnectionKeepalive

      /// The amount of time to wait before closing the connection. The idle timeout will start only
      /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
      ///
      /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
      public var connectionIdleTimeout: TimeAmount

      /// The behavior used to determine when an RPC should start. That is, whether it should wait
      /// for an active connection or fail quickly if no connection is currently available.
      public var callStartBehavior: CallStartBehavior

      /// The HTTP/2 flow control target window size.
      public var httpTargetWindowSize: Int

      /// The HTTP protocol used for this connection: `.https` when `tls` is set, `.http` otherwise.
      public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
        if self.tls != nil {
          return .https
        }
        return .http
      }

      /// A logger for background information (such as connectivity state). A separate logger for
      /// requests may be provided in the `CallOptions`.
      ///
      /// Defaults to a no-op logger.
      public var backgroundActivityLogger: Logger

      /// Create a `Configuration` with some pre-defined defaults. Prefer using
      /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
      /// `ClientConnection.insecure(group:)` to build a plaintext connection.
      ///
      /// - Parameter target: The target to connect to.
      /// - Parameter eventLoopGroup: The event loop group to run the connection on.
      /// - Parameter errorDelegate: The error delegate, defaulting to a
      ///     `LoggingClientErrorDelegate`.
      /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
      /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
      ///     `connectivityStateDelegate`, defaulting to `nil`.
      /// - Parameter tls: TLS configuration, defaulting to `nil`.
      /// - Parameter connectionBackoff: The connection backoff configuration to use, defaulting to
      ///     `ConnectionBackoff()`.
      /// - Parameter connectionKeepalive: The keepalive configuration to use, defaulting to
      ///     `ClientConnectionKeepalive()`.
      /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the
      ///     connection, defaulting to 5 minutes.
      /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
      ///     relation to its underlying connection. Defaults to `waitsForConnectivity`.
      /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size, defaulting
      ///     to 65535.
      /// - Parameter backgroundActivityLogger: A logger for background information (such as
      ///     connectivity state). Defaults to a no-op logger.
      public init(
        target: ConnectionTarget,
        eventLoopGroup: EventLoopGroup,
        errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
        connectivityStateDelegate: ConnectivityStateDelegate? = nil,
        connectivityStateDelegateQueue: DispatchQueue? = nil,
        tls: Configuration.TLS? = nil,
        connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
        connectionKeepalive: ClientConnectionKeepalive = ClientConnectionKeepalive(),
        connectionIdleTimeout: TimeAmount = .minutes(5),
        callStartBehavior: CallStartBehavior = .waitsForConnectivity,
        httpTargetWindowSize: Int = 65535,
        backgroundActivityLogger: Logger = Logger(label: "io.grpc") { _ in SwiftLogNoOpLogHandler() }
      ) {
        // Where to connect and what to run on.
        self.target = target
        self.eventLoopGroup = eventLoopGroup
        self.tls = tls

        // Delegates and logging.
        self.errorDelegate = errorDelegate
        self.connectivityStateDelegate = connectivityStateDelegate
        self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
        self.backgroundActivityLogger = backgroundActivityLogger

        // Connection lifecycle.
        self.connectionBackoff = connectionBackoff
        self.connectionKeepalive = connectionKeepalive
        self.connectionIdleTimeout = connectionIdleTimeout
        self.callStartBehavior = callStartBehavior

        // HTTP/2 tuning.
        self.httpTargetWindowSize = httpTargetWindowSize
      }
    }
  }
  503. // MARK: - Configuration helpers/extensions
  extension ClientBootstrapProtocol {
    /// Connect to the given connection target by dispatching to the bootstrap's matching
    /// `connect` variant.
    ///
    /// - Parameter target: The target to connect to.
    /// - Returns: The future channel produced by the underlying `connect` call.
    func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
      switch target.wrapped {
      case let .socketAddress(address):
        return self.connect(to: address)

      case let .unixDomainSocket(socketPath):
        return self.connect(unixDomainSocketPath: socketPath)

      case let .hostAndPort(hostname, portNumber):
        return self.connect(host: hostname, port: portNumber)
      }
    }
  }
  extension Channel {
    /// Configure the channel with TLS.
    ///
    /// This function adds two handlers to the pipeline: the `NIOSSLClientHandler` to handle TLS, and
    /// the `TLSVerificationHandler` which verifies that a successful handshake was completed.
    ///
    /// - Parameter configuration: The configuration to configure the channel with.
    /// - Parameter serverHostname: The server hostname to use if the hostname should be verified.
    /// - Parameter errorDelegate: Not used by this function's body.
    /// - Parameter logger: The logger handed to the `TLSVerificationHandler`.
    /// - Returns: A future completed once both handlers have been added, or a failed future if the
    ///   SSL context or handler could not be created.
    func configureTLS(
      _ configuration: TLSConfiguration,
      serverHostname: String?,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) -> EventLoopFuture<Void> {
      let tlsHandlers: [ChannelHandler]

      do {
        let sslContext = try NIOSSLContext(configuration: configuration)
        let sslHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: serverHostname)
        tlsHandlers = [sslHandler, TLSVerificationHandler(logger: logger)]
      } catch {
        return self.eventLoop.makeFailedFuture(error)
      }

      return self.pipeline.addHandlers(tlsHandlers)
    }

    /// Configure the channel as a gRPC client.
    ///
    /// The steps run in sequence: TLS handlers (only when `tlsConfiguration` is non-`nil`), the
    /// HTTP/2 pipeline, the keepalive and idle handlers placed directly after the
    /// `NIOHTTP2Handler`, and finally a `DelegatingErrorHandler`.
    ///
    /// - Parameter httpTargetWindowSize: The target window size passed to the HTTP/2 pipeline.
    /// - Parameter tlsConfiguration: The TLS configuration, or `nil` to skip TLS setup.
    /// - Parameter tlsServerHostname: The server hostname passed to `configureTLS`.
    /// - Parameter connectionManager: The manager handed to the idle handler.
    /// - Parameter connectionKeepalive: The configuration handed to the keepalive handler.
    /// - Parameter connectionIdleTimeout: The idle timeout handed to the idle handler.
    /// - Parameter errorDelegate: The delegate handed to the error handler (and to `configureTLS`).
    /// - Parameter logger: The logger handed to the TLS and error handlers.
    /// - Returns: A future completed when every handler has been added.
    func configureGRPCClient(
      httpTargetWindowSize: Int,
      tlsConfiguration: TLSConfiguration?,
      tlsServerHostname: String?,
      connectionManager: ConnectionManager,
      connectionKeepalive: ClientConnectionKeepalive,
      connectionIdleTimeout: TimeAmount,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) -> EventLoopFuture<Void> {
      // Step 1: TLS, if requested.
      let tlsReady: EventLoopFuture<Void>
      if let tlsConfiguration = tlsConfiguration {
        tlsReady = self.configureTLS(
          tlsConfiguration,
          serverHostname: tlsServerHostname,
          errorDelegate: errorDelegate,
          logger: logger
        )
      } else {
        tlsReady = self.eventLoop.makeSucceededFuture(())
      }

      // Step 2: HTTP/2 handler and stream multiplexer.
      let http2Ready = tlsReady.flatMap { _ in
        self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize)
      }

      // Step 3: keepalive and idle handlers, positioned right after the HTTP/2 handler.
      let connectionHandlersAdded = http2Ready.flatMap { _ in
        self.pipeline.handler(type: NIOHTTP2Handler.self)
      }.flatMap { http2Handler -> EventLoopFuture<Void> in
        let connectionHandlers: [ChannelHandler] = [
          GRPCClientKeepaliveHandler(configuration: connectionKeepalive),
          GRPCIdleHandler(mode: .client(connectionManager), idleTimeout: connectionIdleTimeout)
        ]
        return self.pipeline.addHandlers(connectionHandlers, position: .after(http2Handler))
      }

      // Step 4: the error handler.
      return connectionHandlersAdded.flatMap { _ -> EventLoopFuture<Void> in
        let errorHandler = DelegatingErrorHandler(logger: logger, delegate: errorDelegate)
        return self.pipeline.addHandler(errorHandler)
      }
    }

    /// Configure the channel as a gRPC client using only the default HTTP/2 pipeline followed by
    /// a `DelegatingErrorHandler`.
    ///
    /// - Parameter errorDelegate: The delegate handed to the error handler.
    /// - Parameter logger: The logger handed to the error handler.
    /// - Returns: A future completed when the error handler has been added.
    func configureGRPCClient(
      errorDelegate: ClientErrorDelegate?,
      logger: Logger
    ) -> EventLoopFuture<Void> {
      let http2Ready = self.configureHTTP2Pipeline(mode: .client)
      return http2Ready.flatMap { _ -> EventLoopFuture<Void> in
        let errorHandler = DelegatingErrorHandler(logger: logger, delegate: errorDelegate)
        return self.pipeline.addHandler(errorHandler)
      }
    }
  }
  extension TimeAmount {
    /// Creates a new `TimeAmount` from the given time interval in seconds.
    ///
    /// The interval is converted to whole nanoseconds, truncating towards zero. Intervals whose
    /// nanosecond count does not fit in an `Int64` (including `.infinity` and `-.infinity`) are
    /// saturated to `Int64.max` / `Int64.min` nanoseconds instead of trapping.
    ///
    /// - Precondition: `timeInterval` must not be NaN.
    /// - Parameter timeInterval: The amount of time in seconds
    /// - Returns: The equivalent amount of time with nanosecond resolution.
    static func seconds(timeInterval: TimeInterval) -> TimeAmount {
      precondition(!timeInterval.isNaN, "timeInterval must not be NaN")

      let nanoseconds = timeInterval * 1_000_000_000

      // `Double(Int64.max)` rounds up to 2^63, which `Int64` cannot hold, so the upper bound is
      // inclusive. `Double(Int64.min)` is exactly -2^63; anything at or below it saturates.
      if nanoseconds >= Double(Int64.max) {
        return .nanoseconds(Int64.max)
      }
      if nanoseconds <= Double(Int64.min) {
        return .nanoseconds(Int64.min)
      }

      return .nanoseconds(Int64(nanoseconds))
    }
  }
  extension String {
    /// Whether this string parses as an IPv4 or IPv6 address according to `inet_pton`.
    var isIPAddress: Bool {
      return self.withCString { cString -> Bool in
        // `inet_pton` needs somewhere to write the parsed address even though only its return
        // code is of interest here.
        var v4Scratch = in_addr()
        if inet_pton(AF_INET, cString, &v4Scratch) == 1 {
          return true
        }

        var v6Scratch = in6_addr()
        return inet_pton(AF_INET6, cString, &v6Scratch) == 1
      }
    }
  }