Browse Source

Reduce allocations with NIOAny (#476)

* Make status a class

* Add size tests.

* Box protobuf Messages to avoid unnecessary copies through the pipeline.

* Rename Box to _Box

* Fix typo, update documentation

* Update LinuxMain
George Barnett 6 years ago
parent
commit
6f9b914064

+ 27 - 0
Sources/GRPC/Box.swift

@@ -0,0 +1,27 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+/// Provides a "box" to put a value in.
+///
+/// Allows large values to be passed around without being copied.
+public final class _Box<T> {
+  let value: T
+
+  init(_ value: T) {
+    self.value = value
+  }
+}

+ 2 - 2
Sources/GRPC/ClientCalls/ClientCall.swift

@@ -114,7 +114,7 @@ public protocol UnaryResponseClientCall: ClientCall {
 extension StreamingRequestClientCall {
   public func sendMessage(_ message: RequestMessage, flush: Bool = true) -> EventLoopFuture<Void> {
     return self.subchannel.flatMap { channel in
-      let writeFuture = channel.write(GRPCClientRequestPart.message(message))
+      let writeFuture = channel.write(GRPCClientRequestPart.message(_Box(message)))
       if flush {
         channel.flush()
       }
@@ -124,7 +124,7 @@ extension StreamingRequestClientCall {
 
   public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?, flush: Bool = true) {
     self.subchannel.whenSuccess { channel in
-      channel.write(GRPCClientRequestPart.message(message), promise: promise)
+      channel.write(GRPCClientRequestPart.message(_Box(message)), promise: promise)
       if flush {
         channel.flush()
       }

+ 1 - 1
Sources/GRPC/ClientCalls/ServerStreamingClientCall.swift

@@ -34,7 +34,7 @@ public final class ServerStreamingClientCall<RequestMessage: Message, ResponseMe
 
     let requestHandler = GRPCClientUnaryRequestChannelHandler<RequestMessage>(
       requestHead: makeRequestHead(path: path, host: connection.host, callOptions: callOptions),
-      request: request)
+      request: _Box(request))
 
     super.init(
       connection: connection,

+ 1 - 1
Sources/GRPC/ClientCalls/UnaryClientCall.swift

@@ -41,7 +41,7 @@ public final class UnaryClientCall<RequestMessage: Message, ResponseMessage: Mes
 
     let requestHandler = GRPCClientUnaryRequestChannelHandler<RequestMessage>(
       requestHead: makeRequestHead(path: path, host: connection.host, callOptions: callOptions),
-      request: request)
+      request: _Box(request))
 
     self.response = responseHandler.responsePromise.futureResult
     super.init(

+ 10 - 5
Sources/GRPC/GRPCClientCodec.swift

@@ -21,14 +21,18 @@ import SwiftProtobuf
 /// Outgoing gRPC package with a fixed message type.
 public enum GRPCClientRequestPart<RequestMessage: Message> {
   case head(HTTPRequestHead)
-  case message(RequestMessage)
+  // We box the message to keep the enum small enough to fit in `NIOAny` and avoid unnecessary
+  // allocations.
+  case message(_Box<RequestMessage>)
   case end
 }
 
 /// Incoming gRPC package with a fixed message type.
 public enum GRPCClientResponsePart<ResponseMessage: Message> {
   case headers(HTTPHeaders)
-  case message(ResponseMessage)
+  // We box the message to keep the enum small enough to fit in `NIOAny` and avoid unnecessary
+  // allocations.
+  case message(_Box<ResponseMessage>)
   case status(GRPCStatus)
 }
 
@@ -53,7 +57,8 @@ extension GRPCClientCodec: ChannelInboundHandler {
       // Force unwrapping is okay here; we're reading the readable bytes.
       let messageAsData = messageBuffer.readData(length: messageBuffer.readableBytes)!
       do {
-        context.fireChannelRead(self.wrapInboundOut(.message(try ResponseMessage(serializedData: messageAsData))))
+        let box = _Box(try ResponseMessage(serializedData: messageAsData))
+        context.fireChannelRead(self.wrapInboundOut(.message(box)))
       } catch {
         context.fireErrorCaught(GRPCError.client(.responseProtoDeserializationFailure))
       }
@@ -75,9 +80,9 @@ extension GRPCClientCodec: ChannelOutboundHandler {
     case .head(let head):
       context.write(self.wrapOutboundOut(.head(head)), promise: promise)
 
-    case .message(let message):
+    case .message(let box):
       do {
-        context.write(self.wrapOutboundOut(.message(try message.serializedData())), promise: promise)
+        context.write(self.wrapOutboundOut(.message(try box.value.serializedData())), promise: promise)
       } catch {
         let error = GRPCError.client(.requestProtoSerializationFailure)
         promise?.fail(error)

+ 2 - 2
Sources/GRPC/GRPCClientRequestChannelHandler.swift

@@ -41,9 +41,9 @@ internal class GRPCClientRequestChannelHandler<RequestMessage: Message>: Channel
 /// Sends the request head, message and end on `channelActive(context:)`.
 internal final class GRPCClientUnaryRequestChannelHandler<RequestMessage: Message>: GRPCClientRequestChannelHandler<RequestMessage> {
   /// The request to send.
-  internal let request: RequestMessage
+  internal let request: _Box<RequestMessage>
 
-  init(requestHead: HTTPRequestHead, request: RequestMessage) {
+  init(requestHead: HTTPRequestHead, request: _Box<RequestMessage>) {
     self.request = request
     super.init(requestHead: requestHead)
   }

+ 7 - 7
Sources/GRPC/GRPCClientResponseChannelHandler.swift

@@ -121,7 +121,7 @@ internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: Chann
   /// Called when a response is received. Subclasses should override this method.
   ///
   /// - Parameter response: The received response.
-  internal func onResponse(_ response: ResponseMessage) {
+  internal func onResponse(_ response: _Box<ResponseMessage>) {
     // no-op
   }
 
@@ -145,13 +145,13 @@ internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: Chann
       self.initialMetadataPromise.succeed(headers)
       self.inboundState = .expectingMessageOrStatus
 
-    case .message(let message):
+    case .message(let boxedMessage):
       guard self.inboundState == .expectingMessageOrStatus else {
         self.errorCaught(context: context, error: GRPCError.client(.responseCardinalityViolation))
         return
       }
 
-      self.onResponse(message)
+      self.onResponse(boxedMessage)
       self.inboundState = self.responseArity.inboundStateAfterResponse
 
     case .status(let status):
@@ -225,8 +225,8 @@ final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRP
   /// Succeeds the response promise with the given response.
   ///
   /// - Parameter response: The response received from the service.
-  override func onResponse(_ response: ResponseMessage) {
-    self.responsePromise.succeed(response)
+  override func onResponse(_ response: _Box<ResponseMessage>) {
+    self.responsePromise.succeed(response.value)
   }
 
   /// Fails the response promise if the given status is not `.ok`.
@@ -266,8 +266,8 @@ final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>:
   /// Calls a user-provided handler with the given response.
   ///
   /// - Parameter response: The response received from the service.
-  override func onResponse(_ response: ResponseMessage) {
-    self.responseHandler(response)
+  override func onResponse(_ response: _Box<ResponseMessage>) {
+    self.responseHandler(response.value)
   }
 }
 

+ 7 - 1
Sources/GRPC/GRPCStatus.swift

@@ -4,7 +4,13 @@ import NIOHTTP1
 import NIOHTTP2
 
 /// Encapsulates the result of a gRPC call.
-public struct GRPCStatus: Error, Equatable {
+///
+/// We use a `class` here for a couple of reasons:
+/// - The size of the equivalent `struct` is larger than the value buffer in an existential
+///   container so would incur a heap allocation each time a `GRPCStatus` is passed to a function
+///   taking an `Error`.
+/// - We aren't using value semantics (since all properties are constant).
+public final class GRPCStatus: Error {
   /// The code to return in the `grpc-status` header.
   public let code: StatusCode
   /// The message to return in the `grpc-message` header.

+ 2 - 2
Tests/GRPCTests/GRPCChannelHandlerTests.swift

@@ -15,7 +15,7 @@ class GRPCChannelHandlerTests: GRPCChannelHandlerResponseCapturingTestCase {
     XCTAssertEqual([expectedError], errorCollector.asGRPCServerErrors)
 
     responses[0].assertStatus { status in
-      XCTAssertEqual(status, expectedError.asGRPCStatus())
+      assertEqualStatusIgnoringTrailers(status, expectedError.asGRPCStatus())
     }
   }
 
@@ -53,7 +53,7 @@ class GRPCChannelHandlerTests: GRPCChannelHandlerResponseCapturingTestCase {
 
     responses[0].assertHeaders()
     responses[1].assertStatus { status in
-      XCTAssertEqual(status, expectedError.asGRPCStatus())
+      assertEqualStatusIgnoringTrailers(status, expectedError.asGRPCStatus())
     }
   }
 }

+ 56 - 0
Tests/GRPCTests/GRPCTypeSizeTests.swift

@@ -0,0 +1,56 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import GRPC
+import XCTest
+
+/// These test check the size of types which get wrapped in `NIOAny`. If the size of the type is
+/// greater than 24 bytes (the size of the value buffer in an existential container) then it will
+/// incur an additional heap allocation.
+///
+/// This commit message explains the problem and one way to mitigate the issue:
+/// https://github.com/apple/swift-nio-http2/commit/4097c3a807a83661f0add383edef29b426e666cb
+///
+/// Session 416 of WWDC 2016 also provides a good explanation of existential containers.
+class GRPCTypeSizeTests: XCTestCase {
+  let existentialContainerBufferSize = 24
+
+  func checkSize<T>(of: T.Type, file: StaticString = #file, line: UInt = #line) {
+    XCTAssertLessThanOrEqual(MemoryLayout<T>.size, self.existentialContainerBufferSize, file: file, line: line)
+  }
+
+  // `GRPCStatus` isn't wrapped in `NIOAny` but is passed around through functions taking a type
+  // conforming to `Error`, so size is important here too.
+  func testGRPCStatus() {
+    self.checkSize(of: GRPCStatus.self)
+  }
+
+  func testRawGRPCClientRequestPart() {
+    self.checkSize(of: RawGRPCClientRequestPart.self)
+  }
+
+  func testRawGRPCClientResponsePart() {
+    self.checkSize(of: RawGRPCClientResponsePart.self)
+  }
+
+  func testGRPCClientRequestPart() {
+    self.checkSize(of: GRPCClientRequestPart<Echo_EchoRequest>.self)
+  }
+
+  func testGRPCClientResponsePart() {
+    self.checkSize(of: GRPCClientResponsePart<Echo_EchoResponse>.self)
+  }
+}

+ 5 - 5
Tests/GRPCTests/HTTP1ToRawGRPCServerCodecTests.swift

@@ -28,7 +28,7 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas
 
     responses[0].assertHeaders()
     responses[1].assertStatus { status in
-      XCTAssertEqual(status, expectedError.asGRPCStatus())
+      assertEqualStatusIgnoringTrailers(status, expectedError.asGRPCStatus())
     }
   }
 
@@ -57,7 +57,7 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas
     responses[0].assertHeaders()
     responses[1].assertMessage()
     responses[2].assertStatus { status in
-      XCTAssertEqual(status, .ok)
+      assertEqualStatusIgnoringTrailers(status, .ok)
     }
   }
 
@@ -75,7 +75,7 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas
 
     responses[0].assertHeaders()
     responses[1].assertStatus { status in
-      XCTAssertEqual(status, expectedError.asGRPCStatus())
+      assertEqualStatusIgnoringTrailers(status, expectedError.asGRPCStatus())
     }
   }
 
@@ -103,7 +103,7 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas
 
     responses[0].assertHeaders()
     responses[1].assertStatus { status in
-      XCTAssertEqual(status, .processingError)
+      assertEqualStatusIgnoringTrailers(status, .processingError)
     }
   }
 
@@ -126,7 +126,7 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas
     responses[0].assertHeaders()
     responses[1].assertMessage()
     responses[2].assertStatus { status in
-      XCTAssertEqual(status, .ok)
+      assertEqualStatusIgnoringTrailers(status, .ok)
     }
   }
 

+ 14 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -210,6 +210,19 @@ extension GRPCStatusMessageMarshallerTests {
     ]
 }
 
+extension GRPCTypeSizeTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__GRPCTypeSizeTests = [
+        ("testGRPCClientRequestPart", testGRPCClientRequestPart),
+        ("testGRPCClientResponsePart", testGRPCClientResponsePart),
+        ("testGRPCStatus", testGRPCStatus),
+        ("testRawGRPCClientRequestPart", testRawGRPCClientRequestPart),
+        ("testRawGRPCClientResponsePart", testRawGRPCClientResponsePart),
+    ]
+}
+
 extension HTTP1ToRawGRPCServerCodecTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -307,6 +320,7 @@ public func __allTests() -> [XCTestCaseEntry] {
         testCase(GRPCSecureInteroperabilityTests.__allTests__GRPCSecureInteroperabilityTests),
         testCase(GRPCStatusCodeTests.__allTests__GRPCStatusCodeTests),
         testCase(GRPCStatusMessageMarshallerTests.__allTests__GRPCStatusMessageMarshallerTests),
+        testCase(GRPCTypeSizeTests.__allTests__GRPCTypeSizeTests),
         testCase(HTTP1ToRawGRPCServerCodecTests.__allTests__HTTP1ToRawGRPCServerCodecTests),
         testCase(LengthPrefixedMessageReaderTests.__allTests__LengthPrefixedMessageReaderTests),
         testCase(ServerDelayedThrowingTests.__allTests__ServerDelayedThrowingTests),