NIOChannelPipeline+GRPC.swift 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. extension ChannelPipeline.SynchronousOperations {
  22. @_spi(Package) public typealias HTTP2ConnectionChannel = NIOAsyncChannel<HTTP2Frame, HTTP2Frame>
  23. @_spi(Package) public typealias HTTP2StreamMultiplexer = NIOHTTP2Handler.AsyncStreamMultiplexer<
  24. (NIOAsyncChannel<RPCRequestPart, RPCResponsePart>, EventLoopFuture<MethodDescriptor>)
  25. >
  26. @_spi(Package)
  27. public func configureGRPCServerPipeline(
  28. channel: any Channel,
  29. compressionConfig: HTTP2ServerTransport.Config.Compression,
  30. keepaliveConfig: HTTP2ServerTransport.Config.Keepalive,
  31. connectionConfig: HTTP2ServerTransport.Config.Connection,
  32. http2Config: HTTP2ServerTransport.Config.HTTP2,
  33. rpcConfig: HTTP2ServerTransport.Config.RPC,
  34. useTLS: Bool
  35. ) throws -> (HTTP2ConnectionChannel, HTTP2StreamMultiplexer) {
  36. let serverConnectionHandler = ServerConnectionManagementHandler(
  37. eventLoop: self.eventLoop,
  38. maxIdleTime: connectionConfig.maxIdleTime.map { TimeAmount($0) },
  39. maxAge: connectionConfig.maxAge.map { TimeAmount($0) },
  40. maxGraceTime: connectionConfig.maxGraceTime.map { TimeAmount($0) },
  41. keepaliveTime: TimeAmount(keepaliveConfig.time),
  42. keepaliveTimeout: TimeAmount(keepaliveConfig.timeout),
  43. allowKeepaliveWithoutCalls: keepaliveConfig.permitWithoutCalls,
  44. minPingIntervalWithoutCalls: TimeAmount(keepaliveConfig.minPingIntervalWithoutCalls)
  45. )
  46. let flushNotificationHandler = GRPCServerFlushNotificationHandler(
  47. serverConnectionManagementHandler: serverConnectionHandler
  48. )
  49. try self.addHandler(flushNotificationHandler)
  50. var http2HandlerConnectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration()
  51. var http2HandlerHTTP2Settings = HTTP2Settings([
  52. HTTP2Setting(parameter: .initialWindowSize, value: http2Config.targetWindowSize),
  53. HTTP2Setting(parameter: .maxFrameSize, value: http2Config.maxFrameSize),
  54. HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
  55. ])
  56. if let maxConcurrentStreams = http2Config.maxConcurrentStreams {
  57. http2HandlerHTTP2Settings.append(
  58. HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)
  59. )
  60. }
  61. http2HandlerConnectionConfiguration.initialSettings = http2HandlerHTTP2Settings
  62. var http2HandlerStreamConfiguration = NIOHTTP2Handler.StreamConfiguration()
  63. http2HandlerStreamConfiguration.targetWindowSize = http2Config.targetWindowSize
  64. let streamMultiplexer = try self.configureAsyncHTTP2Pipeline(
  65. mode: .server,
  66. configuration: NIOHTTP2Handler.Configuration(
  67. connection: http2HandlerConnectionConfiguration,
  68. stream: http2HandlerStreamConfiguration
  69. )
  70. ) { streamChannel in
  71. return streamChannel.eventLoop.makeCompletedFuture {
  72. let methodDescriptorPromise = streamChannel.eventLoop.makePromise(of: MethodDescriptor.self)
  73. let streamHandler = GRPCServerStreamHandler(
  74. scheme: useTLS ? .https : .http,
  75. acceptedEncodings: compressionConfig.enabledAlgorithms,
  76. maximumPayloadSize: rpcConfig.maxRequestPayloadSize,
  77. methodDescriptorPromise: methodDescriptorPromise
  78. )
  79. try streamChannel.pipeline.syncOperations.addHandler(streamHandler)
  80. let asyncStreamChannel = try NIOAsyncChannel<RPCRequestPart, RPCResponsePart>(
  81. wrappingChannelSynchronously: streamChannel
  82. )
  83. return (asyncStreamChannel, methodDescriptorPromise.futureResult)
  84. }
  85. }
  86. try self.addHandler(serverConnectionHandler)
  87. let connectionChannel = try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(
  88. wrappingChannelSynchronously: channel
  89. )
  90. return (connectionChannel, streamMultiplexer)
  91. }
  92. }