Prechádzať zdrojové kódy

Simplfy top-level streamin handling in server transports (#1979)

Modification:

The top-level connection handling code in the two NIO H2 transport is a
little hard to follow because the nesting gets quite deep. So much so
that it took me some time to notice an unnecessary task group in there.

Modifications:

- Split the top-level server code into a couple of functions
- Remove unnecessary task group

Result:

Easier code to follow
George Barnett 1 rok pred
rodič
commit
6c38d1185f

+ 67 - 51
Sources/GRPCHTTP2TransportNIOPosix/HTTP2ServerTransport+Posix.swift

@@ -18,6 +18,7 @@ import GRPCCore
 import GRPCHTTP2Core
 import NIOCore
 import NIOExtras
+import NIOHTTP2
 import NIOPosix
 
 extension HTTP2ServerTransport {
@@ -211,64 +212,79 @@ extension HTTP2ServerTransport {
       }
 
       try await serverChannel.executeThenClose { inbound in
-        try await withThrowingDiscardingTaskGroup { serverTaskGroup in
+        try await withThrowingDiscardingTaskGroup { group in
           for try await (connectionChannel, streamMultiplexer) in inbound {
-            serverTaskGroup.addTask {
-              try await connectionChannel
-                .executeThenClose { connectionInbound, connectionOutbound in
-                  await withDiscardingTaskGroup { connectionTaskGroup in
-                    connectionTaskGroup.addTask {
-                      do {
-                        for try await _ in connectionInbound {}
-                      } catch {
-                        // We don't want to close the channel if one connection throws.
-                        return
-                      }
-                    }
-
-                    connectionTaskGroup.addTask {
-                      await withDiscardingTaskGroup { streamTaskGroup in
-                        do {
-                          for try await (http2Stream, methodDescriptor) in streamMultiplexer.inbound
-                          {
-                            streamTaskGroup.addTask {
-                              // It's okay to ignore these errors:
-                              // - If we get an error because the http2Stream failed to close, then there's nothing we can do
-                              // - If we get an error because the inner closure threw, then the only possible scenario in which
-                              // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
-                              // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
-                              try? await http2Stream.executeThenClose { inbound, outbound in
-                                guard let descriptor = try? await methodDescriptor.get() else {
-                                  return
-                                }
-                                let rpcStream = RPCStream(
-                                  descriptor: descriptor,
-                                  inbound: RPCAsyncSequence(wrapping: inbound),
-                                  outbound: RPCWriter.Closable(
-                                    wrapping: ServerConnection.Stream.Outbound(
-                                      responseWriter: outbound,
-                                      http2Stream: http2Stream
-                                    )
-                                  )
-                                )
-                                await streamHandler(rpcStream)
-                              }
-                            }
-                          }
-                        } catch {
-                          // We don't want to close the whole connection if one stream throws.
-                          return
-                        }
-                      }
-                    }
-                  }
-                }
+            group.addTask {
+              try await self.handleConnection(
+                connectionChannel,
+                multiplexer: streamMultiplexer,
+                streamHandler: streamHandler
+              )
             }
           }
         }
       }
     }
 
+    private func handleConnection(
+      _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
+      multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
+      streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
+    ) async throws {
+      try await connection.executeThenClose { inbound, _ in
+        await withDiscardingTaskGroup { group in
+          group.addTask {
+            do {
+              for try await _ in inbound {}
+            } catch {
+              // We don't want to close the channel if one connection throws.
+              return
+            }
+          }
+
+          do {
+            for try await (stream, descriptor) in multiplexer.inbound {
+              group.addTask {
+                await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
+              }
+            }
+          } catch {
+            return
+          }
+        }
+      }
+    }
+
+    private func handleStream(
+      _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
+      handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
+      descriptor: EventLoopFuture<MethodDescriptor>
+    ) async {
+      // It's okay to ignore these errors:
+      // - If we get an error because the http2Stream failed to close, then there's nothing we can do
+      // - If we get an error because the inner closure threw, then the only possible scenario in which
+      // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
+      // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
+      try? await stream.executeThenClose { inbound, outbound in
+        guard let descriptor = try? await descriptor.get() else {
+          return
+        }
+
+        let rpcStream = RPCStream(
+          descriptor: descriptor,
+          inbound: RPCAsyncSequence(wrapping: inbound),
+          outbound: RPCWriter.Closable(
+            wrapping: ServerConnection.Stream.Outbound(
+              responseWriter: outbound,
+              http2Stream: stream
+            )
+          )
+        )
+
+        await streamHandler(rpcStream)
+      }
+    }
+
     public func stopListening() {
       self.serverQuiescingHelper.initiateShutdown(promise: nil)
     }

+ 67 - 51
Sources/GRPCHTTP2TransportNIOTransportServices/HTTP2ServerTransport+TransportServices.swift

@@ -19,6 +19,7 @@ import GRPCCore
 import GRPCHTTP2Core
 import NIOCore
 import NIOExtras
+import NIOHTTP2
 import NIOTransportServices
 
 extension HTTP2ServerTransport {
@@ -202,64 +203,79 @@ extension HTTP2ServerTransport {
       }
 
       try await serverChannel.executeThenClose { inbound in
-        try await withThrowingDiscardingTaskGroup { serverTaskGroup in
+        try await withThrowingDiscardingTaskGroup { group in
           for try await (connectionChannel, streamMultiplexer) in inbound {
-            serverTaskGroup.addTask {
-              try await connectionChannel
-                .executeThenClose { connectionInbound, connectionOutbound in
-                  await withDiscardingTaskGroup { connectionTaskGroup in
-                    connectionTaskGroup.addTask {
-                      do {
-                        for try await _ in connectionInbound {}
-                      } catch {
-                        // We don't want to close the channel if one connection throws.
-                        return
-                      }
-                    }
-
-                    connectionTaskGroup.addTask {
-                      await withDiscardingTaskGroup { streamTaskGroup in
-                        do {
-                          for try await (http2Stream, methodDescriptor) in streamMultiplexer.inbound
-                          {
-                            streamTaskGroup.addTask {
-                              // It's okay to ignore these errors:
-                              // - If we get an error because the http2Stream failed to close, then there's nothing we can do
-                              // - If we get an error because the inner closure threw, then the only possible scenario in which
-                              // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
-                              // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
-                              try? await http2Stream.executeThenClose { inbound, outbound in
-                                guard let descriptor = try? await methodDescriptor.get() else {
-                                  return
-                                }
-                                let rpcStream = RPCStream(
-                                  descriptor: descriptor,
-                                  inbound: RPCAsyncSequence(wrapping: inbound),
-                                  outbound: RPCWriter.Closable(
-                                    wrapping: ServerConnection.Stream.Outbound(
-                                      responseWriter: outbound,
-                                      http2Stream: http2Stream
-                                    )
-                                  )
-                                )
-                                await streamHandler(rpcStream)
-                              }
-                            }
-                          }
-                        } catch {
-                          // We don't want to close the whole connection if one stream throws.
-                          return
-                        }
-                      }
-                    }
-                  }
-                }
+            group.addTask {
+              try await self.handleConnection(
+                connectionChannel,
+                multiplexer: streamMultiplexer,
+                streamHandler: streamHandler
+              )
             }
           }
         }
       }
     }
 
+    private func handleConnection(
+      _ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
+      multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
+      streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void
+    ) async throws {
+      try await connection.executeThenClose { inbound, _ in
+        await withDiscardingTaskGroup { group in
+          group.addTask {
+            do {
+              for try await _ in inbound {}
+            } catch {
+              // We don't want to close the channel if one connection throws.
+              return
+            }
+          }
+
+          do {
+            for try await (stream, descriptor) in multiplexer.inbound {
+              group.addTask {
+                await self.handleStream(stream, handler: streamHandler, descriptor: descriptor)
+              }
+            }
+          } catch {
+            return
+          }
+        }
+      }
+    }
+
+    private func handleStream(
+      _ stream: NIOAsyncChannel<RPCRequestPart, RPCResponsePart>,
+      handler streamHandler: @escaping @Sendable (RPCStream<Inbound, Outbound>) async -> Void,
+      descriptor: EventLoopFuture<MethodDescriptor>
+    ) async {
+      // It's okay to ignore these errors:
+      // - If we get an error because the http2Stream failed to close, then there's nothing we can do
+      // - If we get an error because the inner closure threw, then the only possible scenario in which
+      // that could happen is if methodDescriptor.get() throws - in which case, it means we never got
+      // the RPC metadata, which means we can't do anything either and it's okay to just kill the stream.
+      try? await stream.executeThenClose { inbound, outbound in
+        guard let descriptor = try? await descriptor.get() else {
+          return
+        }
+
+        let rpcStream = RPCStream(
+          descriptor: descriptor,
+          inbound: RPCAsyncSequence(wrapping: inbound),
+          outbound: RPCWriter.Closable(
+            wrapping: ServerConnection.Stream.Outbound(
+              responseWriter: outbound,
+              http2Stream: stream
+            )
+          )
+        )
+
+        await streamHandler(rpcStream)
+      }
+    }
+
     public func stopListening() {
       self.serverQuiescingHelper.initiateShutdown(promise: nil)
     }