ClientConnection.swift 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOTransportServices
  19. import NIOHTTP2
  20. import NIOSSL
  21. import NIOTLS
  22. import Logging
  23. import SwiftProtobuf
  24. /// Provides a single, managed connection to a server.
  25. ///
  26. /// The connection to the server is provided by a single channel which will attempt to reconnect
  27. /// to the server if the connection is dropped. This connection is guaranteed to always use the same
  28. /// event loop.
  29. ///
  30. /// The connection is initially setup with a handler to verify that TLS was established
  31. /// successfully (assuming TLS is being used).
  32. ///
  33. /// ┌──────────────────────────┐
  34. /// │ DelegatingErrorHandler │
  35. /// └──────────▲───────────────┘
  36. /// HTTP2Frame│
  37. /// ┌──────────┴───────────────┐
  38. /// │ SettingsObservingHandler │
  39. /// └──────────▲───────────────┘
  40. /// HTTP2Frame│
  41. /// │ ⠇ ⠇ ⠇ ⠇
  42. /// │ ┌┴─▼┐ ┌┴─▼┐
  43. /// │ │ | │ | HTTP/2 streams
  44. /// │ └▲─┬┘ └▲─┬┘
  45. /// │ │ │ │ │ HTTP2Frame
  46. /// ┌─┴────────────────┴─▼───┴─▼┐
  47. /// │ HTTP2StreamMultiplexer |
  48. /// └─▲───────────────────────┬─┘
  49. /// HTTP2Frame│ │HTTP2Frame
  50. /// ┌─┴───────────────────────▼─┐
  51. /// │ NIOHTTP2Handler │
  52. /// └─▲───────────────────────┬─┘
  53. /// ByteBuffer│ │ByteBuffer
  54. /// ┌─┴───────────────────────▼─┐
  55. /// │ TLSVerificationHandler │
  56. /// └─▲───────────────────────┬─┘
  57. /// ByteBuffer│ │ByteBuffer
  58. /// ┌─┴───────────────────────▼─┐
  59. /// │ NIOSSLHandler │
  60. /// └─▲───────────────────────┬─┘
  61. /// ByteBuffer│ │ByteBuffer
  62. /// │ ▼
  63. ///
  64. /// The `TLSVerificationHandler` observes the outcome of the SSL handshake and determines
  65. /// whether a `ClientConnection` should be returned to the user. In either eventuality, the
  66. /// handler removes itself from the pipeline once TLS has been verified. There is also a handler
  67. /// after the multiplexer for observing the initial settings frame, after which it determines that
  68. /// the connection state is `.ready` and removes itself from the channel. Finally there is a
  69. /// delegated error handler which uses the error delegate associated with this connection
  70. /// (see `DelegatingErrorHandler`).
  71. ///
  72. /// See `BaseClientCall` for a description of the pipelines associated with each HTTP/2 stream.
  73. public class ClientConnection {
  74. private let connectionManager: ConnectionManager
  75. private func getChannel() -> EventLoopFuture<Channel> {
  76. switch self.configuration.callStartBehavior.wrapped {
  77. case .waitsForConnectivity:
  78. return self.connectionManager.getChannel()
  79. case .fastFailure:
  80. return self.connectionManager.getOptimisticChannel()
  81. }
  82. }
  83. /// HTTP multiplexer from the `channel` handling gRPC calls.
  84. internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> {
  85. return self.getChannel().flatMap {
  86. $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  87. }
  88. }
  89. /// The configuration for this client.
  90. internal let configuration: Configuration
  91. internal let scheme: String
  92. internal let authority: String
  93. /// A monitor for the connectivity state.
  94. public var connectivity: ConnectivityStateMonitor {
  95. return self.connectionManager.monitor
  96. }
  97. /// The `EventLoop` this connection is using.
  98. public var eventLoop: EventLoop {
  99. return self.connectionManager.eventLoop
  100. }
  101. /// Creates a new connection from the given configuration. Prefer using
  102. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  103. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  104. ///
  105. /// - Important: Users should prefer using `ClientConnection.secure(group:)` to build a connection
  106. /// with TLS, or `ClientConnection.insecure(group:)` to build a connection without TLS.
  107. public init(configuration: Configuration) {
  108. self.configuration = configuration
  109. self.scheme = configuration.tls == nil ? "http" : "https"
  110. self.authority = configuration.target.host
  111. self.connectionManager = ConnectionManager(
  112. configuration: configuration,
  113. logger: configuration.backgroundActivityLogger
  114. )
  115. }
  116. /// Closes the connection to the server.
  117. public func close() -> EventLoopFuture<Void> {
  118. return self.connectionManager.shutdown()
  119. }
  120. /// Extracts a logger and request ID from the call options and returns them. The logger will
  121. /// be populated with the request ID (if applicable) and any metadata from the connection manager.
  122. private func populatedLoggerAndRequestID(from callOptions: CallOptions) -> (Logger, String?) {
  123. var logger = callOptions.logger
  124. self.connectionManager.appendMetadata(to: &logger)
  125. // Attach the request ID.
  126. let requestID = callOptions.requestIDProvider.requestID()
  127. if let requestID = requestID {
  128. logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
  129. }
  130. return (logger, requestID)
  131. }
  132. private func makeRequestHead(path: String, options: CallOptions, requestID: String?) -> _GRPCRequestHead {
  133. return _GRPCRequestHead(
  134. scheme: self.scheme,
  135. path: path,
  136. host: self.authority,
  137. options: options,
  138. requestID: requestID
  139. )
  140. }
  141. }
  142. // MARK: - Unary
  143. extension ClientConnection: GRPCChannel {
  144. private func makeUnaryCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  145. serializer: Serializer,
  146. deserializer: Deserializer,
  147. path: String,
  148. request: Serializer.Input,
  149. callOptions: CallOptions
  150. ) -> UnaryCall<Serializer.Input, Deserializer.Output> {
  151. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  152. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  153. let call = UnaryCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  154. multiplexer: self.multiplexer,
  155. serializer: serializer,
  156. deserializer: deserializer,
  157. callOptions: callOptions,
  158. errorDelegate: self.configuration.errorDelegate,
  159. logger: logger
  160. )
  161. call.send(self.makeRequestHead(path: path, options: callOptions, requestID: requestID), request: request)
  162. return call
  163. }
  164. /// A unary call using `SwiftProtobuf.Message` messages.
  165. public func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  166. path: String,
  167. request: Request,
  168. callOptions: CallOptions
  169. ) -> UnaryCall<Request, Response> {
  170. return self.makeUnaryCall(
  171. serializer: ProtobufSerializer(),
  172. deserializer: ProtobufDeserializer(),
  173. path: path,
  174. request: request,
  175. callOptions: callOptions
  176. )
  177. }
  178. /// A unary call using `GRPCPayload` messages.
  179. public func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  180. path: String,
  181. request: Request,
  182. callOptions: CallOptions
  183. ) -> UnaryCall<Request, Response> {
  184. return self.makeUnaryCall(
  185. serializer: GRPCPayloadSerializer(),
  186. deserializer: GRPCPayloadDeserializer(),
  187. path: path,
  188. request: request,
  189. callOptions: callOptions
  190. )
  191. }
  192. }
  193. // MARK: - Client Streaming
  194. extension ClientConnection {
  195. private func makeClientStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  196. serializer: Serializer,
  197. deserializer: Deserializer,
  198. path: String,
  199. callOptions: CallOptions
  200. ) -> ClientStreamingCall<Serializer.Input, Deserializer.Output> {
  201. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  202. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  203. let call = ClientStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  204. multiplexer: self.multiplexer,
  205. serializer: serializer,
  206. deserializer: deserializer,
  207. callOptions: callOptions,
  208. errorDelegate: self.configuration.errorDelegate,
  209. logger: logger
  210. )
  211. call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
  212. return call
  213. }
  214. /// A client streaming call using `SwiftProtobuf.Message` messages.
  215. public func makeClientStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  216. path: String,
  217. callOptions: CallOptions
  218. ) -> ClientStreamingCall<Request, Response> {
  219. return self.makeClientStreamingCall(
  220. serializer: ProtobufSerializer(),
  221. deserializer: ProtobufDeserializer(),
  222. path: path,
  223. callOptions: callOptions
  224. )
  225. }
  226. /// A client streaming call using `GRPCPayload` messages.
  227. public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  228. path: String,
  229. callOptions: CallOptions
  230. ) -> ClientStreamingCall<Request, Response> {
  231. return self.makeClientStreamingCall(
  232. serializer: GRPCPayloadSerializer(),
  233. deserializer: GRPCPayloadDeserializer(),
  234. path: path,
  235. callOptions: callOptions
  236. )
  237. }
  238. }
  239. // MARK: - Server Streaming
  240. extension ClientConnection {
  241. private func makeServerStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  242. serializer: Serializer,
  243. deserializer: Deserializer,
  244. path: String,
  245. request: Serializer.Input,
  246. callOptions: CallOptions,
  247. handler: @escaping (Deserializer.Output) -> Void
  248. ) -> ServerStreamingCall<Serializer.Input, Deserializer.Output> {
  249. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  250. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  251. let call = ServerStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  252. multiplexer: multiplexer,
  253. serializer: serializer,
  254. deserializer: deserializer,
  255. callOptions: callOptions,
  256. errorDelegate: self.configuration.errorDelegate,
  257. logger: logger,
  258. responseHandler: handler
  259. )
  260. call.send(self.makeRequestHead(path: path, options: callOptions, requestID: requestID), request: request)
  261. return call
  262. }
  263. /// A server streaming call using `SwiftProtobuf.Message` messages.
  264. public func makeServerStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  265. path: String,
  266. request: Request,
  267. callOptions: CallOptions,
  268. handler: @escaping (Response) -> Void
  269. ) -> ServerStreamingCall<Request, Response> {
  270. return self.makeServerStreamingCall(
  271. serializer: ProtobufSerializer(),
  272. deserializer: ProtobufDeserializer(),
  273. path: path,
  274. request: request,
  275. callOptions: callOptions,
  276. handler: handler
  277. )
  278. }
  279. /// A server streaming call using `GRPCPayload` messages.
  280. public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  281. path: String,
  282. request: Request,
  283. callOptions: CallOptions,
  284. handler: @escaping (Response) -> Void
  285. ) -> ServerStreamingCall<Request, Response> {
  286. return self.makeServerStreamingCall(
  287. serializer: GRPCPayloadSerializer(),
  288. deserializer: GRPCPayloadDeserializer(),
  289. path: path,
  290. request: request,
  291. callOptions: callOptions,
  292. handler: handler
  293. )
  294. }
  295. }
  296. // MARK: - Bidirectional Streaming
  297. extension ClientConnection {
  298. private func makeBidirectionalStreamingCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  299. serializer: Serializer,
  300. deserializer: Deserializer,
  301. path: String,
  302. callOptions: CallOptions,
  303. handler: @escaping (Deserializer.Output) -> Void
  304. ) -> BidirectionalStreamingCall<Serializer.Input, Deserializer.Output> {
  305. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  306. logger.debug("starting rpc", metadata: ["path": "\(path)"])
  307. let call = BidirectionalStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  308. multiplexer: multiplexer,
  309. serializer: serializer,
  310. deserializer: deserializer,
  311. callOptions: callOptions,
  312. errorDelegate: self.configuration.errorDelegate,
  313. logger: logger,
  314. responseHandler: handler
  315. )
  316. call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
  317. return call
  318. }
  319. /// A bidirectional streaming call using `SwiftProtobuf.Message` messages.
  320. public func makeBidirectionalStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  321. path: String,
  322. callOptions: CallOptions,
  323. handler: @escaping (Response) -> Void
  324. ) -> BidirectionalStreamingCall<Request, Response> {
  325. return self.makeBidirectionalStreamingCall(
  326. serializer: ProtobufSerializer(),
  327. deserializer: ProtobufDeserializer(),
  328. path: path,
  329. callOptions: callOptions,
  330. handler: handler
  331. )
  332. }
  333. /// A bidirectional streaming call using `GRPCPayload` messages.
  334. public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  335. path: String,
  336. callOptions: CallOptions,
  337. handler: @escaping (Response) -> Void
  338. ) -> BidirectionalStreamingCall<Request, Response> {
  339. return self.makeBidirectionalStreamingCall(
  340. serializer: GRPCPayloadSerializer(),
  341. deserializer: GRPCPayloadDeserializer(),
  342. path: path,
  343. callOptions: callOptions,
  344. handler: handler
  345. )
  346. }
  347. }
  348. // MARK: - Configuration structures
  349. /// A target to connect to.
  350. public struct ConnectionTarget {
  351. internal enum Wrapped {
  352. case hostAndPort(String, Int)
  353. case unixDomainSocket(String)
  354. case socketAddress(SocketAddress)
  355. }
  356. internal var wrapped: Wrapped
  357. private init(_ wrapped: Wrapped) {
  358. self.wrapped = wrapped
  359. }
  360. /// The host and port.
  361. public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
  362. return ConnectionTarget(.hostAndPort(host, port))
  363. }
  364. /// The path of a Unix domain socket.
  365. public static func unixDomainSocket(_ path: String) -> ConnectionTarget {
  366. return ConnectionTarget(.unixDomainSocket(path))
  367. }
  368. /// A NIO socket address.
  369. public static func socketAddress(_ address: SocketAddress) -> ConnectionTarget {
  370. return ConnectionTarget(.socketAddress(address))
  371. }
  372. var host: String {
  373. switch self.wrapped {
  374. case .hostAndPort(let host, _):
  375. return host
  376. case .socketAddress(.v4(let address)):
  377. return address.host
  378. case .socketAddress(.v6(let address)):
  379. return address.host
  380. case .unixDomainSocket, .socketAddress(.unixDomainSocket):
  381. return "localhost"
  382. }
  383. }
  384. }
  385. /// The connectivity behavior to use when starting an RPC.
  386. public struct CallStartBehavior: Hashable {
  387. internal enum Behavior: Hashable {
  388. case waitsForConnectivity
  389. case fastFailure
  390. }
  391. internal var wrapped: Behavior
  392. private init(_ wrapped: Behavior) {
  393. self.wrapped = wrapped
  394. }
  395. /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
  396. /// an RPC. Doing so may involve multiple connection attempts.
  397. ///
  398. /// This is the preferred, and default, behaviour.
  399. public static let waitsForConnectivity = CallStartBehavior(.waitsForConnectivity)
  400. /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
  401. /// quickly rather than waiting for an active connection. The behaviour depends on the current
  402. /// connectivity state:
  403. ///
  404. /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
  405. /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
  406. /// fails.
  407. /// - Ready: a connection is already active: the RPC will be started using that connection.
  408. /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
  409. /// connect again. The RPC will fail immediately.
  410. /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
  411. public static let fastFailure = CallStartBehavior(.fastFailure)
  412. }
  413. extension ClientConnection {
  414. /// The configuration for a connection.
  415. public struct Configuration {
  416. /// The target to connect to.
  417. public var target: ConnectionTarget
  418. /// The event loop group to run the connection on.
  419. public var eventLoopGroup: EventLoopGroup
  420. /// An error delegate which is called when errors are caught. Provided delegates **must not
  421. /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
  422. /// cycle.
  423. public var errorDelegate: ClientErrorDelegate?
  424. /// A delegate which is called when the connectivity state is changed.
  425. public var connectivityStateDelegate: ConnectivityStateDelegate?
  426. /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
  427. /// provided but the queue is `nil` then one will be created by gRPC.
  428. public var connectivityStateDelegateQueue: DispatchQueue?
  429. /// TLS configuration for this connection. `nil` if TLS is not desired.
  430. public var tls: TLS?
  431. /// The connection backoff configuration. If no connection retrying is required then this should
  432. /// be `nil`.
  433. public var connectionBackoff: ConnectionBackoff?
  434. /// The connection keepalive configuration.
  435. public var connectionKeepalive: ClientConnectionKeepalive
  436. /// The amount of time to wait before closing the connection. The idle timeout will start only
  437. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  438. ///
  439. /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
  440. public var connectionIdleTimeout: TimeAmount
  441. /// The behavior used to determine when an RPC should start. That is, whether it should wait for
  442. /// an active connection or fail quickly if no connection is currently available.
  443. public var callStartBehavior: CallStartBehavior
  444. /// The HTTP/2 flow control target window size.
  445. public var httpTargetWindowSize: Int
  446. /// The HTTP protocol used for this connection.
  447. public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
  448. return self.tls == nil ? .http : .https
  449. }
  450. /// A logger for background information (such as connectivity state). A separate logger for
  451. /// requests may be provided in the `CallOptions`.
  452. ///
  453. /// Defaults to a no-op logger.
  454. public var backgroundActivityLogger: Logger
  455. /// A channel initializer which will be run after gRPC has initialized each channel. This may be
  456. /// used to add additional handlers to the pipeline and is intended for debugging.
  457. ///
  458. /// - Warning: The initializer closure may be invoked *multiple times*.
  459. public var debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
  460. /// Create a `Configuration` with some pre-defined defaults. Prefer using
  461. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  462. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  463. ///
  464. /// - Parameter target: The target to connect to.
  465. /// - Parameter eventLoopGroup: The event loop group to run the connection on.
  466. /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
  467. /// on debug builds.
  468. /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
  469. /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
  470. /// `connectivityStateDelegate`.
  471. /// - Parameter tls: TLS configuration, defaulting to `nil`.
  472. /// - Parameter connectionBackoff: The connection backoff configuration to use.
  473. /// - Parameter connectionKeepalive: The keepalive configuration to use.
  474. /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the connection, defaulting to 5 minutes.
  475. /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
  476. /// relation to its underlying connection. Defaults to `waitsForConnectivity`.
  477. /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size.
  478. /// - Parameter backgroundActivityLogger: A logger for background information (such as
  479. /// connectivity state). Defaults to a no-op logger.
  480. /// - Parameter debugChannelInitializer: A channel initializer will be called after gRPC has
  481. /// initialized the channel. Defaults to `nil`.
  482. public init(
  483. target: ConnectionTarget,
  484. eventLoopGroup: EventLoopGroup,
  485. errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
  486. connectivityStateDelegate: ConnectivityStateDelegate? = nil,
  487. connectivityStateDelegateQueue: DispatchQueue? = nil,
  488. tls: Configuration.TLS? = nil,
  489. connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
  490. connectionKeepalive: ClientConnectionKeepalive = ClientConnectionKeepalive(),
  491. connectionIdleTimeout: TimeAmount = .minutes(5),
  492. callStartBehavior: CallStartBehavior = .waitsForConnectivity,
  493. httpTargetWindowSize: Int = 65535,
  494. backgroundActivityLogger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
  495. debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)? = nil
  496. ) {
  497. self.target = target
  498. self.eventLoopGroup = eventLoopGroup
  499. self.errorDelegate = errorDelegate
  500. self.connectivityStateDelegate = connectivityStateDelegate
  501. self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
  502. self.tls = tls
  503. self.connectionBackoff = connectionBackoff
  504. self.connectionKeepalive = connectionKeepalive
  505. self.connectionIdleTimeout = connectionIdleTimeout
  506. self.callStartBehavior = callStartBehavior
  507. self.httpTargetWindowSize = httpTargetWindowSize
  508. self.backgroundActivityLogger = backgroundActivityLogger
  509. self.debugChannelInitializer = debugChannelInitializer
  510. }
  511. }
  512. }
  513. // MARK: - Configuration helpers/extensions
  514. extension ClientBootstrapProtocol {
  515. /// Connect to the given connection target.
  516. ///
  517. /// - Parameter target: The target to connect to.
  518. func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
  519. switch target.wrapped {
  520. case .hostAndPort(let host, let port):
  521. return self.connect(host: host, port: port)
  522. case .unixDomainSocket(let path):
  523. return self.connect(unixDomainSocketPath: path)
  524. case .socketAddress(let address):
  525. return self.connect(to: address)
  526. }
  527. }
  528. }
  529. extension Channel {
  530. /// Configure the channel with TLS.
  531. ///
  532. /// This function adds two handlers to the pipeline: the `NIOSSLClientHandler` to handle TLS, and
  533. /// the `TLSVerificationHandler` which verifies that a successful handshake was completed.
  534. ///
  535. /// - Parameter configuration: The configuration to configure the channel with.
  536. /// - Parameter serverHostname: The server hostname to use if the hostname should be verified.
  537. /// - Parameter errorDelegate: The error delegate to use for the TLS verification handler.
  538. func configureTLS(
  539. _ configuration: TLSConfiguration,
  540. serverHostname: String?,
  541. errorDelegate: ClientErrorDelegate?,
  542. logger: Logger
  543. ) -> EventLoopFuture<Void> {
  544. do {
  545. let sslClientHandler = try NIOSSLClientHandler(
  546. context: try NIOSSLContext(configuration: configuration),
  547. serverHostname: serverHostname
  548. )
  549. return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler(logger: logger))
  550. } catch {
  551. return self.eventLoop.makeFailedFuture(error)
  552. }
  553. }
  554. func configureGRPCClient(
  555. httpTargetWindowSize: Int,
  556. tlsConfiguration: TLSConfiguration?,
  557. tlsServerHostname: String?,
  558. connectionManager: ConnectionManager,
  559. connectionKeepalive: ClientConnectionKeepalive,
  560. connectionIdleTimeout: TimeAmount,
  561. errorDelegate: ClientErrorDelegate?,
  562. requiresZeroLengthWriteWorkaround: Bool,
  563. logger: Logger
  564. ) -> EventLoopFuture<Void> {
  565. let tlsConfigured = tlsConfiguration.map {
  566. self.configureTLS($0, serverHostname: tlsServerHostname, errorDelegate: errorDelegate, logger: logger)
  567. }
  568. let configuration: EventLoopFuture<Void> = (tlsConfigured ?? self.eventLoop.makeSucceededFuture(())).flatMap {
  569. self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize)
  570. }.flatMap { _ in
  571. return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
  572. self.pipeline.addHandlers([
  573. GRPCClientKeepaliveHandler(configuration: connectionKeepalive),
  574. GRPCIdleHandler(mode: .client(connectionManager), idleTimeout: connectionIdleTimeout)],
  575. position: .after(http2Handler)
  576. )
  577. }.flatMap {
  578. let errorHandler = DelegatingErrorHandler(
  579. logger: logger,
  580. delegate: errorDelegate
  581. )
  582. return self.pipeline.addHandler(errorHandler)
  583. }
  584. }
  585. #if canImport(Network)
  586. // This availability guard is arguably unnecessary, but we add it anyway.
  587. if requiresZeroLengthWriteWorkaround, #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
  588. return configuration.flatMap {
  589. self.pipeline.addHandler(NIOFilterEmptyWritesHandler(), position: .first)
  590. }
  591. } else {
  592. return configuration
  593. }
  594. #else
  595. return configuration
  596. #endif
  597. }
  598. func configureGRPCClient(
  599. errorDelegate: ClientErrorDelegate?,
  600. logger: Logger
  601. ) -> EventLoopFuture<Void> {
  602. return self.configureHTTP2Pipeline(mode: .client).flatMap { _ in
  603. self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
  604. }
  605. }
  606. }
  607. extension TimeAmount {
  608. /// Creates a new `TimeAmount` from the given time interval in seconds.
  609. ///
  610. /// - Parameter timeInterval: The amount of time in seconds
  611. static func seconds(timeInterval: TimeInterval) -> TimeAmount {
  612. return .nanoseconds(Int64(timeInterval * 1_000_000_000))
  613. }
  614. }
  615. extension String {
  616. var isIPAddress: Bool {
  617. // We need some scratch space to let inet_pton write into.
  618. var ipv4Addr = in_addr()
  619. var ipv6Addr = in6_addr()
  620. return self.withCString { ptr in
  621. return inet_pton(AF_INET, ptr, &ipv4Addr) == 1 ||
  622. inet_pton(AF_INET6, ptr, &ipv6Addr) == 1
  623. }
  624. }
  625. }