Browse Source

Update to SwiftNIO HTTP/2 1.13.0 (#922)

Motivation:

The multiplexer from SwiftNIO HTTP/2 required that the first write on
each stream matched the order in which the streams were created.
Violating this led to a connection error; all in-flight and subsequent
RPCs on that connection would fail. Some of our users noticed this
(#912). SwiftNIO HTTP/2 recently reworked the API around how streams
are created: HTTP/2 stream channels are no longer created with a stream
ID and now deliver the frame payload to the channel pipeline rather than
the entire HTTP/2 frame. This allows for a stream ID to be assigned to a
stream when it attempts to flush its first write, rather than when the
stream is created.

Modifications:

- Increase the minimum HTTP/2 version to 1.13.0
- Move away from deprecated APIs: this required changing the inbound-in
  and outbound-out types for `_GRPCClientChannelHandler` as well as a
  few smaller changes elsewhere.

Result:

- RPCs can be created concurrently without fear of violating any stream
  ID ordering rules
- Resolves #912
George Barnett 5 years ago
parent
commit
46d1a29868

+ 2 - 2
Package.resolved

@@ -24,8 +24,8 @@
         "repositoryURL": "https://github.com/apple/swift-nio-http2.git",
         "repositoryURL": "https://github.com/apple/swift-nio-http2.git",
         "state": {
         "state": {
           "branch": null,
           "branch": null,
-          "revision": "c5d10f4165128c3d0cc0e3c0f0a8ef55947a73a6",
-          "version": "1.12.2"
+          "revision": "e9627350bdb85bde7e0dc69a29799e40961ced72",
+          "version": "1.13.0"
         }
         }
       },
       },
       {
       {

+ 1 - 1
Package.swift

@@ -29,7 +29,7 @@ let package = Package(
     // Main SwiftNIO package
     // Main SwiftNIO package
     .package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
     .package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
     // HTTP2 via SwiftNIO
     // HTTP2 via SwiftNIO
-    .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.12.1"),
+    .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.13.0"),
     // TLS via SwiftNIO
     // TLS via SwiftNIO
     .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
     .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
     // Support for Network.framework where possible.
     // Support for Network.framework where possible.

+ 2 - 4
Sources/GRPC/ClientCalls/ClientCallTransport.swift

@@ -168,13 +168,11 @@ internal class ChannelTransport<Request, Response> {
       multiplexer.whenComplete { result in
       multiplexer.whenComplete { result in
         switch result {
         switch result {
         case .success(let mux):
         case .success(let mux):
-          mux.createStreamChannel(promise: streamPromise) { stream, streamID in
-            var logger = logger
-            logger[metadataKey: MetadataKey.streamID] = "\(streamID)"
+          mux.createStreamChannel(promise: streamPromise) { stream in
             logger.trace("created http/2 stream")
             logger.trace("created http/2 stream")
 
 
             return stream.pipeline.addHandlers([
             return stream.pipeline.addHandlers([
-              _GRPCClientChannelHandler(streamID: streamID, callType: callType, logger: logger),
+              _GRPCClientChannelHandler(callType: callType, logger: logger),
               GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
               GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
               GRPCClientCallHandler(call: call)
               GRPCClientCallHandler(call: call)
             ])
             ])

+ 3 - 3
Sources/GRPC/ClientConnection.swift

@@ -509,7 +509,7 @@ extension ClientConnection {
     public var httpTargetWindowSize: Int
     public var httpTargetWindowSize: Int
 
 
     /// The HTTP protocol used for this connection.
     /// The HTTP protocol used for this connection.
-    public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
+    public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol {
       return self.tls == nil ? .http : .https
       return self.tls == nil ? .http : .https
     }
     }
 
 
@@ -642,7 +642,7 @@ extension Channel {
     }
     }
 
 
     let configuration: EventLoopFuture<Void> = (tlsConfigured ?? self.eventLoop.makeSucceededFuture(())).flatMap {
     let configuration: EventLoopFuture<Void> = (tlsConfigured ?? self.eventLoop.makeSucceededFuture(())).flatMap {
-      self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize)
+      self.configureHTTP2Pipeline(mode: .client, targetWindowSize: httpTargetWindowSize, inboundStreamInitializer: nil)
     }.flatMap { _ in
     }.flatMap { _ in
       return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
       return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
         self.pipeline.addHandlers([
         self.pipeline.addHandlers([
@@ -677,7 +677,7 @@ extension Channel {
     errorDelegate: ClientErrorDelegate?,
     errorDelegate: ClientErrorDelegate?,
     logger: Logger
     logger: Logger
   ) -> EventLoopFuture<Void> {
   ) -> EventLoopFuture<Void> {
-    return self.configureHTTP2Pipeline(mode: .client).flatMap { _ in
+    return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in
       self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
       self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
     }
     }
   }
   }

+ 14 - 5
Sources/GRPC/HTTPProtocolSwitcher.swift

@@ -145,13 +145,22 @@ extension HTTPProtocolSwitcher: ChannelInboundHandler, RemovableChannelHandler {
         context.channel.configureHTTP2Pipeline(
         context.channel.configureHTTP2Pipeline(
           mode: .server,
           mode: .server,
           targetWindowSize: httpTargetWindowSize
           targetWindowSize: httpTargetWindowSize
-        ) { (streamChannel, streamID) in
+        ) { streamChannel in
           var logger = self.logger
           var logger = self.logger
-          logger[metadataKey: MetadataKey.streamID] = "\(streamID)"
-          return streamChannel.pipeline.addHandler(HTTP2ToHTTP1ServerCodec(streamID: streamID, normalizeHTTPHeaders: true)).flatMap {
-            self.handlersInitializer(streamChannel, logger)
+
+          // Grab the streamID from the channel.
+          return streamChannel.getOption(HTTP2StreamChannelOptions.streamID).map { streamID in
+            logger[metadataKey: MetadataKey.streamID] = "\(streamID)"
+            return logger
+          }.recover { _ in
+            logger[metadataKey: MetadataKey.streamID] = "<unknown>"
+            return logger
+          }.flatMap { logger in
+            return streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap {
+              self.handlersInitializer(streamChannel, logger)
+            }
           }
           }
-        }.flatMap { multiplexer in
+        }.flatMap { multiplexer -> EventLoopFuture<Void> in
           // Add a keepalive and idle handlers between the two HTTP2 handlers.
           // Add a keepalive and idle handlers between the two HTTP2 handlers.
           let keepaliveHandler = GRPCServerKeepaliveHandler(configuration: self.keepAlive)
           let keepaliveHandler = GRPCServerKeepaliveHandler(configuration: self.keepAlive)
           let idleHandler = GRPCIdleHandler(mode: .server, idleTimeout: self.idleTimeout)
           let idleHandler = GRPCIdleHandler(mode: .server, idleTimeout: self.idleTimeout)

+ 1 - 1
Sources/GRPC/_EmbeddedThroughput.swift

@@ -28,7 +28,7 @@ extension EmbeddedChannel {
     responseType: Response.Type = Response.self
     responseType: Response.Type = Response.self
   ) -> EventLoopFuture<Void> {
   ) -> EventLoopFuture<Void> {
     return self.pipeline.addHandlers([
     return self.pipeline.addHandlers([
-      _GRPCClientChannelHandler(streamID: 1, callType: callType, logger: logger),
+      _GRPCClientChannelHandler(callType: callType, logger: logger),
       GRPCClientCodecHandler(
       GRPCClientCodecHandler(
         serializer: ProtobufSerializer<Request>(),
         serializer: ProtobufSerializer<Request>(),
         deserializer: ProtobufDeserializer<Response>()
         deserializer: ProtobufDeserializer<Response>()

+ 11 - 21
Sources/GRPC/_GRPCClientChannelHandler.swift

@@ -268,18 +268,14 @@ public enum GRPCCallType {
 ///   `public` because it is used within performance tests.
 ///   `public` because it is used within performance tests.
 public final class _GRPCClientChannelHandler {
 public final class _GRPCClientChannelHandler {
   private let logger: Logger
   private let logger: Logger
-  private let streamID: HTTP2StreamID
   private var stateMachine: GRPCClientStateMachine
   private var stateMachine: GRPCClientStateMachine
 
 
   /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
   /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
   ///
   ///
   /// - Parameters:
   /// - Parameters:
-  ///   - streamID: The ID of the HTTP/2 stream that this handler will read and write HTTP/2
-  ///     frames on.
   ///   - callType: Type of RPC call being made.
   ///   - callType: Type of RPC call being made.
   ///   - logger: Logger.
   ///   - logger: Logger.
-  public init(streamID: HTTP2StreamID, callType: GRPCCallType, logger: Logger) {
-    self.streamID = streamID
+  public init(callType: GRPCCallType, logger: Logger) {
     self.logger = logger
     self.logger = logger
     switch callType {
     switch callType {
     case .unary:
     case .unary:
@@ -296,12 +292,12 @@ public final class _GRPCClientChannelHandler {
 
 
 // MARK: - GRPCClientChannelHandler: Inbound
 // MARK: - GRPCClientChannelHandler: Inbound
 extension _GRPCClientChannelHandler: ChannelInboundHandler {
 extension _GRPCClientChannelHandler: ChannelInboundHandler {
-  public typealias InboundIn = HTTP2Frame
+  public typealias InboundIn = HTTP2Frame.FramePayload
   public typealias InboundOut = _RawGRPCClientResponsePart
   public typealias InboundOut = _RawGRPCClientResponsePart
 
 
   public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
   public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
-    let frame = self.unwrapInboundIn(data)
-    switch frame.payload {
+    let payload = self.unwrapInboundIn(data)
+    switch payload {
     case .headers(let content):
     case .headers(let content):
       self.readHeaders(content: content, context: context)
       self.readHeaders(content: content, context: context)
 
 
@@ -437,7 +433,7 @@ extension _GRPCClientChannelHandler: ChannelInboundHandler {
 // MARK: - GRPCClientChannelHandler: Outbound
 // MARK: - GRPCClientChannelHandler: Outbound
 extension _GRPCClientChannelHandler: ChannelOutboundHandler {
 extension _GRPCClientChannelHandler: ChannelOutboundHandler {
   public typealias OutboundIn = _RawGRPCClientRequestPart
   public typealias OutboundIn = _RawGRPCClientRequestPart
-  public typealias OutboundOut = HTTP2Frame
+  public typealias OutboundOut = HTTP2Frame.FramePayload
 
 
   public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
   public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
     switch self.unwrapOutboundIn(data) {
     switch self.unwrapOutboundIn(data) {
@@ -446,8 +442,8 @@ extension _GRPCClientChannelHandler: ChannelOutboundHandler {
       switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
       switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
       case .success(let headers):
       case .success(let headers):
         // We're clear to write some headers. Create an appropriate frame and write it.
         // We're clear to write some headers. Create an appropriate frame and write it.
-        let frame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers)))
-        context.write(self.wrapOutboundOut(frame), promise: promise)
+        let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
+        context.write(self.wrapOutboundOut(framePayload), promise: promise)
 
 
       case .failure(let sendRequestHeadersError):
       case .failure(let sendRequestHeadersError):
         switch sendRequestHeadersError {
         switch sendRequestHeadersError {
@@ -464,11 +460,8 @@ extension _GRPCClientChannelHandler: ChannelOutboundHandler {
       switch result {
       switch result {
       case .success(let buffer):
       case .success(let buffer):
         // We're clear to send a message; wrap it up in an HTTP/2 frame.
         // We're clear to send a message; wrap it up in an HTTP/2 frame.
-        let frame = HTTP2Frame(
-          streamID: self.streamID,
-          payload: .data(.init(data: .byteBuffer(buffer)))
-        )
-        context.write(self.wrapOutboundOut(frame), promise: promise)
+        let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
+        context.write(self.wrapOutboundOut(framePayload), promise: promise)
 
 
       case .failure(let writeError):
       case .failure(let writeError):
         switch writeError {
         switch writeError {
@@ -493,11 +486,8 @@ extension _GRPCClientChannelHandler: ChannelOutboundHandler {
       case .success:
       case .success:
         // We can. Send an empty DATA frame with end-stream set.
         // We can. Send an empty DATA frame with end-stream set.
         let empty = context.channel.allocator.buffer(capacity: 0)
         let empty = context.channel.allocator.buffer(capacity: 0)
-        let frame = HTTP2Frame(
-          streamID: self.streamID,
-          payload: .data(.init(data: .byteBuffer(empty), endStream: true))
-        )
-        context.write(self.wrapOutboundOut(frame), promise: promise)
+        let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(empty), endStream: true))
+        context.write(self.wrapOutboundOut(framePayload), promise: promise)
 
 
       case .failure(let error):
       case .failure(let error):
         // Why can't we close the request stream?
         // Why can't we close the request stream?

+ 6 - 11
Tests/GRPCTests/GRPCStatusCodeTests.swift

@@ -29,18 +29,13 @@ class GRPCStatusCodeTests: GRPCTestCase {
   override func setUp() {
   override func setUp() {
     super.setUp()
     super.setUp()
 
 
-    let handler = _GRPCClientChannelHandler(
-      streamID: .init(1),
-      callType: .unary,
-      logger: self.logger
-    )
-
+    let handler = _GRPCClientChannelHandler(callType: .unary, logger: self.logger)
     self.channel = EmbeddedChannel(handler: handler)
     self.channel = EmbeddedChannel(handler: handler)
   }
   }
 
 
-  func headersFrame(status: HTTPResponseStatus) -> HTTP2Frame {
+  func headersFramePayload(status: HTTPResponseStatus) -> HTTP2Frame.FramePayload {
     let headers: HPACKHeaders = [":status": "\(status.code)"]
     let headers: HPACKHeaders = [":status": "\(status.code)"]
-    return .init(streamID: .init(1), payload: .headers(.init(headers: headers)))
+    return .headers(.init(headers: headers))
   }
   }
 
 
   func sendRequestHead() {
   func sendRequestHead() {
@@ -60,7 +55,7 @@ class GRPCStatusCodeTests: GRPCTestCase {
   func doTestResponseStatus(_ status: HTTPResponseStatus, expected: GRPCStatus.Code) throws {
   func doTestResponseStatus(_ status: HTTPResponseStatus, expected: GRPCStatus.Code) throws {
     // Send the request head so we're in a valid state to receive headers.
     // Send the request head so we're in a valid state to receive headers.
     self.sendRequestHead()
     self.sendRequestHead()
-    XCTAssertThrowsError(try self.channel.writeInbound(self.headersFrame(status: status))) { error in
+    XCTAssertThrowsError(try self.channel.writeInbound(self.headersFramePayload(status: status))) { error in
       guard let withContext = error as? GRPCError.WithContext,
       guard let withContext = error as? GRPCError.WithContext,
             let invalidHTTPStatus = withContext.error as? GRPCError.InvalidHTTPStatus  else {
             let invalidHTTPStatus = withContext.error as? GRPCError.InvalidHTTPStatus  else {
         XCTFail("Unexpected error: \(error)")
         XCTFail("Unexpected error: \(error)")
@@ -113,8 +108,8 @@ class GRPCStatusCodeTests: GRPCTestCase {
     ]
     ]
 
 
     self.sendRequestHead()
     self.sendRequestHead()
-    let headerFrame = HTTP2Frame(streamID: .init(1), payload: .headers(.init(headers: headers)))
-    XCTAssertThrowsError(try self.channel.writeInbound(headerFrame)) { error in
+    let headerFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
+    XCTAssertThrowsError(try self.channel.writeInbound(headerFramePayload)) { error in
       guard let withContext = error as? GRPCError.WithContext,
       guard let withContext = error as? GRPCError.WithContext,
             let invalidHTTPStatus = withContext.error as? GRPCError.InvalidHTTPStatusWithGRPCStatus  else {
             let invalidHTTPStatus = withContext.error as? GRPCError.InvalidHTTPStatusWithGRPCStatus  else {
         XCTFail("Unexpected error: \(error)")
         XCTFail("Unexpected error: \(error)")