Browse Source

Eliminate unnecessary allocations by using single-element writes (#1965)

Using the collection-based write was wrapping single elements into a collection and write(contentsOf:) was being called.
This always results in the element being wrapped in a Deque in the NIOAsyncWriter.
This commit moves the write(element:) method to be required for the RPCWriter protocol so that we can call the single element write and avoid this boxing to save some allocations.
Gustavo Cairo 1 year ago
parent
commit
42b27f3191

+ 5 - 0
Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift

@@ -31,6 +31,11 @@ struct MapRPCWriter<Value, Mapped>: RPCWriterProtocol {
     self.transform = transform
   }
 
+  @inlinable
+  func write(_ element: Element) async throws {
+    try await self.base.write(self.transform(element))
+  }
+
   @inlinable
   func write(contentsOf elements: some Sequence<Value>) async throws {
     let transformed = elements.lazy.map { self.transform($0) }

+ 5 - 0
Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift

@@ -31,6 +31,11 @@ struct MessageToRPCResponsePartWriter<Serializer: MessageSerializer>: RPCWriterP
     self.base = RPCWriter(wrapping: base)
   }
 
+  @inlinable
+  func write(_ element: Element) async throws {
+    try await self.base.write(.message(self.serializer.serialize(element)))
+  }
+
   @inlinable
   func write(contentsOf elements: some Sequence<Serializer.Message>) async throws {
     let requestParts = try elements.map { message -> RPCResponsePart in

+ 5 - 0
Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift

@@ -31,6 +31,11 @@ struct SerializingRPCWriter<Serializer: MessageSerializer>: RPCWriterProtocol {
     self.base = RPCWriter(wrapping: base)
   }
 
+  @inlinable
+  func write(_ element: Element) async throws {
+    try await self.base.write(self.serializer.serialize(element))
+  }
+
   @inlinable
   func write(contentsOf elements: some Sequence<Serializer.Message>) async throws {
     let requestParts = try elements.map { message in

+ 12 - 1
Sources/GRPCCore/Streaming/RPCWriter+Closable.swift

@@ -28,9 +28,20 @@ extension RPCWriter {
       self.writer = other
     }
 
+    /// Writes a single element.
+    ///
+    /// This function suspends until the element has been accepted. Implementers can use this
+    /// to exert backpressure on callers.
+    ///
+    /// - Parameter element: The element to write.
+    @inlinable
+    public func write(_ element: Element) async throws {
+      try await self.writer.write(element)
+    }
+
     /// Writes a sequence of elements.
     ///
-    /// This function suspends until the elements have been accepted. Implements can use this
+    /// This function suspends until the elements have been accepted. Implementers can use this
     /// to exert backpressure on callers.
     ///
     /// - Parameter elements: The elements to write.

+ 11 - 1
Sources/GRPCCore/Streaming/RPCWriter.swift

@@ -26,9 +26,19 @@ public struct RPCWriter<Element>: Sendable, RPCWriterProtocol {
     self.writer = other
   }
 
+  /// Writes a single element.
+  ///
+  /// This function suspends until the element has been accepted. Implementers can use this
+  /// to exert backpressure on callers.
+  ///
+  /// - Parameter element: The element to write.
+  public func write(_ element: Element) async throws {
+    try await self.writer.write(element)
+  }
+
   /// Writes a sequence of elements.
   ///
-  /// This function suspends until the elements have been accepted. Implements can use this
+  /// This function suspends until the elements have been accepted. Implementers can use this
   /// to exert backpressure on callers.
   ///
   /// - Parameter elements: The elements to write.

+ 9 - 8
Sources/GRPCCore/Streaming/RPCWriterProtocol.swift

@@ -20,9 +20,17 @@ public protocol RPCWriterProtocol<Element>: Sendable {
   /// The type of value written.
   associatedtype Element
 
+  /// Writes a single element.
+  ///
+  /// This function suspends until the element has been accepted. Implementers can use this
+  /// to exert backpressure on callers.
+  ///
+  /// - Parameter element: The element to write.
+  func write(_ element: Element) async throws
+
   /// Writes a sequence of elements.
   ///
-  /// This function suspends until the elements have been accepted. Implements can use this
+  /// This function suspends until the elements have been accepted. Implementers can use this
   /// to exert backpressure on callers.
   ///
   /// - Parameter elements: The elements to write.
@@ -31,13 +39,6 @@ public protocol RPCWriterProtocol<Element>: Sendable {
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 extension RPCWriterProtocol {
-  /// Writes a single element into the sink.
-  ///
-  /// - Parameter element: The element to write.
-  public func write(_ element: Element) async throws {
-    try await self.write(contentsOf: CollectionOfOne(element))
-  }
-
   /// Writes an `AsyncSequence` of values into the sink.
   ///
   /// - Parameter elements: The elements to write.

+ 4 - 0
Sources/GRPCHTTP2Core/Client/Connection/Connection.swift

@@ -366,6 +366,10 @@ extension Connection {
         self.http2Stream = http2Stream
       }
 
+      func write(_ element: RPCRequestPart) async throws {
+        try await self.requestWriter.write(element)
+      }
+
       func write(contentsOf elements: some Sequence<Self.Element>) async throws {
         try await self.requestWriter.write(contentsOf: elements)
       }

+ 4 - 0
Sources/GRPCHTTP2Core/Server/Connection/ServerConnection.swift

@@ -35,6 +35,10 @@ public enum ServerConnection {
         self.http2Stream = http2Stream
       }
 
+      public func write(_ element: RPCResponsePart) async throws {
+        try await self.responseWriter.write(element)
+      }
+
       public func write(contentsOf elements: some Sequence<Self.Element>) async throws {
         try await self.responseWriter.write(contentsOf: elements)
       }

+ 6 - 0
Sources/GRPCInterceptors/HookedWriter.swift

@@ -32,6 +32,12 @@ struct HookedWriter<Element>: RPCWriterProtocol {
     self.afterEachWrite = afterEachWrite
   }
 
+  func write(_ element: Element) async throws {
+    self.beforeEachWrite()
+    try await self.writer.write(element)
+    self.afterEachWrite()
+  }
+
   func write(contentsOf elements: some Sequence<Element>) async throws {
     self.beforeEachWrite()
     try await self.writer.write(contentsOf: elements)

+ 10 - 2
Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift

@@ -31,6 +31,10 @@ extension RPCWriter {
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 private struct FailOnWrite<Element>: RPCWriterProtocol {
+  func write(_ element: Element) async throws {
+    XCTFail("Unexpected write")
+  }
+
   func write(contentsOf elements: some Sequence<Element>) async throws {
     XCTFail("Unexpected write")
   }
@@ -44,9 +48,13 @@ private struct AsyncStreamGatheringWriter<Element>: RPCWriterProtocol {
     self.continuation = continuation
   }
 
-  func write(contentsOf elements: some Sequence<Element>) async throws {
+  func write(_ element: Element) {
+    self.continuation.yield(element)
+  }
+
+  func write(contentsOf elements: some Sequence<Element>) {
     for element in elements {
-      self.continuation.yield(element)
+      self.write(element)
     }
   }
 }

+ 6 - 2
Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift

@@ -183,9 +183,13 @@ struct TestWriter<WriterElement: Sendable>: RPCWriterProtocol {
     self.streamContinuation = streamContinuation
   }
 
-  func write(contentsOf elements: some Sequence<Self.Element>) async throws {
+  func write(_ element: WriterElement) {
+    self.streamContinuation.yield(element)
+  }
+
+  func write(contentsOf elements: some Sequence<Self.Element>) {
     elements.forEach { element in
-      self.streamContinuation.yield(element)
+      self.write(element)
     }
   }
 }