PooledChannel.swift 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. import NIOSSL
  20. import SwiftProtobuf
  21. @usableFromInline
  22. internal final class PooledChannel: GRPCChannel {
  23. @usableFromInline
  24. internal let _configuration: GRPCChannelPool.Configuration
  25. @usableFromInline
  26. internal let _pool: PoolManager
  27. @usableFromInline
  28. internal let _authority: String
  29. @usableFromInline
  30. internal let _scheme: String
  31. @inlinable
  32. internal init(configuration: GRPCChannelPool.Configuration) throws {
  33. self._configuration = configuration
  34. self._authority = configuration.target.host
  35. let tlsMode: DefaultChannelProvider.TLSMode
  36. let scheme: String
  37. if let tlsConfiguration = configuration.transportSecurity.tlsConfiguration {
  38. scheme = "https"
  39. if let sslContext = try tlsConfiguration.makeNIOSSLContext() {
  40. tlsMode = .configureWithNIOSSL(.success(sslContext))
  41. } else {
  42. // No SSL context means we're using Network.framework.
  43. tlsMode = .configureWithNetworkFramework
  44. }
  45. } else {
  46. scheme = "http"
  47. tlsMode = .disabled
  48. }
  49. self._scheme = scheme
  50. let provider = DefaultChannelProvider(
  51. connectionTarget: configuration.target,
  52. connectionKeepalive: configuration.keepalive,
  53. connectionIdleTimeout: configuration.idleTimeout,
  54. tlsMode: tlsMode,
  55. tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
  56. httpTargetWindowSize: configuration.http2.targetWindowSize,
  57. httpMaxFrameSize: configuration.http2.targetWindowSize,
  58. errorDelegate: configuration.errorDelegate,
  59. debugChannelInitializer: configuration.debugChannelInitializer
  60. )
  61. self._pool = PoolManager.makeInitializedPoolManager(
  62. using: configuration.eventLoopGroup,
  63. perPoolConfiguration: .init(
  64. maxConnections: configuration.connectionPool.connectionsPerEventLoop,
  65. maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
  66. loadThreshold: configuration.connectionPool.reservationLoadThreshold,
  67. assumedMaxConcurrentStreams: 100,
  68. connectionBackoff: configuration.connectionBackoff,
  69. channelProvider: provider
  70. ),
  71. logger: configuration.backgroundActivityLogger.wrapped
  72. )
  73. }
  74. @inlinable
  75. internal func _makeStreamChannel(
  76. callOptions: CallOptions
  77. ) -> (EventLoopFuture<Channel>, EventLoop) {
  78. let preferredEventLoop = callOptions.eventLoopPreference.exact
  79. let connectionWaitDeadline = NIODeadline.now() + self._configuration.connectionPool.maxWaitTime
  80. let deadline = min(callOptions.timeLimit.makeDeadline(), connectionWaitDeadline)
  81. let streamChannel = self._pool.makeStream(
  82. preferredEventLoop: preferredEventLoop,
  83. deadline: deadline,
  84. logger: GRPCLogger(wrapping: callOptions.logger)
  85. ) { channel in
  86. return channel.eventLoop.makeSucceededVoidFuture()
  87. }
  88. return (streamChannel.futureResult, preferredEventLoop ?? streamChannel.eventLoop)
  89. }
  90. // MARK: GRPCChannel conformance
  91. @inlinable
  92. internal func makeCall<Request, Response>(
  93. path: String,
  94. type: GRPCCallType,
  95. callOptions: CallOptions,
  96. interceptors: [ClientInterceptor<Request, Response>]
  97. ) -> Call<Request, Response> where Request: Message, Response: Message {
  98. let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)
  99. return Call(
  100. path: path,
  101. type: type,
  102. eventLoop: eventLoop,
  103. options: callOptions,
  104. interceptors: interceptors,
  105. transportFactory: .http2(
  106. channel: stream,
  107. authority: self._authority,
  108. scheme: self._scheme,
  109. maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength,
  110. errorDelegate: self._configuration.errorDelegate
  111. )
  112. )
  113. }
  114. @inlinable
  115. internal func makeCall<Request, Response>(
  116. path: String,
  117. type: GRPCCallType,
  118. callOptions: CallOptions,
  119. interceptors: [ClientInterceptor<Request, Response>]
  120. ) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
  121. let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)
  122. return Call(
  123. path: path,
  124. type: type,
  125. eventLoop: eventLoop,
  126. options: callOptions,
  127. interceptors: interceptors,
  128. transportFactory: .http2(
  129. channel: stream,
  130. authority: self._authority,
  131. scheme: self._scheme,
  132. maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength,
  133. errorDelegate: self._configuration.errorDelegate
  134. )
  135. )
  136. }
  137. @inlinable
  138. internal func close(promise: EventLoopPromise<Void>) {
  139. self._pool.shutdown(promise: promise)
  140. }
  141. @inlinable
  142. internal func close() -> EventLoopFuture<Void> {
  143. let promise = self._configuration.eventLoopGroup.next().makePromise(of: Void.self)
  144. self._pool.shutdown(promise: promise)
  145. return promise.futureResult
  146. }
  147. }