PooledChannel.swift 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. import NIOSSL
  20. import SwiftProtobuf
  /// A `GRPCChannel` whose calls run on HTTP/2 streams obtained from a `PoolManager`.
  ///
  /// The channel is configured once, at initialization, from a
  /// `GRPCChannelPool.Configuration`; the `:authority` and `:scheme` values used for
  /// every call are derived from that configuration.
  @usableFromInline
  internal final class PooledChannel: GRPCChannel {
    /// The configuration this channel was created with.
    @usableFromInline
    internal let _configuration: GRPCChannelPool.Configuration

    /// The pool manager from which streams are requested and which is shut down on `close`.
    @usableFromInline
    internal let _pool: PoolManager

    /// The authority passed to the transport for each call (the host of the configured target).
    @usableFromInline
    internal let _authority: String

    /// The scheme passed to the transport for each call: "https" when TLS is configured,
    /// "http" otherwise.
    @usableFromInline
    internal let _scheme: String

    /// Creates a pooled channel from the given configuration.
    ///
    /// - Parameter configuration: The channel pool configuration (target, transport
    ///   security, keepalive, idle timeout, HTTP/2 settings, pool limits, logging).
    /// - Throws: Any error thrown by `makeNIOSSLContext()` when TLS is configured.
    @inlinable
    internal init(configuration: GRPCChannelPool.Configuration) throws {
      self._configuration = configuration
      self._authority = configuration.target.host

      let tlsMode: DefaultChannelProvider.TLSMode
      let scheme: String

      if let tlsConfiguration = configuration.transportSecurity.tlsConfiguration {
        scheme = "https"

        if let sslContext = try tlsConfiguration.makeNIOSSLContext() {
          tlsMode = .configureWithNIOSSL(.success(sslContext))
        } else {
          // No SSL context means we're using Network.framework.
          tlsMode = .configureWithNetworkFramework
        }
      } else {
        scheme = "http"
        tlsMode = .disabled
      }

      self._scheme = scheme

      let provider = DefaultChannelProvider(
        connectionTarget: configuration.target,
        connectionKeepalive: configuration.keepalive,
        connectionIdleTimeout: configuration.idleTimeout,
        tlsMode: tlsMode,
        tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
        httpTargetWindowSize: configuration.http2.targetWindowSize,
        // The frame size must come from its own setting; passing the target window size
        // here would silently ignore the configured maximum frame size.
        httpMaxFrameSize: configuration.http2.maxFrameSize,
        errorDelegate: configuration.errorDelegate,
        debugChannelInitializer: configuration.debugChannelInitializer
      )

      self._pool = PoolManager.makeInitializedPoolManager(
        using: configuration.eventLoopGroup,
        perPoolConfiguration: .init(
          maxConnections: configuration.connectionPool.connectionsPerEventLoop,
          maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
          loadThreshold: configuration.connectionPool.reservationLoadThreshold,
          // NOTE(review): fixed assumption rather than a configurable value; presumably
          // the stream limit assumed before a connection reports its own — confirm.
          assumedMaxConcurrentStreams: 100,
          channelProvider: provider
        ),
        logger: configuration.backgroundActivityLogger.wrapped
      )
    }

    /// Requests a stream from the pool for a call made with the given options.
    ///
    /// The deadline given to the pool is the earlier of the call's own deadline and
    /// "now + `connectionPool.maxWaitTime`".
    ///
    /// - Parameter callOptions: Options for the call; the event loop preference, time
    ///   limit and logger are used.
    /// - Returns: A future for the stream channel, and the event loop to use for the call:
    ///   the exact event loop requested in `callOptions` if there is one, otherwise the
    ///   event loop of the stream returned by the pool.
    @inlinable
    internal func _makeStreamChannel(
      callOptions: CallOptions
    ) -> (EventLoopFuture<Channel>, EventLoop) {
      let preferredEventLoop = callOptions.eventLoopPreference.exact
      let connectionWaitDeadline = NIODeadline.now() + self._configuration.connectionPool.maxWaitTime
      let deadline = min(callOptions.timeLimit.makeDeadline(), connectionWaitDeadline)

      let streamChannel = self._pool.makeStream(
        preferredEventLoop: preferredEventLoop,
        deadline: deadline,
        logger: GRPCLogger(wrapping: callOptions.logger)
      ) { channel in
        // Nothing is done to the channel here; the closure just completes successfully.
        channel.eventLoop.makeSucceededVoidFuture()
      }

      return (streamChannel.futureResult, preferredEventLoop ?? streamChannel.eventLoop)
    }

    // MARK: GRPCChannel conformance

    /// Makes a call for `SwiftProtobuf.Message` request and response types, using an
    /// HTTP/2 transport backed by a stream from the pool.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of the RPC.
    ///   - callOptions: Options for the call.
    ///   - interceptors: Interceptors to apply to the call.
    /// - Returns: The newly created `Call`.
    @inlinable
    internal func makeCall<Request, Response>(
      path: String,
      type: GRPCCallType,
      callOptions: CallOptions,
      interceptors: [ClientInterceptor<Request, Response>]
    ) -> Call<Request, Response> where Request: Message, Response: Message {
      let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)

      return Call(
        path: path,
        type: type,
        eventLoop: eventLoop,
        options: callOptions,
        interceptors: interceptors,
        transportFactory: .http2(
          channel: stream,
          authority: self._authority,
          scheme: self._scheme,
          maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength,
          errorDelegate: self._configuration.errorDelegate
        )
      )
    }

    /// Makes a call for `GRPCPayload` request and response types, using an HTTP/2
    /// transport backed by a stream from the pool.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of the RPC.
    ///   - callOptions: Options for the call.
    ///   - interceptors: Interceptors to apply to the call.
    /// - Returns: The newly created `Call`.
    @inlinable
    internal func makeCall<Request, Response>(
      path: String,
      type: GRPCCallType,
      callOptions: CallOptions,
      interceptors: [ClientInterceptor<Request, Response>]
    ) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)

      return Call(
        path: path,
        type: type,
        eventLoop: eventLoop,
        options: callOptions,
        interceptors: interceptors,
        transportFactory: .http2(
          channel: stream,
          authority: self._authority,
          scheme: self._scheme,
          maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength,
          errorDelegate: self._configuration.errorDelegate
        )
      )
    }

    /// Shuts down the underlying pool manager.
    ///
    /// - Parameter promise: A promise handed to the pool manager's `shutdown(promise:)`.
    @inlinable
    internal func close(promise: EventLoopPromise<Void>) {
      self._pool.shutdown(promise: promise)
    }

    /// Shuts down the underlying pool manager.
    ///
    /// - Returns: The future of a promise, created on an event loop from the configured
    ///   event loop group, which is handed to the pool manager's `shutdown(promise:)`.
    @inlinable
    internal func close() -> EventLoopFuture<Void> {
      let promise = self._configuration.eventLoopGroup.next().makePromise(of: Void.self)
      self.close(promise: promise)
      return promise.futureResult
    }
  }