NIOChannelPipeline+GRPC.swift 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package import GRPCCore
  17. package import NIOCore
  18. internal import NIOHPACK
  19. package import NIOHTTP2
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. extension ChannelPipeline.SynchronousOperations {
  22. package typealias HTTP2ConnectionChannel = NIOAsyncChannel<HTTP2Frame, HTTP2Frame>
  23. package typealias HTTP2StreamMultiplexer = NIOHTTP2Handler.AsyncStreamMultiplexer<
  24. (NIOAsyncChannel<RPCRequestPart, RPCResponsePart>, EventLoopFuture<MethodDescriptor>)
  25. >
  26. package func configureGRPCServerPipeline(
  27. channel: any Channel,
  28. compressionConfig: HTTP2ServerTransport.Config.Compression,
  29. connectionConfig: HTTP2ServerTransport.Config.Connection,
  30. http2Config: HTTP2ServerTransport.Config.HTTP2,
  31. rpcConfig: HTTP2ServerTransport.Config.RPC,
  32. requireALPN: Bool,
  33. scheme: Scheme
  34. ) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) {
  35. let serverConnectionHandler = ServerConnectionManagementHandler(
  36. eventLoop: self.eventLoop,
  37. maxIdleTime: connectionConfig.maxIdleTime.map { TimeAmount($0) },
  38. maxAge: connectionConfig.maxAge.map { TimeAmount($0) },
  39. maxGraceTime: connectionConfig.maxGraceTime.map { TimeAmount($0) },
  40. keepaliveTime: TimeAmount(connectionConfig.keepalive.time),
  41. keepaliveTimeout: TimeAmount(connectionConfig.keepalive.timeout),
  42. allowKeepaliveWithoutCalls: connectionConfig.keepalive.clientBehavior.allowWithoutCalls,
  43. minPingIntervalWithoutCalls: TimeAmount(
  44. connectionConfig.keepalive.clientBehavior.minPingIntervalWithoutCalls
  45. ),
  46. requireALPN: requireALPN
  47. )
  48. let flushNotificationHandler = GRPCServerFlushNotificationHandler(
  49. serverConnectionManagementHandler: serverConnectionHandler
  50. )
  51. try self.addHandler(flushNotificationHandler)
  52. let clampedTargetWindowSize = self.clampTargetWindowSize(http2Config.targetWindowSize)
  53. let clampedMaxFrameSize = self.clampMaxFrameSize(http2Config.maxFrameSize)
  54. var http2HandlerConnectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration()
  55. var http2HandlerHTTP2Settings = HTTP2Settings([
  56. HTTP2Setting(parameter: .initialWindowSize, value: clampedTargetWindowSize),
  57. HTTP2Setting(parameter: .maxFrameSize, value: clampedMaxFrameSize),
  58. HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
  59. ])
  60. if let maxConcurrentStreams = http2Config.maxConcurrentStreams {
  61. http2HandlerHTTP2Settings.append(
  62. HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)
  63. )
  64. }
  65. http2HandlerConnectionConfiguration.initialSettings = http2HandlerHTTP2Settings
  66. var http2HandlerStreamConfiguration = NIOHTTP2Handler.StreamConfiguration()
  67. http2HandlerStreamConfiguration.targetWindowSize = clampedTargetWindowSize
  68. let streamMultiplexer = try self.configureAsyncHTTP2Pipeline(
  69. mode: .server,
  70. streamDelegate: serverConnectionHandler.http2StreamDelegate,
  71. configuration: NIOHTTP2Handler.Configuration(
  72. connection: http2HandlerConnectionConfiguration,
  73. stream: http2HandlerStreamConfiguration
  74. )
  75. ) { streamChannel in
  76. return streamChannel.eventLoop.makeCompletedFuture {
  77. let methodDescriptorPromise = streamChannel.eventLoop.makePromise(of: MethodDescriptor.self)
  78. let streamHandler = GRPCServerStreamHandler(
  79. scheme: scheme,
  80. acceptedEncodings: compressionConfig.enabledAlgorithms,
  81. maximumPayloadSize: rpcConfig.maxRequestPayloadSize,
  82. methodDescriptorPromise: methodDescriptorPromise
  83. )
  84. try streamChannel.pipeline.syncOperations.addHandler(streamHandler)
  85. let asyncStreamChannel = try NIOAsyncChannel<RPCRequestPart, RPCResponsePart>(
  86. wrappingChannelSynchronously: streamChannel
  87. )
  88. return (asyncStreamChannel, methodDescriptorPromise.futureResult)
  89. }
  90. }
  91. try self.addHandler(serverConnectionHandler)
  92. let connectionChannel = try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(
  93. wrappingChannelSynchronously: channel
  94. )
  95. return (connectionChannel, streamMultiplexer)
  96. }
  97. }
  98. extension ChannelPipeline.SynchronousOperations {
  99. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  100. package func configureGRPCClientPipeline(
  101. channel: any Channel,
  102. config: GRPCChannel.Config
  103. ) throws -> (
  104. NIOAsyncChannel<ClientConnectionEvent, Void>,
  105. NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
  106. ) {
  107. let clampedTargetWindowSize = self.clampTargetWindowSize(config.http2.targetWindowSize)
  108. let clampedMaxFrameSize = self.clampMaxFrameSize(config.http2.maxFrameSize)
  109. // Use NIOs defaults as a starting point.
  110. var http2 = NIOHTTP2Handler.Configuration()
  111. http2.stream.targetWindowSize = clampedTargetWindowSize
  112. http2.connection.initialSettings = [
  113. // Disallow servers from creating push streams.
  114. HTTP2Setting(parameter: .enablePush, value: 0),
  115. // Set the initial window size and max frame size to the clamped configured values.
  116. HTTP2Setting(parameter: .initialWindowSize, value: clampedTargetWindowSize),
  117. HTTP2Setting(parameter: .maxFrameSize, value: clampedMaxFrameSize),
  118. // Use NIOs default max header list size (16kB)
  119. HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
  120. ]
  121. let connectionHandler = ClientConnectionHandler(
  122. eventLoop: self.eventLoop,
  123. maxIdleTime: config.connection.maxIdleTime.map { TimeAmount($0) },
  124. keepaliveTime: config.connection.keepalive.map { TimeAmount($0.time) },
  125. keepaliveTimeout: config.connection.keepalive.map { TimeAmount($0.timeout) },
  126. keepaliveWithoutCalls: config.connection.keepalive?.allowWithoutCalls ?? false
  127. )
  128. let multiplexer = try self.configureAsyncHTTP2Pipeline(
  129. mode: .client,
  130. streamDelegate: connectionHandler.http2StreamDelegate,
  131. configuration: http2
  132. ) { stream in
  133. // Shouldn't happen, push-promises are disabled so the server shouldn't be able to
  134. // open streams.
  135. stream.close()
  136. }
  137. try self.addHandler(connectionHandler)
  138. let connection = try NIOAsyncChannel(
  139. wrappingChannelSynchronously: channel,
  140. configuration: NIOAsyncChannel.Configuration(
  141. inboundType: ClientConnectionEvent.self,
  142. outboundType: Void.self
  143. )
  144. )
  145. return (connection, multiplexer)
  146. }
  147. }
  148. extension ChannelPipeline.SynchronousOperations {
  149. /// Max frame size must be in the range `2^14 ..< 2^24` (RFC 9113 § 4.2).
  150. fileprivate func clampMaxFrameSize(_ maxFrameSize: Int) -> Int {
  151. let clampedMaxFrameSize: Int
  152. if maxFrameSize >= (1 << 24) {
  153. clampedMaxFrameSize = (1 << 24) - 1
  154. } else if maxFrameSize < (1 << 14) {
  155. clampedMaxFrameSize = (1 << 14)
  156. } else {
  157. clampedMaxFrameSize = maxFrameSize
  158. }
  159. return clampedMaxFrameSize
  160. }
  161. /// Window size which mustn't exceed `2^31 - 1` (RFC 9113 § 6.5.2).
  162. internal func clampTargetWindowSize(_ targetWindowSize: Int) -> Int {
  163. min(targetWindowSize, (1 << 31) - 1)
  164. }
  165. }