浏览代码

Use new NIO API for doing synchronous pipeline operations (#1149)

Motivation:

SwiftNIO 2.27.0 added new API which allows channel handlers to be added
synchronously when the caller know they are on the appropriate event
loop. Doing so allows allocations to be saved.

Modifications:

- Use the synchronous operations helpers where possible.

Result:

Fewer allocations.
George Barnett 4 年之前
父节点
当前提交
4750840eab

+ 1 - 1
Package.swift

@@ -26,7 +26,7 @@ let package = Package(
   dependencies: [
     // GRPC dependencies:
     // Main SwiftNIO package
-    .package(url: "https://github.com/apple/swift-nio.git", from: "2.22.0"),
+    .package(url: "https://github.com/apple/swift-nio.git", from: "2.27.0"),
     // HTTP2 via SwiftNIO
     .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.16.1"),
     // TLS via SwiftNIO

+ 34 - 18
Sources/GRPC/GRPCServerPipelineConfigurator.swift

@@ -90,6 +90,8 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
       channel: channel,
       targetWindowSize: self.configuration.httpTargetWindowSize
     ) { stream in
+      // TODO: use sync options when NIO HTTP/2 support for them is released
+      // https://github.com/apple/swift-nio-http2/pull/283
       stream.getOption(HTTP2StreamChannelOptions.streamID).map { streamID -> Logger in
         logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
         return logger
@@ -135,16 +137,21 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
 
     // We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
     // to then insert our keepalive and idle handlers between. We can just add everything together.
-    var handlers: [ChannelHandler] = []
-    handlers.reserveCapacity(3)
-    handlers.append(self.makeHTTP2Handler())
-    handlers.append(self.makeIdleHandler())
-    handlers.append(self.makeHTTP2Multiplexer(for: context.channel))
-
-    // Now configure the pipeline with the handlers.
-    context.channel.pipeline.addHandlers(handlers).whenComplete { result in
-      self.configurationCompleted(result: result, context: context)
+    let result: Result<Void, Error>
+
+    do {
+      // This is only ever called as a result of reading a user inbound event or reading inbound so
+      // we'll be on the right event loop and sync operations are fine.
+      let sync = context.pipeline.syncOperations
+      try sync.addHandler(self.makeHTTP2Handler())
+      try sync.addHandler(self.makeIdleHandler())
+      try sync.addHandler(self.makeHTTP2Multiplexer(for: context.channel))
+      result = .success(())
+    } catch {
+      result = .failure(error)
     }
+
+    self.configurationCompleted(result: result, context: context)
   }
 
   /// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
@@ -152,16 +159,25 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan
     // We're now configuring the pipeline.
     self.state = .configuring
 
-    context.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
-      context.pipeline.addHandlers([
-        WebCORSHandler(),
-        GRPCWebToHTTP2ServerCodec(scheme: self.configuration.tls == nil ? "http" : "https"),
-        // There's no need to normalize headers for HTTP/1.
-        self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger),
-      ])
-    }.whenComplete { result in
-      self.configurationCompleted(result: result, context: context)
+    let result: Result<Void, Error>
+    do {
+      // This is only ever called as a result of reading a user inbound event or reading inbound so
+      // we'll be on the right event loop and sync operations are fine.
+      let sync = context.pipeline.syncOperations
+      try sync.configureHTTPServerPipeline(withErrorHandling: true)
+      try sync.addHandler(WebCORSHandler())
+      let scheme = self.configuration.tls == nil ? "http" : "https"
+      try sync.addHandler(GRPCWebToHTTP2ServerCodec(scheme: scheme))
+      // There's no need to normalize headers for HTTP/1.
+      try sync.addHandler(
+        self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger)
+      )
+      result = .success(())
+    } catch {
+      result = .failure(error)
     }
+
+    self.configurationCompleted(result: result, context: context)
   }
 
   /// Attempts to determine the HTTP version from the buffer and then configure the pipeline

+ 14 - 5
Sources/GRPC/Interceptor/ClientTransportFactory.swift

@@ -225,13 +225,22 @@ private struct HTTP2ClientTransportFactory<Request, Response> {
         let streamPromise = self.multiplexer.eventLoop.makePromise(of: Channel.self)
 
         multiplexer.createStreamChannel(promise: streamPromise) { streamChannel in
-          streamChannel.pipeline.addHandlers([
-            GRPCClientChannelHandler(
+          // This initializer will always occur on the appropriate event loop, sync operations are
+          // fine here.
+          let syncOperations = streamChannel.pipeline.syncOperations
+
+          do {
+            let clientHandler = GRPCClientChannelHandler(
               callType: transport.callDetails.type,
               logger: transport.logger
-            ),
-            transport,
-          ])
+            )
+            try syncOperations.addHandler(clientHandler)
+            try syncOperations.addHandler(transport)
+          } catch {
+            return streamChannel.eventLoop.makeFailedFuture(error)
+          }
+
+          return streamChannel.eventLoop.makeSucceededVoidFuture()
         }
 
         // We don't need the stream, but we do need to know it was correctly configured.

+ 24 - 32
Sources/GRPC/Server.swift

@@ -101,36 +101,33 @@ public final class Server {
         configuration.logger[metadataKey: MetadataKey.remoteAddress] = channel.remoteAddress
           .map { "\($0)" } ?? "n/a"
 
-        var configured: EventLoopFuture<Void>
-        let configurator = GRPCServerPipelineConfigurator(configuration: configuration)
-
-        if let tls = configuration.tls {
-          configured = channel.configureTLS(configuration: tls).flatMap {
-            channel.pipeline.addHandler(configurator)
+        do {
+          let sync = channel.pipeline.syncOperations
+          if let tls = configuration.tls {
+            try sync.configureTLS(configuration: tls)
           }
-        } else {
-          configured = channel.pipeline.addHandler(configurator)
-        }
 
-        // Work around the zero length write issue, if needed.
-        let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
-          group: configuration.eventLoopGroup,
-          hasTLS: configuration.tls != nil
-        )
-        if requiresZeroLengthWorkaround,
-          #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
-          configured = configured.flatMap {
-            channel.pipeline.addHandler(NIOFilterEmptyWritesHandler())
+          // Configures the pipeline based on whether the connection uses TLS or not.
+          try sync.addHandler(GRPCServerPipelineConfigurator(configuration: configuration))
+
+          // Work around the zero length write issue, if needed.
+          let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
+            group: configuration.eventLoopGroup,
+            hasTLS: configuration.tls != nil
+          )
+          if requiresZeroLengthWorkaround,
+            #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
+            try sync.addHandler(NIOFilterEmptyWritesHandler())
           }
+        } catch {
+          return channel.eventLoop.makeFailedFuture(error)
         }
 
-        // Add the debug initializer, if there is one.
+        // Run the debug initializer, if there is one.
         if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
-          return configured.flatMap {
-            debugAcceptedChannelInitializer(channel)
-          }
+          return debugAcceptedChannelInitializer(channel)
         } else {
-          return configured
+          return channel.eventLoop.makeSucceededVoidFuture()
         }
       }
 
@@ -337,19 +334,14 @@ extension Server {
   }
 }
 
-private extension Channel {
+extension ChannelPipeline.SynchronousOperations {
   /// Configure an SSL handler on the channel.
   ///
   /// - Parameters:
   ///   - configuration: The configuration to use when creating the handler.
-  /// - Returns: A future which will be succeeded when the pipeline has been configured.
-  func configureTLS(configuration: Server.Configuration.TLS) -> EventLoopFuture<Void> {
-    do {
-      let context = try NIOSSLContext(configuration: configuration.configuration)
-      return self.pipeline.addHandler(NIOSSLServerHandler(context: context))
-    } catch {
-      return self.pipeline.eventLoop.makeFailedFuture(error)
-    }
+  fileprivate func configureTLS(configuration: Server.Configuration.TLS) throws {
+    let context = try NIOSSLContext(configuration: configuration.configuration)
+    try self.addHandler(NIOSSLServerHandler(context: context))
   }
 }