ClientConnection.swift 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP2
  19. import NIOSSL
  20. import NIOTLS
  21. import Logging
  22. import SwiftProtobuf
  23. /// Provides a single, managed connection to a server.
  24. ///
  25. /// The connection to the server is provided by a single channel which will attempt to reconnect
  26. /// to the server if the connection is dropped. This connection is guaranteed to always use the same
  27. /// event loop.
  28. ///
  29. /// The connection is initially setup with a handler to verify that TLS was established
  30. /// successfully (assuming TLS is being used).
  31. ///
/// ┌──────────────────────────┐
/// │  DelegatingErrorHandler  │
/// └──────────▲───────────────┘
///  HTTP2Frame│
/// ┌──────────┴───────────────┐
/// │ SettingsObservingHandler │
/// └──────────▲───────────────┘
///  HTTP2Frame│
///            │                ⠇ ⠇   ⠇ ⠇
///            │               ┌┴─▼┐ ┌┴─▼┐
///            │               │   | │   | HTTP/2 streams
///            │               └▲─┬┘ └▲─┬┘
///            │                │ │   │ │ HTTP2Frame
///          ┌─┴────────────────┴─▼───┴─▼┐
///          │   HTTP2StreamMultiplexer  |
///          └─▲───────────────────────┬─┘
///  HTTP2Frame│                       │HTTP2Frame
///          ┌─┴───────────────────────▼─┐
///          │      NIOHTTP2Handler      │
///          └─▲───────────────────────┬─┘
///  ByteBuffer│                       │ByteBuffer
///          ┌─┴───────────────────────▼─┐
///          │   TLSVerificationHandler  │
///          └─▲───────────────────────┬─┘
///  ByteBuffer│                       │ByteBuffer
///          ┌─┴───────────────────────▼─┐
///          │       NIOSSLHandler       │
///          └─▲───────────────────────┬─┘
///  ByteBuffer│                       │ByteBuffer
///            │                       ▼
  62. ///
  63. /// The `TLSVerificationHandler` observes the outcome of the SSL handshake and determines
  64. /// whether a `ClientConnection` should be returned to the user. In either eventuality, the
  65. /// handler removes itself from the pipeline once TLS has been verified. There is also a handler
  66. /// after the multiplexer for observing the initial settings frame, after which it determines that
  67. /// the connection state is `.ready` and removes itself from the channel. Finally there is a
  68. /// delegated error handler which uses the error delegate associated with this connection
  69. /// (see `DelegatingErrorHandler`).
  70. ///
  71. /// See `BaseClientCall` for a description of the pipelines associated with each HTTP/2 stream.
  72. public class ClientConnection {
  73. private let connectionManager: ConnectionManager
  74. private func getChannel() -> EventLoopFuture<Channel> {
  75. switch self.configuration.callStartBehavior.wrapped {
  76. case .waitsForConnectivity:
  77. return self.connectionManager.getChannel()
  78. case .fastFailure:
  79. return self.connectionManager.getOptimisticChannel()
  80. }
  81. }
  82. /// HTTP multiplexer from the `channel` handling gRPC calls.
  83. internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> {
  84. return self.getChannel().flatMap {
  85. $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  86. }
  87. }
  88. /// The configuration for this client.
  89. internal let configuration: Configuration
  90. internal let scheme: String
  91. internal let authority: String
  92. /// A monitor for the connectivity state.
  93. public var connectivity: ConnectivityStateMonitor {
  94. return self.connectionManager.monitor
  95. }
  96. /// The `EventLoop` this connection is using.
  97. public var eventLoop: EventLoop {
  98. return self.connectionManager.eventLoop
  99. }
  100. /// Creates a new connection from the given configuration. Prefer using
  101. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  102. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  103. ///
  104. /// - Important: Users should prefer using `ClientConnection.secure(group:)` to build a connection
  105. /// with TLS, or `ClientConnection.insecure(group:)` to build a connection without TLS.
  106. public init(configuration: Configuration) {
  107. self.configuration = configuration
  108. self.scheme = configuration.tls == nil ? "http" : "https"
  109. self.authority = configuration.target.host
  110. self.connectionManager = ConnectionManager(
  111. configuration: configuration,
  112. logger: Logger(subsystem: .clientChannel)
  113. )
  114. }
  115. /// Closes the connection to the server.
  116. public func close() -> EventLoopFuture<Void> {
  117. return self.connectionManager.shutdown()
  118. }
  119. private func loggerWithRequestID(_ requestID: String) -> Logger {
  120. var logger = self.connectionManager.logger
  121. logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
  122. return logger
  123. }
  124. private func makeRequestHead(path: String, options: CallOptions, requestID: String) -> _GRPCRequestHead {
  125. return _GRPCRequestHead(
  126. scheme: self.scheme,
  127. path: path,
  128. host: self.authority,
  129. requestID: requestID,
  130. options: options
  131. )
  132. }
  133. }
  134. // MARK: - Unary
  135. extension ClientConnection: GRPCChannel {
  136. private func makeUnaryCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  137. serializer: Serializer,
  138. deserializer: Deserializer,
  139. path: String,
  140. request: Serializer.Input,
  141. callOptions: CallOptions
  142. ) -> UnaryCall<Serializer.Input, Deserializer.Output> {
  143. let requestID = callOptions.requestIDProvider.requestID()
  144. let logger = self.loggerWithRequestID(requestID)
  145. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  146. let call = UnaryCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  147. multiplexer: self.multiplexer,
  148. serializer: serializer,
  149. deserializer: deserializer,
  150. callOptions: callOptions,
  151. errorDelegate: self.configuration.errorDelegate,
  152. logger: logger
  153. )
  154. call.send(self.makeRequestHead(path: path, options: callOptions, requestID: requestID), request: request)
  155. return call
  156. }
  157. /// A unary call using `SwiftProtobuf.Message` messages.
  158. public func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  159. path: String,
  160. request: Request,
  161. callOptions: CallOptions
  162. ) -> UnaryCall<Request, Response> {
  163. return self.makeUnaryCall(
  164. serializer: ProtobufSerializer(),
  165. deserializer: ProtobufDeserializer(),
  166. path: path,
  167. request: request,
  168. callOptions: callOptions
  169. )
  170. }
  171. /// A unary call using `GRPCPayload` messages.
  172. public func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  173. path: String,
  174. request: Request,
  175. callOptions: CallOptions
  176. ) -> UnaryCall<Request, Response> {
  177. return self.makeUnaryCall(
  178. serializer: GRPCPayloadSerializer(),
  179. deserializer: GRPCPayloadDeserializer(),
  180. path: path,
  181. request: request,
  182. callOptions: callOptions
  183. )
  184. }
  185. }
  186. // MARK: - Client Streaming
  187. extension ClientConnection {
  188. private func makeClientStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  189. serializer: Serializer,
  190. deserializer: Deserializer,
  191. path: String,
  192. callOptions: CallOptions
  193. ) -> ClientStreamingCall<Serializer.Input, Deserializer.Output> {
  194. let requestID = callOptions.requestIDProvider.requestID()
  195. let logger = self.loggerWithRequestID(requestID)
  196. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  197. let call = ClientStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  198. multiplexer: self.multiplexer,
  199. serializer: serializer,
  200. deserializer: deserializer,
  201. callOptions: callOptions,
  202. errorDelegate: self.configuration.errorDelegate,
  203. logger: logger
  204. )
  205. call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
  206. return call
  207. }
  208. /// A client streaming call using `SwiftProtobuf.Message` messages.
  209. public func makeClientStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  210. path: String,
  211. callOptions: CallOptions
  212. ) -> ClientStreamingCall<Request, Response> {
  213. return self.makeClientStreamingCall(
  214. serializer: ProtobufSerializer(),
  215. deserializer: ProtobufDeserializer(),
  216. path: path,
  217. callOptions: callOptions
  218. )
  219. }
  220. /// A client streaming call using `GRPCPayload` messages.
  221. public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  222. path: String,
  223. callOptions: CallOptions
  224. ) -> ClientStreamingCall<Request, Response> {
  225. return self.makeClientStreamingCall(
  226. serializer: GRPCPayloadSerializer(),
  227. deserializer: GRPCPayloadDeserializer(),
  228. path: path,
  229. callOptions: callOptions
  230. )
  231. }
  232. }
  233. // MARK: - Server Streaming
  234. extension ClientConnection {
  235. private func makeServerStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  236. serializer: Serializer,
  237. deserializer: Deserializer,
  238. path: String,
  239. request: Serializer.Input,
  240. callOptions: CallOptions,
  241. handler: @escaping (Deserializer.Output) -> Void
  242. ) -> ServerStreamingCall<Serializer.Input, Deserializer.Output> {
  243. let requestID = callOptions.requestIDProvider.requestID()
  244. let logger = self.loggerWithRequestID(requestID)
  245. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  246. let call = ServerStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  247. multiplexer: multiplexer,
  248. serializer: serializer,
  249. deserializer: deserializer,
  250. callOptions: callOptions,
  251. errorDelegate: self.configuration.errorDelegate,
  252. logger: logger,
  253. responseHandler: handler
  254. )
  255. call.send(self.makeRequestHead(path: path, options: callOptions, requestID: requestID), request: request)
  256. return call
  257. }
  258. /// A server streaming call using `SwiftProtobuf.Message` messages.
  259. public func makeServerStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  260. path: String,
  261. request: Request,
  262. callOptions: CallOptions,
  263. handler: @escaping (Response) -> Void
  264. ) -> ServerStreamingCall<Request, Response> {
  265. return self.makeServerStreamingCall(
  266. serializer: ProtobufSerializer(),
  267. deserializer: ProtobufDeserializer(),
  268. path: path,
  269. request: request,
  270. callOptions: callOptions,
  271. handler: handler
  272. )
  273. }
  274. /// A server streaming call using `GRPCPayload` messages.
  275. public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  276. path: String,
  277. request: Request,
  278. callOptions: CallOptions,
  279. handler: @escaping (Response) -> Void
  280. ) -> ServerStreamingCall<Request, Response> {
  281. return self.makeServerStreamingCall(
  282. serializer: GRPCPayloadSerializer(),
  283. deserializer: GRPCPayloadDeserializer(),
  284. path: path,
  285. request: request,
  286. callOptions: callOptions,
  287. handler: handler
  288. )
  289. }
  290. }
  291. // MARK: - Bidirectional Streaming
  292. extension ClientConnection {
  293. private func makeBidirectionalStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  294. serializer: Serializer,
  295. deserializer: Deserializer,
  296. path: String,
  297. callOptions: CallOptions,
  298. handler: @escaping (Deserializer.Output) -> Void
  299. ) -> BidirectionalStreamingCall<Serializer.Input, Deserializer.Output> {
  300. let requestID = callOptions.requestIDProvider.requestID()
  301. let logger = self.loggerWithRequestID(requestID)
  302. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  303. let call = BidirectionalStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  304. multiplexer: multiplexer,
  305. serializer: serializer,
  306. deserializer: deserializer,
  307. callOptions: callOptions,
  308. errorDelegate: self.configuration.errorDelegate,
  309. logger: logger,
  310. responseHandler: handler
  311. )
  312. call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
  313. return call
  314. }
  315. /// A bidirectional streaming call using `SwiftProtobuf.Message` messages.
  316. public func makeBidirectionalStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  317. path: String,
  318. callOptions: CallOptions,
  319. handler: @escaping (Response) -> Void
  320. ) -> BidirectionalStreamingCall<Request, Response> {
  321. return self.makeBidirectionalStreamingCall(
  322. serializer: ProtobufSerializer(),
  323. deserializer: ProtobufDeserializer(),
  324. path: path,
  325. callOptions: callOptions,
  326. handler: handler
  327. )
  328. }
  329. /// A bidirectional streaming call using `GRPCPayload` messages.
  330. public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  331. path: String,
  332. callOptions: CallOptions,
  333. handler: @escaping (Response) -> Void
  334. ) -> BidirectionalStreamingCall<Request, Response> {
  335. return self.makeBidirectionalStreamingCall(
  336. serializer: GRPCPayloadSerializer(),
  337. deserializer: GRPCPayloadDeserializer(),
  338. path: path,
  339. callOptions: callOptions,
  340. handler: handler
  341. )
  342. }
  343. }
  344. // MARK: - Configuration structures
  345. /// A target to connect to.
  346. public struct ConnectionTarget {
  347. internal enum Wrapped {
  348. case hostAndPort(String, Int)
  349. case unixDomainSocket(String)
  350. case socketAddress(SocketAddress)
  351. }
  352. internal var wrapped: Wrapped
  353. private init(_ wrapped: Wrapped) {
  354. self.wrapped = wrapped
  355. }
  356. /// The host and port.
  357. public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
  358. return ConnectionTarget(.hostAndPort(host, port))
  359. }
  360. /// The path of a Unix domain socket.
  361. public static func unixDomainSocket(_ path: String) -> ConnectionTarget {
  362. return ConnectionTarget(.unixDomainSocket(path))
  363. }
  364. /// A NIO socket address.
  365. public static func socketAddress(_ address: SocketAddress) -> ConnectionTarget {
  366. return ConnectionTarget(.socketAddress(address))
  367. }
  368. var host: String {
  369. switch self.wrapped {
  370. case .hostAndPort(let host, _):
  371. return host
  372. case .socketAddress(.v4(let address)):
  373. return address.host
  374. case .socketAddress(.v6(let address)):
  375. return address.host
  376. case .unixDomainSocket, .socketAddress(.unixDomainSocket):
  377. return "localhost"
  378. }
  379. }
  380. }
  381. /// The connectivity behavior to use when starting an RPC.
  382. public struct CallStartBehavior: Hashable {
  383. internal enum Behavior: Hashable {
  384. case waitsForConnectivity
  385. case fastFailure
  386. }
  387. internal var wrapped: Behavior
  388. private init(_ wrapped: Behavior) {
  389. self.wrapped = wrapped
  390. }
  391. /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
  392. /// an RPC. Doing so may involve multiple connection attempts.
  393. ///
  394. /// This is the preferred, and default, behaviour.
  395. public static let waitsForConnectivity = CallStartBehavior(.waitsForConnectivity)
  396. /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
  397. /// quickly rather than waiting for an active connection. The behaviour depends on the current
  398. /// connectivity state:
  399. ///
  400. /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
  401. /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
  402. /// fails.
  403. /// - Ready: a connection is already active: the RPC will be started using that connection.
  404. /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
  405. /// connect again. The RPC will fail immediately.
  406. /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
  407. public static let fastFailure = CallStartBehavior(.fastFailure)
  408. }
  409. extension ClientConnection {
  410. /// The configuration for a connection.
  411. public struct Configuration {
  412. /// The target to connect to.
  413. public var target: ConnectionTarget
  414. /// The event loop group to run the connection on.
  415. public var eventLoopGroup: EventLoopGroup
  416. /// An error delegate which is called when errors are caught. Provided delegates **must not
  417. /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
  418. /// cycle.
  419. public var errorDelegate: ClientErrorDelegate?
  420. /// A delegate which is called when the connectivity state is changed.
  421. public var connectivityStateDelegate: ConnectivityStateDelegate?
  422. /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
  423. /// provided but the queue is `nil` then one will be created by gRPC.
  424. public var connectivityStateDelegateQueue: DispatchQueue?
  425. /// TLS configuration for this connection. `nil` if TLS is not desired.
  426. public var tls: TLS?
  427. /// The connection backoff configuration. If no connection retrying is required then this should
  428. /// be `nil`.
  429. public var connectionBackoff: ConnectionBackoff?
  430. /// The connection keepalive configuration.
  431. public var connectionKeepalive: ClientConnectionKeepalive
  432. /// The amount of time to wait before closing the connection. The idle timeout will start only
  433. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  434. ///
  435. /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
  436. public var connectionIdleTimeout: TimeAmount
  437. /// The behavior used to determine when an RPC should start. That is, whether it should wait for
  438. /// an active connection or fail quickly if no connection is currently available.
  439. public var callStartBehavior: CallStartBehavior
  440. /// The HTTP/2 flow control target window size.
  441. public var httpTargetWindowSize: Int
  442. /// The HTTP protocol used for this connection.
  443. public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
  444. return self.tls == nil ? .http : .https
  445. }
  446. /// Create a `Configuration` with some pre-defined defaults. Prefer using
  447. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  448. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  449. ///
  450. /// - Parameter target: The target to connect to.
  451. /// - Parameter eventLoopGroup: The event loop group to run the connection on.
  452. /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
  453. /// on debug builds.
  454. /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
  455. /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
  456. /// `connectivityStateDelegate`.
  457. /// - Parameter tls: TLS configuration, defaulting to `nil`.
  458. /// - Parameter connectionBackoff: The connection backoff configuration to use.
  459. /// - Parameter connectionKeepalive: The keepalive configuration to use.
  460. /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the connection, defaulting to 5 minutes.
  461. /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
  462. /// relation to its underlying connection. Defaults to `waitsForConnectivity`.
  463. /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size.
  464. public init(
  465. target: ConnectionTarget,
  466. eventLoopGroup: EventLoopGroup,
  467. errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
  468. connectivityStateDelegate: ConnectivityStateDelegate? = nil,
  469. connectivityStateDelegateQueue: DispatchQueue? = nil,
  470. tls: Configuration.TLS? = nil,
  471. connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
  472. connectionKeepalive: ClientConnectionKeepalive = ClientConnectionKeepalive(),
  473. connectionIdleTimeout: TimeAmount = .minutes(5),
  474. callStartBehavior: CallStartBehavior = .waitsForConnectivity,
  475. httpTargetWindowSize: Int = 65535
  476. ) {
  477. self.target = target
  478. self.eventLoopGroup = eventLoopGroup
  479. self.errorDelegate = errorDelegate
  480. self.connectivityStateDelegate = connectivityStateDelegate
  481. self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
  482. self.tls = tls
  483. self.connectionBackoff = connectionBackoff
  484. self.connectionKeepalive = connectionKeepalive
  485. self.connectionIdleTimeout = connectionIdleTimeout
  486. self.callStartBehavior = callStartBehavior
  487. self.httpTargetWindowSize = httpTargetWindowSize
  488. }
  489. }
  490. }
  491. // MARK: - Configuration helpers/extensions
  492. extension ClientBootstrapProtocol {
  493. /// Connect to the given connection target.
  494. ///
  495. /// - Parameter target: The target to connect to.
  496. func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
  497. switch target.wrapped {
  498. case .hostAndPort(let host, let port):
  499. return self.connect(host: host, port: port)
  500. case .unixDomainSocket(let path):
  501. return self.connect(unixDomainSocketPath: path)
  502. case .socketAddress(let address):
  503. return self.connect(to: address)
  504. }
  505. }
  506. }
  507. extension Channel {
  508. /// Configure the channel with TLS.
  509. ///
  510. /// This function adds two handlers to the pipeline: the `NIOSSLClientHandler` to handle TLS, and
  511. /// the `TLSVerificationHandler` which verifies that a successful handshake was completed.
  512. ///
  513. /// - Parameter configuration: The configuration to configure the channel with.
  514. /// - Parameter serverHostname: The server hostname to use if the hostname should be verified.
  515. /// - Parameter errorDelegate: The error delegate to use for the TLS verification handler.
  516. func configureTLS(
  517. _ configuration: TLSConfiguration,
  518. serverHostname: String?,
  519. errorDelegate: ClientErrorDelegate?,
  520. logger: Logger
  521. ) -> EventLoopFuture<Void> {
  522. do {
  523. let sslClientHandler = try NIOSSLClientHandler(
  524. context: try NIOSSLContext(configuration: configuration),
  525. serverHostname: serverHostname
  526. )
  527. return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler(logger: logger))
  528. } catch {
  529. return self.eventLoop.makeFailedFuture(error)
  530. }
  531. }
  532. func configureGRPCClient(
  533. httpTargetWindowSize: Int,
  534. tlsConfiguration: TLSConfiguration?,
  535. tlsServerHostname: String?,
  536. connectionManager: ConnectionManager,
  537. connectionKeepalive: ClientConnectionKeepalive,
  538. connectionIdleTimeout: TimeAmount,
  539. errorDelegate: ClientErrorDelegate?,
  540. logger: Logger
  541. ) -> EventLoopFuture<Void> {
  542. let tlsConfigured = tlsConfiguration.map {
  543. self.configureTLS($0, serverHostname: tlsServerHostname, errorDelegate: errorDelegate, logger: logger)
  544. }
  545. return (tlsConfigured ?? self.eventLoop.makeSucceededFuture(())).flatMap {
  546. self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize)
  547. }.flatMap { _ in
  548. return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
  549. self.pipeline.addHandlers([
  550. GRPCClientKeepaliveHandler(configuration: connectionKeepalive),
  551. GRPCIdleHandler(mode: .client(connectionManager), idleTimeout: connectionIdleTimeout)],
  552. position: .after(http2Handler)
  553. )
  554. }.flatMap {
  555. let errorHandler = DelegatingErrorHandler(
  556. logger: logger,
  557. delegate: errorDelegate
  558. )
  559. return self.pipeline.addHandler(errorHandler)
  560. }
  561. }
  562. }
  563. func configureGRPCClient(
  564. errorDelegate: ClientErrorDelegate?,
  565. logger: Logger
  566. ) -> EventLoopFuture<Void> {
  567. return self.configureHTTP2Pipeline(mode: .client).flatMap { _ in
  568. self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
  569. }
  570. }
  571. }
  572. extension TimeAmount {
  573. /// Creates a new `TimeAmount` from the given time interval in seconds.
  574. ///
  575. /// - Parameter timeInterval: The amount of time in seconds
  576. static func seconds(timeInterval: TimeInterval) -> TimeAmount {
  577. return .nanoseconds(Int64(timeInterval * 1_000_000_000))
  578. }
  579. }
  580. extension String {
  581. var isIPAddress: Bool {
  582. // We need some scratch space to let inet_pton write into.
  583. var ipv4Addr = in_addr()
  584. var ipv6Addr = in6_addr()
  585. return self.withCString { ptr in
  586. return inet_pton(AF_INET, ptr, &ipv4Addr) == 1 ||
  587. inet_pton(AF_INET6, ptr, &ipv6Addr) == 1
  588. }
  589. }
  590. }