Browse Source

Add a sendable view to ClientConnectionHandler/ServerConnectionManagementHandler (#1895)

Motivation:

In #1875 the `ClientConnectionHandler` and
`ServerConnectionManagementHandler` had conformance added to
`NIOHTTP2StreamDelegate`. This, in turn, requires that they are
`Sendable`. However, they aren't really `Sendable` as most of their API
relies on being on the correct event-loop.

Modifications:

- Add stream-delegate views over each where the conformance ensures that
  the methods are called on the appropriate event loop

Result:

Fewer warnings
George Barnett 1 year ago
parent
commit
54560d7be9

+ 39 - 5
Sources/GRPCHTTP2Core/Client/Connection/ClientConnectionHandler.swift

@@ -152,10 +152,10 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
   func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
     switch event {
     case let event as NIOHTTP2StreamCreatedEvent:
-      self.streamCreated(event.streamID, channel: context.channel)
+      self._streamCreated(event.streamID, channel: context.channel)
 
     case let event as StreamClosedEvent:
-      self.streamClosed(event.streamID, channel: context.channel)
+      self._streamClosed(event.streamID, channel: context.channel)
 
     default:
       ()
@@ -250,8 +250,42 @@ final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutboundHandl
   }
 }
 
-extension ClientConnectionHandler: NIOHTTP2StreamDelegate {
-  func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
+extension ClientConnectionHandler {
+  struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
+    // @unchecked is okay: the only methods do the appropriate event-loop dance.
+
+    private let handler: ClientConnectionHandler
+
+    init(_ handler: ClientConnectionHandler) {
+      self.handler = handler
+    }
+
+    func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
+      if self.handler.eventLoop.inEventLoop {
+        self.handler._streamCreated(id, channel: channel)
+      } else {
+        self.handler.eventLoop.execute {
+          self.handler._streamCreated(id, channel: channel)
+        }
+      }
+    }
+
+    func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
+      if self.handler.eventLoop.inEventLoop {
+        self.handler._streamClosed(id, channel: channel)
+      } else {
+        self.handler.eventLoop.execute {
+          self.handler._streamClosed(id, channel: channel)
+        }
+      }
+    }
+  }
+
+  var http2StreamDelegate: HTTP2StreamDelegate {
+    return HTTP2StreamDelegate(self)
+  }
+
+  private func _streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
     self.eventLoop.assertInEventLoop()
 
     // Stream created, so the connection isn't idle.
@@ -259,7 +293,7 @@ extension ClientConnectionHandler: NIOHTTP2StreamDelegate {
     self.state.streamOpened(id)
   }
 
-  func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
+  private func _streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
     guard let context = self.context else { return }
     self.eventLoop.assertInEventLoop()
 

+ 39 - 5
Sources/GRPCHTTP2Core/Server/Connection/ServerConnectionManagementHandler.swift

@@ -274,10 +274,10 @@ final class ServerConnectionManagementHandler: ChannelDuplexHandler {
   func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
     switch event {
     case let event as NIOHTTP2StreamCreatedEvent:
-      self.streamCreated(event.streamID, channel: context.channel)
+      self._streamCreated(event.streamID, channel: context.channel)
 
     case let event as StreamClosedEvent:
-      self.streamClosed(event.streamID, channel: context.channel)
+      self._streamClosed(event.streamID, channel: context.channel)
 
     case is ChannelShouldQuiesceEvent:
       self.initiateGracefulShutdown(context: context)
@@ -333,14 +333,48 @@ final class ServerConnectionManagementHandler: ChannelDuplexHandler {
   }
 }
 
-extension ServerConnectionManagementHandler: NIOHTTP2StreamDelegate {
-  func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
+extension ServerConnectionManagementHandler {
+  struct HTTP2StreamDelegate: @unchecked Sendable, NIOHTTP2StreamDelegate {
+    // @unchecked is okay: the only methods do the appropriate event-loop dance.
+
+    private let handler: ServerConnectionManagementHandler
+
+    init(_ handler: ServerConnectionManagementHandler) {
+      self.handler = handler
+    }
+
+    func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
+      if self.handler.eventLoop.inEventLoop {
+        self.handler._streamCreated(id, channel: channel)
+      } else {
+        self.handler.eventLoop.execute {
+          self.handler._streamCreated(id, channel: channel)
+        }
+      }
+    }
+
+    func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
+      if self.handler.eventLoop.inEventLoop {
+        self.handler._streamClosed(id, channel: channel)
+      } else {
+        self.handler.eventLoop.execute {
+          self.handler._streamClosed(id, channel: channel)
+        }
+      }
+    }
+  }
+
+  var http2StreamDelegate: HTTP2StreamDelegate {
+    return HTTP2StreamDelegate(self)
+  }
+
+  private func _streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
     // The connection isn't idle if a stream is open.
     self.maxIdleTimer?.cancel()
     self.state.streamOpened(id)
   }
 
-  func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
+  private func _streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
     guard let context = self.context else { return }
 
     switch self.state.streamClosed(id) {

+ 1 - 1
Tests/GRPCHTTP2CoreTests/Client/Connection/ClientConnectionHandlerTests.swift

@@ -246,7 +246,7 @@ extension ClientConnectionHandlerTests {
         keepaliveWithoutCalls: allowKeepaliveWithoutCalls
       )
 
-      self.streamDelegate = handler
+      self.streamDelegate = handler.http2StreamDelegate
       self.channel = EmbeddedChannel(handler: handler, loop: loop)
     }
 

+ 1 - 1
Tests/GRPCHTTP2CoreTests/Server/Connection/ServerConnectionManagementHandlerTests.swift

@@ -379,7 +379,7 @@ extension ServerConnectionManagementHandlerTests {
         clock: self.clock
       )
 
-      self.streamDelegate = handler
+      self.streamDelegate = handler.http2StreamDelegate
       self.syncView = handler.syncView
       self.channel = EmbeddedChannel(handler: handler, loop: loop)
     }