Browse Source

Add configuration for HTTP/2 max concurrent streams (#1226)

Quirin Schweigert 4 years ago
parent
commit
46021d0bb8

+ 14 - 1
Sources/GRPC/GRPCServerPipelineConfigurator.swift

@@ -15,6 +15,7 @@
  */
 import Logging
 import NIO
+import NIOHPACK
 import NIOHTTP1
 import NIOHTTP2
 import NIOTLS
@@ -78,7 +79,19 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
 
   /// Makes an HTTP/2 handler.
   private func makeHTTP2Handler() -> NIOHTTP2Handler {
-    return .init(mode: .server)
+    return .init(
+      mode: .server,
+      initialSettings: [
+        HTTP2Setting(
+          parameter: .maxConcurrentStreams,
+          value: self.configuration.httpMaxConcurrentStreams
+        ),
+        HTTP2Setting(
+          parameter: .maxHeaderListSize,
+          value: HPACKDecoder.defaultMaxHeaderListSize
+        ),
+      ]
+    )
   }
 
   /// Makes an HTTP/2 multiplexer suitable handling gRPC requests.

+ 7 - 0
Sources/GRPC/Server.swift

@@ -324,6 +324,13 @@ extension Server {
     /// The HTTP/2 flow control target window size. Defaults to 65535.
     public var httpTargetWindowSize: Int = 65535
 
+    /// The HTTP/2 max number of concurrent streams. Defaults to 100. Must be non-negative.
+    public var httpMaxConcurrentStreams: Int = 100 {
+      willSet {
+        precondition(newValue >= 0, "httpMaxConcurrentStreams must be non-negative")
+      }
+    }
+
     /// The root server logger. Accepted connections will branch from this logger and RPCs on
     /// each connection will use a logger branched from the connections logger. This logger is made
     /// available to service providers via `context`. Defaults to a no-op logger.

+ 8 - 0
Sources/GRPC/ServerBuilder.swift

@@ -160,6 +160,14 @@ extension Server.Builder {
   }
 }
 
+extension Server.Builder {
+  @discardableResult
+  public func withHTTPMaxConcurrentStreams(_ httpMaxConcurrentStreams: Int) -> Self {
+    self.configuration.httpMaxConcurrentStreams = httpMaxConcurrentStreams
+    return self
+  }
+}
+
 extension Server.Builder {
   /// Sets the root server logger. Accepted connections will branch from this logger and RPCs on
   /// each connection will use a logger branched from the connections logger. This logger is made

+ 90 - 0
Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift

@@ -0,0 +1,90 @@
+/*
+ * Copyright 2021, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import EchoImplementation
+import EchoModel
+@testable import GRPC
+import NIO
+import NIOHTTP2
+import XCTest
+
+class HTTP2MaxConcurrentStreamsTests: GRPCTestCase {
+  struct Constants {
+    static let testTimeout: TimeInterval = 10
+
+    static let defaultMaxNumberOfConcurrentStreams =
+      nioDefaultSettings.first(where: { $0.parameter == .maxConcurrentStreams })!.value
+
+    static let testNumberOfConcurrentStreams: Int = defaultMaxNumberOfConcurrentStreams + 20
+  }
+
+  func testHTTP2MaxConcurrentStreamsSetting() {
+    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+    defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
+
+    let server = try! Server.insecure(group: eventLoopGroup)
+      .withLogger(self.serverLogger)
+      .withHTTPMaxConcurrentStreams(Constants.testNumberOfConcurrentStreams)
+      .withServiceProviders([EchoProvider()])
+      .bind(host: "localhost", port: 0)
+      .wait()
+
+    defer { XCTAssertNoThrow(try server.initiateGracefulShutdown().wait()) }
+
+    let clientConnection = ClientConnection.insecure(group: eventLoopGroup)
+      .withBackgroundActivityLogger(self.clientLogger)
+      .connect(host: "localhost", port: server.channel.localAddress!.port!)
+
+    defer { XCTAssertNoThrow(try clientConnection.close().wait()) }
+
+    let echoClient = Echo_EchoClient(
+      channel: clientConnection,
+      defaultCallOptions: CallOptions(logger: self.clientLogger)
+    )
+
+    var clientStreamingCalls =
+      (0 ..< Constants.testNumberOfConcurrentStreams)
+        .map { _ in echoClient.collect() }
+
+    let allMessagesSentExpectation = self.expectation(description: "all messages sent")
+
+    let sendMessageFutures = clientStreamingCalls
+      .map { $0.sendMessage(.with { $0.text = "Hi!" }) }
+
+    EventLoopFuture<Void>
+      .whenAllSucceed(sendMessageFutures, on: eventLoopGroup.next())
+      .assertSuccess(fulfill: allMessagesSentExpectation)
+
+    self.wait(for: [allMessagesSentExpectation], timeout: Constants.testTimeout)
+
+    let lastCall = clientStreamingCalls.popLast()!
+
+    let lastCallCompletedExpectation = self.expectation(description: "last call completed")
+    _ = lastCall.sendEnd()
+
+    lastCall.status.assertSuccess(fulfill: lastCallCompletedExpectation)
+
+    self.wait(for: [lastCallCompletedExpectation], timeout: Constants.testTimeout)
+
+    let allCallsCompletedExpectation = self.expectation(description: "all calls completed")
+    let endFutures = clientStreamingCalls.map { $0.sendEnd() }
+
+    EventLoopFuture<Void>
+      .whenAllSucceed(endFutures, on: eventLoopGroup.next())
+      .assertSuccess(fulfill: allCallsCompletedExpectation)
+
+    self.wait(for: [allCallsCompletedExpectation], timeout: Constants.testTimeout)
+  }
+}