ClientConnection.swift 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHTTP2
  20. import NIOSSL
  21. import NIOTLS
  22. import NIOTransportServices
  23. import SwiftProtobuf
  24. /// Provides a single, managed connection to a server.
  25. ///
  26. /// The connection to the server is provided by a single channel which will attempt to reconnect
  27. /// to the server if the connection is dropped. This connection is guaranteed to always use the same
  28. /// event loop.
  29. ///
  30. /// The connection is initially setup with a handler to verify that TLS was established
  31. /// successfully (assuming TLS is being used).
  32. ///
///                          ┌──────────────────────────┐
///                          │  DelegatingErrorHandler  │
///                          └──────────▲───────────────┘
///                           HTTP2Frame│
///                          ┌──────────┴───────────────┐
///                          │ SettingsObservingHandler │
///                          └──────────▲───────────────┘
///                           HTTP2Frame│
///                                     │                ⠇ ⠇   ⠇ ⠇
///                                     │               ┌┴─▼┐ ┌┴─▼┐
///                                     │               │   │ │   │ HTTP/2 streams
///                                     │               └▲─┬┘ └▲─┬┘
///                                     │                │ │   │ │ HTTP2Frame
///                                   ┌─┴────────────────┴─▼───┴─▼┐
///                                   │   HTTP2StreamMultiplexer  │
///                                   └─▲───────────────────────┬─┘
///                           HTTP2Frame│                       │HTTP2Frame
///                                   ┌─┴───────────────────────▼─┐
///                                   │       NIOHTTP2Handler     │
///                                   └─▲───────────────────────┬─┘
///                           ByteBuffer│                       │ByteBuffer
///                                   ┌─┴───────────────────────▼─┐
///                                   │   TLSVerificationHandler  │
///                                   └─▲───────────────────────┬─┘
///                           ByteBuffer│                       │ByteBuffer
///                                   ┌─┴───────────────────────▼─┐
///                                   │       NIOSSLHandler       │
///                                   └─▲───────────────────────┬─┘
///                           ByteBuffer│                       │ByteBuffer
///                                     │                       ▼
  63. ///
  64. /// The `TLSVerificationHandler` observes the outcome of the SSL handshake and determines
  65. /// whether a `ClientConnection` should be returned to the user. In either eventuality, the
  66. /// handler removes itself from the pipeline once TLS has been verified. There is also a handler
  67. /// after the multiplexer for observing the initial settings frame, after which it determines that
  68. /// the connection state is `.ready` and removes itself from the channel. Finally there is a
  69. /// delegated error handler which uses the error delegate associated with this connection
  70. /// (see `DelegatingErrorHandler`).
  71. ///
  72. /// See `BaseClientCall` for a description of the pipelines associated with each HTTP/2 stream.
  73. public class ClientConnection {
  74. private let connectionManager: ConnectionManager
  75. private func getChannel() -> EventLoopFuture<Channel> {
  76. switch self.configuration.callStartBehavior.wrapped {
  77. case .waitsForConnectivity:
  78. return self.connectionManager.getChannel()
  79. case .fastFailure:
  80. return self.connectionManager.getOptimisticChannel()
  81. }
  82. }
  83. /// HTTP multiplexer from the `channel` handling gRPC calls.
  84. internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> {
  85. return self.getChannel().flatMap {
  86. $0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  87. }
  88. }
  89. /// The configuration for this client.
  90. internal let configuration: Configuration
  91. internal let scheme: String
  92. internal let authority: String
  93. /// A monitor for the connectivity state.
  94. public var connectivity: ConnectivityStateMonitor {
  95. return self.connectionManager.monitor
  96. }
  97. /// The `EventLoop` this connection is using.
  98. public var eventLoop: EventLoop {
  99. return self.connectionManager.eventLoop
  100. }
  101. /// Creates a new connection from the given configuration. Prefer using
  102. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  103. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  104. ///
  105. /// - Important: Users should prefer using `ClientConnection.secure(group:)` to build a connection
  106. /// with TLS, or `ClientConnection.insecure(group:)` to build a connection without TLS.
  107. public init(configuration: Configuration) {
  108. self.configuration = configuration
  109. self.scheme = configuration.tls == nil ? "http" : "https"
  110. self.authority = configuration.target.host
  111. self.connectionManager = ConnectionManager(
  112. configuration: configuration,
  113. logger: configuration.backgroundActivityLogger
  114. )
  115. }
  116. /// Closes the connection to the server.
  117. public func close() -> EventLoopFuture<Void> {
  118. return self.connectionManager.shutdown()
  119. }
  120. /// Extracts a logger and request ID from the call options and returns them. The logger will
  121. /// be populated with the request ID (if applicable) and any metadata from the connection manager.
  122. private func populatedLoggerAndRequestID(from callOptions: CallOptions) -> (Logger, String?) {
  123. var logger = callOptions.logger
  124. self.connectionManager.appendMetadata(to: &logger)
  125. // Attach the request ID.
  126. let requestID = callOptions.requestIDProvider.requestID()
  127. if let requestID = requestID {
  128. logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
  129. }
  130. return (logger, requestID)
  131. }
  132. private func makeRequestHead(path: String, options: CallOptions,
  133. requestID: String?) -> _GRPCRequestHead {
  134. return _GRPCRequestHead(
  135. scheme: self.scheme,
  136. path: path,
  137. host: self.authority,
  138. options: options,
  139. requestID: requestID
  140. )
  141. }
  142. }
  143. // MARK: - Unary
  144. extension ClientConnection: GRPCChannel {
  145. private func makeUnaryCall<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  146. serializer: Serializer,
  147. deserializer: Deserializer,
  148. path: String,
  149. request: Serializer.Input,
  150. callOptions: CallOptions
  151. ) -> UnaryCall<Serializer.Input, Deserializer.Output> {
  152. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  153. let call = UnaryCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  154. multiplexer: self.multiplexer,
  155. serializer: serializer,
  156. deserializer: deserializer,
  157. callOptions: callOptions,
  158. errorDelegate: self.configuration.errorDelegate,
  159. logger: logger
  160. )
  161. call.send(
  162. self.makeRequestHead(path: path, options: callOptions, requestID: requestID),
  163. request: request
  164. )
  165. return call
  166. }
  167. /// A unary call using `SwiftProtobuf.Message` messages.
  168. public func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  169. path: String,
  170. request: Request,
  171. callOptions: CallOptions
  172. ) -> UnaryCall<Request, Response> {
  173. return self.makeUnaryCall(
  174. serializer: ProtobufSerializer(),
  175. deserializer: ProtobufDeserializer(),
  176. path: path,
  177. request: request,
  178. callOptions: callOptions
  179. )
  180. }
  181. /// A unary call using `GRPCPayload` messages.
  182. public func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  183. path: String,
  184. request: Request,
  185. callOptions: CallOptions
  186. ) -> UnaryCall<Request, Response> {
  187. return self.makeUnaryCall(
  188. serializer: GRPCPayloadSerializer(),
  189. deserializer: GRPCPayloadDeserializer(),
  190. path: path,
  191. request: request,
  192. callOptions: callOptions
  193. )
  194. }
  195. }
  196. // MARK: - Client Streaming
  197. extension ClientConnection {
  198. private func makeClientStreamingCall<
  199. Serializer: MessageSerializer,
  200. Deserializer: MessageDeserializer
  201. >(
  202. serializer: Serializer,
  203. deserializer: Deserializer,
  204. path: String,
  205. callOptions: CallOptions
  206. ) -> ClientStreamingCall<Serializer.Input, Deserializer.Output> {
  207. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  208. let call = ClientStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  209. multiplexer: self.multiplexer,
  210. serializer: serializer,
  211. deserializer: deserializer,
  212. callOptions: callOptions,
  213. errorDelegate: self.configuration.errorDelegate,
  214. logger: logger
  215. )
  216. call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
  217. return call
  218. }
  219. /// A client streaming call using `SwiftProtobuf.Message` messages.
  220. public func makeClientStreamingCall<
  221. Request: SwiftProtobuf.Message,
  222. Response: SwiftProtobuf.Message
  223. >(
  224. path: String,
  225. callOptions: CallOptions
  226. ) -> ClientStreamingCall<Request, Response> {
  227. return self.makeClientStreamingCall(
  228. serializer: ProtobufSerializer(),
  229. deserializer: ProtobufDeserializer(),
  230. path: path,
  231. callOptions: callOptions
  232. )
  233. }
  234. /// A client streaming call using `GRPCPayload` messages.
  235. public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  236. path: String,
  237. callOptions: CallOptions
  238. ) -> ClientStreamingCall<Request, Response> {
  239. return self.makeClientStreamingCall(
  240. serializer: GRPCPayloadSerializer(),
  241. deserializer: GRPCPayloadDeserializer(),
  242. path: path,
  243. callOptions: callOptions
  244. )
  245. }
  246. }
  247. // MARK: - Server Streaming
  248. extension ClientConnection {
  249. private func makeServerStreamingCall<
  250. Serializer: MessageSerializer,
  251. Deserializer: MessageDeserializer
  252. >(
  253. serializer: Serializer,
  254. deserializer: Deserializer,
  255. path: String,
  256. request: Serializer.Input,
  257. callOptions: CallOptions,
  258. handler: @escaping (Deserializer.Output) -> Void
  259. ) -> ServerStreamingCall<Serializer.Input, Deserializer.Output> {
  260. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  261. let call = ServerStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  262. multiplexer: self.multiplexer,
  263. serializer: serializer,
  264. deserializer: deserializer,
  265. callOptions: callOptions,
  266. errorDelegate: self.configuration.errorDelegate,
  267. logger: logger,
  268. responseHandler: handler
  269. )
  270. call.send(
  271. self.makeRequestHead(path: path, options: callOptions, requestID: requestID),
  272. request: request
  273. )
  274. return call
  275. }
  276. /// A server streaming call using `SwiftProtobuf.Message` messages.
  277. public func makeServerStreamingCall<
  278. Request: SwiftProtobuf.Message,
  279. Response: SwiftProtobuf.Message
  280. >(
  281. path: String,
  282. request: Request,
  283. callOptions: CallOptions,
  284. handler: @escaping (Response) -> Void
  285. ) -> ServerStreamingCall<Request, Response> {
  286. return self.makeServerStreamingCall(
  287. serializer: ProtobufSerializer(),
  288. deserializer: ProtobufDeserializer(),
  289. path: path,
  290. request: request,
  291. callOptions: callOptions,
  292. handler: handler
  293. )
  294. }
  295. /// A server streaming call using `GRPCPayload` messages.
  296. public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  297. path: String,
  298. request: Request,
  299. callOptions: CallOptions,
  300. handler: @escaping (Response) -> Void
  301. ) -> ServerStreamingCall<Request, Response> {
  302. return self.makeServerStreamingCall(
  303. serializer: GRPCPayloadSerializer(),
  304. deserializer: GRPCPayloadDeserializer(),
  305. path: path,
  306. request: request,
  307. callOptions: callOptions,
  308. handler: handler
  309. )
  310. }
  311. }
  312. // MARK: - Bidirectional Streaming
  313. extension ClientConnection {
  314. private func makeBidirectionalStreamingCall<
  315. Serializer: MessageSerializer,
  316. Deserializer: MessageDeserializer
  317. >(
  318. serializer: Serializer,
  319. deserializer: Deserializer,
  320. path: String,
  321. callOptions: CallOptions,
  322. handler: @escaping (Deserializer.Output) -> Void
  323. ) -> BidirectionalStreamingCall<Serializer.Input, Deserializer.Output> {
  324. let (logger, requestID) = self.populatedLoggerAndRequestID(from: callOptions)
  325. let call = BidirectionalStreamingCall<Serializer.Input, Deserializer.Output>.makeOnHTTP2Stream(
  326. multiplexer: self.multiplexer,
  327. serializer: serializer,
  328. deserializer: deserializer,
  329. callOptions: callOptions,
  330. errorDelegate: self.configuration.errorDelegate,
  331. logger: logger,
  332. responseHandler: handler
  333. )
  334. call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
  335. return call
  336. }
  337. /// A bidirectional streaming call using `SwiftProtobuf.Message` messages.
  338. public func makeBidirectionalStreamingCall<
  339. Request: SwiftProtobuf.Message,
  340. Response: SwiftProtobuf.Message
  341. >(
  342. path: String,
  343. callOptions: CallOptions,
  344. handler: @escaping (Response) -> Void
  345. ) -> BidirectionalStreamingCall<Request, Response> {
  346. return self.makeBidirectionalStreamingCall(
  347. serializer: ProtobufSerializer(),
  348. deserializer: ProtobufDeserializer(),
  349. path: path,
  350. callOptions: callOptions,
  351. handler: handler
  352. )
  353. }
  354. /// A bidirectional streaming call using `GRPCPayload` messages.
  355. public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  356. path: String,
  357. callOptions: CallOptions,
  358. handler: @escaping (Response) -> Void
  359. ) -> BidirectionalStreamingCall<Request, Response> {
  360. return self.makeBidirectionalStreamingCall(
  361. serializer: GRPCPayloadSerializer(),
  362. deserializer: GRPCPayloadDeserializer(),
  363. path: path,
  364. callOptions: callOptions,
  365. handler: handler
  366. )
  367. }
  368. }
  369. // MARK: - Configuration structures
  370. /// A target to connect to.
  371. public struct ConnectionTarget {
  372. internal enum Wrapped {
  373. case hostAndPort(String, Int)
  374. case unixDomainSocket(String)
  375. case socketAddress(SocketAddress)
  376. }
  377. internal var wrapped: Wrapped
  378. private init(_ wrapped: Wrapped) {
  379. self.wrapped = wrapped
  380. }
  381. /// The host and port.
  382. public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
  383. return ConnectionTarget(.hostAndPort(host, port))
  384. }
  385. /// The path of a Unix domain socket.
  386. public static func unixDomainSocket(_ path: String) -> ConnectionTarget {
  387. return ConnectionTarget(.unixDomainSocket(path))
  388. }
  389. /// A NIO socket address.
  390. public static func socketAddress(_ address: SocketAddress) -> ConnectionTarget {
  391. return ConnectionTarget(.socketAddress(address))
  392. }
  393. var host: String {
  394. switch self.wrapped {
  395. case let .hostAndPort(host, _):
  396. return host
  397. case let .socketAddress(.v4(address)):
  398. return address.host
  399. case let .socketAddress(.v6(address)):
  400. return address.host
  401. case .unixDomainSocket, .socketAddress(.unixDomainSocket):
  402. return "localhost"
  403. }
  404. }
  405. }
  406. /// The connectivity behavior to use when starting an RPC.
  407. public struct CallStartBehavior: Hashable {
  408. internal enum Behavior: Hashable {
  409. case waitsForConnectivity
  410. case fastFailure
  411. }
  412. internal var wrapped: Behavior
  413. private init(_ wrapped: Behavior) {
  414. self.wrapped = wrapped
  415. }
  416. /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
  417. /// an RPC. Doing so may involve multiple connection attempts.
  418. ///
  419. /// This is the preferred, and default, behaviour.
  420. public static let waitsForConnectivity = CallStartBehavior(.waitsForConnectivity)
  421. /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
  422. /// quickly rather than waiting for an active connection. The behaviour depends on the current
  423. /// connectivity state:
  424. ///
  425. /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
  426. /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
  427. /// fails.
  428. /// - Ready: a connection is already active: the RPC will be started using that connection.
  429. /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
  430. /// connect again. The RPC will fail immediately.
  431. /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
  432. public static let fastFailure = CallStartBehavior(.fastFailure)
  433. }
  434. extension ClientConnection {
  435. /// The configuration for a connection.
  436. public struct Configuration {
  437. /// The target to connect to.
  438. public var target: ConnectionTarget
  439. /// The event loop group to run the connection on.
  440. public var eventLoopGroup: EventLoopGroup
  441. /// An error delegate which is called when errors are caught. Provided delegates **must not
  442. /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
  443. /// cycle.
  444. public var errorDelegate: ClientErrorDelegate?
  445. /// A delegate which is called when the connectivity state is changed.
  446. public var connectivityStateDelegate: ConnectivityStateDelegate?
  447. /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
  448. /// provided but the queue is `nil` then one will be created by gRPC.
  449. public var connectivityStateDelegateQueue: DispatchQueue?
  450. /// TLS configuration for this connection. `nil` if TLS is not desired.
  451. public var tls: TLS?
  452. /// The connection backoff configuration. If no connection retrying is required then this should
  453. /// be `nil`.
  454. public var connectionBackoff: ConnectionBackoff?
  455. /// The connection keepalive configuration.
  456. public var connectionKeepalive: ClientConnectionKeepalive
  457. /// The amount of time to wait before closing the connection. The idle timeout will start only
  458. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  459. ///
  460. /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
  461. public var connectionIdleTimeout: TimeAmount
  462. /// The behavior used to determine when an RPC should start. That is, whether it should wait for
  463. /// an active connection or fail quickly if no connection is currently available.
  464. public var callStartBehavior: CallStartBehavior
  465. /// The HTTP/2 flow control target window size.
  466. public var httpTargetWindowSize: Int
  467. /// The HTTP protocol used for this connection.
  468. public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol {
  469. return self.tls == nil ? .http : .https
  470. }
  471. /// A logger for background information (such as connectivity state). A separate logger for
  472. /// requests may be provided in the `CallOptions`.
  473. ///
  474. /// Defaults to a no-op logger.
  475. public var backgroundActivityLogger: Logger
  476. /// A channel initializer which will be run after gRPC has initialized each channel. This may be
  477. /// used to add additional handlers to the pipeline and is intended for debugging.
  478. ///
  479. /// - Warning: The initializer closure may be invoked *multiple times*.
  480. public var debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
  481. /// Create a `Configuration` with some pre-defined defaults. Prefer using
  482. /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
  483. /// `ClientConnection.insecure(group:)` to build a plaintext connection.
  484. ///
  485. /// - Parameter target: The target to connect to.
  486. /// - Parameter eventLoopGroup: The event loop group to run the connection on.
  487. /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
  488. /// on debug builds.
  489. /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
  490. /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
  491. /// `connectivityStateDelegate`.
  492. /// - Parameter tls: TLS configuration, defaulting to `nil`.
  493. /// - Parameter connectionBackoff: The connection backoff configuration to use.
  494. /// - Parameter connectionKeepalive: The keepalive configuration to use.
  495. /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the connection, defaulting to 30 minutes.
  496. /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
  497. /// relation to its underlying connection. Defaults to `waitsForConnectivity`.
  498. /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size.
  499. /// - Parameter backgroundActivityLogger: A logger for background information (such as
  500. /// connectivity state). Defaults to a no-op logger.
  501. /// - Parameter debugChannelInitializer: A channel initializer will be called after gRPC has
  502. /// initialized the channel. Defaults to `nil`.
  503. public init(
  504. target: ConnectionTarget,
  505. eventLoopGroup: EventLoopGroup,
  506. errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
  507. connectivityStateDelegate: ConnectivityStateDelegate? = nil,
  508. connectivityStateDelegateQueue: DispatchQueue? = nil,
  509. tls: Configuration.TLS? = nil,
  510. connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
  511. connectionKeepalive: ClientConnectionKeepalive = ClientConnectionKeepalive(),
  512. connectionIdleTimeout: TimeAmount = .minutes(30),
  513. callStartBehavior: CallStartBehavior = .waitsForConnectivity,
  514. httpTargetWindowSize: Int = 65535,
  515. backgroundActivityLogger: Logger = Logger(
  516. label: "io.grpc",
  517. factory: { _ in SwiftLogNoOpLogHandler() }
  518. ),
  519. debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)? = nil
  520. ) {
  521. self.target = target
  522. self.eventLoopGroup = eventLoopGroup
  523. self.errorDelegate = errorDelegate
  524. self.connectivityStateDelegate = connectivityStateDelegate
  525. self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
  526. self.tls = tls
  527. self.connectionBackoff = connectionBackoff
  528. self.connectionKeepalive = connectionKeepalive
  529. self.connectionIdleTimeout = connectionIdleTimeout
  530. self.callStartBehavior = callStartBehavior
  531. self.httpTargetWindowSize = httpTargetWindowSize
  532. self.backgroundActivityLogger = backgroundActivityLogger
  533. self.debugChannelInitializer = debugChannelInitializer
  534. }
  535. }
  536. }
  537. // MARK: - Configuration helpers/extensions
  538. extension ClientBootstrapProtocol {
  539. /// Connect to the given connection target.
  540. ///
  541. /// - Parameter target: The target to connect to.
  542. func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
  543. switch target.wrapped {
  544. case let .hostAndPort(host, port):
  545. return self.connect(host: host, port: port)
  546. case let .unixDomainSocket(path):
  547. return self.connect(unixDomainSocketPath: path)
  548. case let .socketAddress(address):
  549. return self.connect(to: address)
  550. }
  551. }
  552. }
  553. extension Channel {
  554. /// Configure the channel with TLS.
  555. ///
  556. /// This function adds two handlers to the pipeline: the `NIOSSLClientHandler` to handle TLS, and
  557. /// the `TLSVerificationHandler` which verifies that a successful handshake was completed.
  558. ///
  559. /// - Parameter configuration: The configuration to configure the channel with.
  560. /// - Parameter serverHostname: The server hostname to use if the hostname should be verified.
  561. /// - Parameter errorDelegate: The error delegate to use for the TLS verification handler.
  562. func configureTLS(
  563. _ configuration: TLSConfiguration,
  564. serverHostname: String?,
  565. errorDelegate: ClientErrorDelegate?,
  566. logger: Logger
  567. ) -> EventLoopFuture<Void> {
  568. do {
  569. let sslClientHandler = try NIOSSLClientHandler(
  570. context: try NIOSSLContext(configuration: configuration),
  571. serverHostname: serverHostname
  572. )
  573. return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler(logger: logger))
  574. } catch {
  575. return self.eventLoop.makeFailedFuture(error)
  576. }
  577. }
  578. func configureGRPCClient(
  579. httpTargetWindowSize: Int,
  580. tlsConfiguration: TLSConfiguration?,
  581. tlsServerHostname: String?,
  582. connectionManager: ConnectionManager,
  583. connectionKeepalive: ClientConnectionKeepalive,
  584. connectionIdleTimeout: TimeAmount,
  585. errorDelegate: ClientErrorDelegate?,
  586. requiresZeroLengthWriteWorkaround: Bool,
  587. logger: Logger
  588. ) -> EventLoopFuture<Void> {
  589. let tlsConfigured = tlsConfiguration.map {
  590. self.configureTLS(
  591. $0,
  592. serverHostname: tlsServerHostname,
  593. errorDelegate: errorDelegate,
  594. logger: logger
  595. )
  596. }
  597. let configuration: EventLoopFuture<Void> = (
  598. tlsConfigured ?? self.eventLoop
  599. .makeSucceededFuture(())
  600. ).flatMap {
  601. self.configureHTTP2Pipeline(
  602. mode: .client,
  603. targetWindowSize: httpTargetWindowSize,
  604. inboundStreamInitializer: nil
  605. )
  606. }.flatMap { _ in
  607. self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
  608. self.pipeline.addHandlers(
  609. [
  610. GRPCClientKeepaliveHandler(configuration: connectionKeepalive),
  611. GRPCIdleHandler(
  612. mode: .client(connectionManager),
  613. logger: logger,
  614. idleTimeout: connectionIdleTimeout
  615. ),
  616. ],
  617. position: .after(http2Handler)
  618. )
  619. }.flatMap {
  620. let errorHandler = DelegatingErrorHandler(
  621. logger: logger,
  622. delegate: errorDelegate
  623. )
  624. return self.pipeline.addHandler(errorHandler)
  625. }
  626. }
  627. #if canImport(Network)
  628. // This availability guard is arguably unnecessary, but we add it anyway.
  629. if requiresZeroLengthWriteWorkaround, #available(
  630. OSX 10.14,
  631. iOS 12.0,
  632. tvOS 12.0,
  633. watchOS 6.0,
  634. *
  635. ) {
  636. return configuration.flatMap {
  637. self.pipeline.addHandler(NIOFilterEmptyWritesHandler(), position: .first)
  638. }
  639. } else {
  640. return configuration
  641. }
  642. #else
  643. return configuration
  644. #endif
  645. }
  646. func configureGRPCClient(
  647. errorDelegate: ClientErrorDelegate?,
  648. logger: Logger
  649. ) -> EventLoopFuture<Void> {
  650. return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in
  651. self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
  652. }
  653. }
  654. }
  655. extension TimeAmount {
  656. /// Creates a new `TimeAmount` from the given time interval in seconds.
  657. ///
  658. /// - Parameter timeInterval: The amount of time in seconds
  659. static func seconds(timeInterval: TimeInterval) -> TimeAmount {
  660. return .nanoseconds(Int64(timeInterval * 1_000_000_000))
  661. }
  662. }
  663. extension String {
  664. var isIPAddress: Bool {
  665. // We need some scratch space to let inet_pton write into.
  666. var ipv4Addr = in_addr()
  667. var ipv6Addr = in6_addr()
  668. return self.withCString { ptr in
  669. inet_pton(AF_INET, ptr, &ipv4Addr) == 1 ||
  670. inet_pton(AF_INET6, ptr, &ipv6Addr) == 1
  671. }
  672. }
  673. }