PooledChannel.swift 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. import NIOSSL
  20. import SwiftProtobuf
  /// A `GRPCChannel` whose calls run on HTTP/2 streams obtained from a `PoolManager`.
  ///
  /// The channel provider, the per-pool limits, and the `authority`/`scheme` values handed to
  /// every call are all derived from a single `GRPCChannelPool.Configuration` at initialization.
  internal final class PooledChannel: GRPCChannel {
    /// The configuration this channel was created with.
    private let configuration: GRPCChannelPool.Configuration
    /// The pool manager from which streams are requested and which is shut down on `close`.
    private let pool: PoolManager
    /// The authority passed to each call's transport; taken from the configured target's host.
    private let authority: String
    /// The scheme passed to each call's transport: "https" when TLS is configured, otherwise "http".
    private let scheme: String

    /// Creates a pooled channel from the given configuration.
    ///
    /// - Parameter configuration: The channel pool configuration supplying the target, TLS,
    ///   keepalive, idle timeout, HTTP/2 and pool-sizing values.
    /// - Throws: Any error thrown by `makeNIOSSLContext()` when a TLS configuration is present.
    internal init(configuration: GRPCChannelPool.Configuration) throws {
      self.configuration = configuration
      self.authority = configuration.target.host

      let tlsMode: DefaultChannelProvider.TLSMode
      let scheme: String

      if let tlsConfiguration = configuration.transportSecurity.tlsConfiguration {
        scheme = "https"

        if let sslContext = try tlsConfiguration.makeNIOSSLContext() {
          tlsMode = .configureWithNIOSSL(.success(sslContext))
        } else {
          // No SSL context means we're using Network.framework.
          tlsMode = .configureWithNetworkFramework
        }
      } else {
        scheme = "http"
        tlsMode = .disabled
      }

      self.scheme = scheme

      let provider = DefaultChannelProvider(
        connectionTarget: configuration.target,
        connectionKeepalive: configuration.keepalive,
        connectionIdleTimeout: configuration.idleTimeout,
        tlsMode: tlsMode,
        tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
        httpTargetWindowSize: configuration.http2.targetWindowSize,
        // The max frame size is a separate HTTP/2 setting from the flow-control window size, so
        // it must come from its own configuration value rather than from `targetWindowSize`.
        httpMaxFrameSize: configuration.http2.maxFrameSize,
        errorDelegate: configuration.errorDelegate,
        debugChannelInitializer: configuration.debugChannelInitializer
      )

      self.pool = PoolManager.makeInitializedPoolManager(
        using: configuration.eventLoopGroup,
        perPoolConfiguration: .init(
          maxConnections: configuration.connectionPool.connectionsPerEventLoop,
          maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
          loadThreshold: configuration.connectionPool.reservationLoadThreshold,
          // NOTE(review): fixed value; presumably the stream limit assumed for a connection
          // before the remote peer's actual limit is known -- confirm against `PoolManager`.
          assumedMaxConcurrentStreams: 100,
          channelProvider: provider
        ),
        logger: configuration.backgroundActivityLogger.wrapped
      )
    }

    /// Requests a stream from the pool for a call made with the given options.
    ///
    /// The deadline given to the pool is the earlier of the call's own deadline and "now" plus
    /// the configured `connectionPool.maxWaitTime`.
    ///
    /// - Parameter callOptions: Options for the call; supplies the event loop preference, the
    ///   time limit and the logger.
    /// - Returns: A future for the stream's `Channel`, paired with the event loop the call should
    ///   use: the exactly-preferred event loop when one was given, otherwise the event loop of
    ///   the returned stream channel.
    private func makeStreamChannel(
      callOptions: CallOptions
    ) -> (EventLoopFuture<Channel>, EventLoop) {
      let preferredEventLoop = callOptions.eventLoopPreference.exact
      let connectionWaitDeadline = NIODeadline.now() + self.configuration.connectionPool.maxWaitTime
      let deadline = min(callOptions.timeLimit.makeDeadline(), connectionWaitDeadline)

      let streamChannel = self.pool.makeStream(
        preferredEventLoop: preferredEventLoop,
        deadline: deadline,
        logger: GRPCLogger(wrapping: callOptions.logger)
      ) { channel in
        // Nothing is configured on the stream channel here; complete immediately.
        return channel.eventLoop.makeSucceededVoidFuture()
      }

      return (streamChannel.futureResult, preferredEventLoop ?? streamChannel.eventLoop)
    }

    // MARK: GRPCChannel conformance

    /// Makes a call whose request and response types are `Message`s.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of the RPC.
    ///   - callOptions: Options for the call; also used to obtain the stream from the pool.
    ///   - interceptors: Interceptors to use for the call.
    /// - Returns: A `Call` backed by an HTTP/2 transport over a stream from the pool.
    internal func makeCall<Request, Response>(
      path: String,
      type: GRPCCallType,
      callOptions: CallOptions,
      interceptors: [ClientInterceptor<Request, Response>]
    ) -> Call<Request, Response> where Request: Message, Response: Message {
      let (stream, eventLoop) = self.makeStreamChannel(callOptions: callOptions)

      return Call(
        path: path,
        type: type,
        eventLoop: eventLoop,
        options: callOptions,
        interceptors: interceptors,
        transportFactory: .http2(
          channel: stream,
          authority: self.authority,
          scheme: self.scheme,
          maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
          errorDelegate: self.configuration.errorDelegate
        )
      )
    }

    /// Makes a call whose request and response types are `GRPCPayload`s.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of the RPC.
    ///   - callOptions: Options for the call; also used to obtain the stream from the pool.
    ///   - interceptors: Interceptors to use for the call.
    /// - Returns: A `Call` backed by an HTTP/2 transport over a stream from the pool.
    internal func makeCall<Request, Response>(
      path: String,
      type: GRPCCallType,
      callOptions: CallOptions,
      interceptors: [ClientInterceptor<Request, Response>]
    ) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      let (stream, eventLoop) = self.makeStreamChannel(callOptions: callOptions)

      return Call(
        path: path,
        type: type,
        eventLoop: eventLoop,
        options: callOptions,
        interceptors: interceptors,
        transportFactory: .http2(
          channel: stream,
          authority: self.authority,
          scheme: self.scheme,
          maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
          errorDelegate: self.configuration.errorDelegate
        )
      )
    }

    /// Shuts down the underlying pool, handing it `promise` to report the outcome.
    ///
    /// - Parameter promise: The promise passed to the pool's `shutdown(promise:)`.
    internal func close(promise: EventLoopPromise<Void>) {
      self.pool.shutdown(promise: promise)
    }

    /// Shuts down the underlying pool.
    ///
    /// - Returns: The future of a promise created on the next event loop of the configured
    ///   event loop group and passed to the pool's `shutdown(promise:)`.
    internal func close() -> EventLoopFuture<Void> {
      let promise = self.configuration.eventLoopGroup.next().makePromise(of: Void.self)
      self.close(promise: promise)
      return promise.futureResult
    }
  }