Browse Source

Merge pull request #217 from MrMage/fix-leaks

Fix a few memory leaks related to unreleased slices in observers and metadata arrays
Tim Burks 7 years ago
parent
commit
600644a931

+ 2 - 0
Sources/CgRPC/shim/cgrpc.h

@@ -191,10 +191,12 @@ void cgrpc_operations_add_operation(cgrpc_operations *call, cgrpc_observer *obse
 // metadata support
 cgrpc_metadata_array *cgrpc_metadata_array_create(void);
 void cgrpc_metadata_array_destroy(cgrpc_metadata_array *array);
+void cgrpc_metadata_array_unref_fields(cgrpc_metadata_array *array);
 size_t cgrpc_metadata_array_get_count(cgrpc_metadata_array *array);
 char *cgrpc_metadata_array_copy_key_at_index(cgrpc_metadata_array *array, size_t index);
 char *cgrpc_metadata_array_copy_value_at_index(cgrpc_metadata_array *array, size_t index);
 void cgrpc_metadata_array_move_metadata(cgrpc_metadata_array *dest, cgrpc_metadata_array *src);
+cgrpc_metadata_array *cgrpc_metadata_array_copy(cgrpc_metadata_array *src);
 void cgrpc_metadata_array_append_metadata(cgrpc_metadata_array *metadata, const char *key, const char *value);
 
 // mutex support

+ 5 - 4
Sources/CgRPC/shim/channel.c

@@ -72,24 +72,25 @@ void cgrpc_channel_destroy(cgrpc_channel *c) {
   free(c);
 }
 
-grpc_slice host_slice;
-
 cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
                                       const char *method,
                                       const char *host,
                                       double timeout) {
   // create call
-  host_slice = grpc_slice_from_copied_string(host);
+  grpc_slice host_slice = grpc_slice_from_copied_string(host);
+  grpc_slice method_slice = grpc_slice_from_copied_string(method);
   gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
   // The resulting call will have a retain call of +1. We'll release it in `cgrpc_call_destroy()`.
   grpc_call *channel_call = grpc_channel_create_call(channel->channel,
                                                      NULL,
                                                      GRPC_PROPAGATE_DEFAULTS,
                                                      channel->completion_queue,
-                                                     grpc_slice_from_copied_string(method),
+                                                     method_slice,
                                                      &host_slice,
                                                      deadline,
                                                      NULL);
+  grpc_slice_unref(host_slice);
+  grpc_slice_unref(method_slice);
   cgrpc_call *call = (cgrpc_call *) malloc(sizeof(cgrpc_call));
   memset(call, 0, sizeof(cgrpc_call));
   call->call = channel_call;

+ 0 - 1
Sources/CgRPC/shim/internal.h

@@ -96,7 +96,6 @@ typedef struct {
   grpc_metadata_array trailing_metadata_recv;
   grpc_status_code server_status;
   grpc_slice server_details;
-  size_t server_details_capacity;
 } cgrpc_observer_recv_status_on_client;
 
 typedef struct {

+ 22 - 0
Sources/CgRPC/shim/metadata.c

@@ -28,6 +28,13 @@ cgrpc_metadata_array *cgrpc_metadata_array_create() {
   return metadata;
 }
 
+void cgrpc_metadata_array_unref_fields(cgrpc_metadata_array *array) {
+  for (size_t i = 0; i < array->count; i++) {
+    grpc_slice_unref(array->metadata[i].key);
+    grpc_slice_unref(array->metadata[i].value);
+  }
+}
+
 void cgrpc_metadata_array_destroy(cgrpc_metadata_array *array) {
   grpc_metadata_array_destroy(array);
   gpr_free(array);
@@ -64,6 +71,21 @@ void cgrpc_metadata_array_move_metadata(cgrpc_metadata_array *destination,
   source->metadata = NULL;
 }
 
+cgrpc_metadata_array *cgrpc_metadata_array_copy(cgrpc_metadata_array *src) {
+  cgrpc_metadata_array *dst = cgrpc_metadata_array_create();
+  if (src->count > 0) {
+    dst->capacity = src->count;
+    dst->metadata = gpr_malloc(dst->capacity * sizeof(grpc_metadata));
+    dst->count = src->count;
+    for (size_t i = 0; i < src->count; i++) {
+      dst->metadata[i].key = grpc_slice_ref(src->metadata[i].key);
+      dst->metadata[i].value = grpc_slice_ref(src->metadata[i].value);
+      dst->metadata[i].flags = src->metadata[i].flags;
+    }
+  }
+  return dst;
+}
+
 void cgrpc_metadata_array_append_metadata(cgrpc_metadata_array *metadata, const char *key, const char *value) {
   if (metadata->count >= metadata->capacity) {
     size_t new_capacity = 2 * metadata->capacity;

+ 2 - 1
Sources/CgRPC/shim/observers.c

@@ -127,7 +127,6 @@ void cgrpc_observer_apply(cgrpc_observer *observer, grpc_op *op) {
       grpc_metadata_array_init(&(obs->trailing_metadata_recv));
       obs->server_status = GRPC_STATUS_OK;
       obs->server_details = grpc_slice_from_copied_string("");
-      obs->server_details_capacity = 0;
       op->data.recv_status_on_client.trailing_metadata = &(obs->trailing_metadata_recv);
       op->data.recv_status_on_client.status = &(obs->server_status);
       op->data.recv_status_on_client.status_details = &(obs->server_details);
@@ -167,6 +166,7 @@ void cgrpc_observer_destroy(cgrpc_observer *observer) {
     }
     case GRPC_OP_SEND_STATUS_FROM_SERVER: {
       cgrpc_observer_send_status_from_server *obs = (cgrpc_observer_send_status_from_server *) observer;
+      grpc_slice_unref(obs->status_details);
       free(obs);
       break;
     }
@@ -185,6 +185,7 @@ void cgrpc_observer_destroy(cgrpc_observer *observer) {
     case GRPC_OP_RECV_STATUS_ON_CLIENT: {
       cgrpc_observer_recv_status_on_client *obs = (cgrpc_observer_recv_status_on_client *) observer;
       grpc_metadata_array_destroy(&(obs->trailing_metadata_recv));
+      grpc_slice_unref(obs->server_details);
       free(obs);
       break;
     }

+ 1 - 1
Sources/Examples/Echo/main.swift

@@ -45,7 +45,7 @@ func buildEchoService(_ ssl: Bool, _ address: String, _ port: String, _: String)
   } else {
     service = Echo_EchoServiceClient(address: address + ":" + port, secure: false)
   }
-  service.metadata = Metadata([
+  service.metadata = try! Metadata([
     "x-goog-api-key": "YOUR_API_KEY",
     "x-ios-bundle-identifier": "io.grpc.echo"
   ])

+ 3 - 3
Sources/Examples/Simple/main.swift

@@ -32,7 +32,7 @@ func client() throws {
     print("calling " + method)
     let call = c.makeCall(method)
 
-    let metadata = Metadata([
+    let metadata = try Metadata([
       "x": "xylophone",
       "y": "yu",
       "z": "zither"
@@ -83,7 +83,7 @@ func server() throws {
           + ":" + initialMetadata.value(i)!)
       }
 
-      let initialMetadataToSend = Metadata([
+      let initialMetadataToSend = try Metadata([
         "a": "Apple",
         "b": "Banana",
         "c": "Cherry"
@@ -99,7 +99,7 @@ func server() throws {
       }
 
       let replyMessage = "hello, client!"
-      let trailingMetadataToSend = Metadata([
+      let trailingMetadataToSend = try Metadata([
         "0": "zero",
         "1": "one",
         "2": "two"

+ 7 - 5
Sources/SwiftGRPC/Core/Handler.swift

@@ -81,7 +81,9 @@ public class Handler {
   /// Fills the handler properties with information about the received request
   ///
   func requestCall(tag: Int) throws {
-    let error = cgrpc_handler_request_call(underlyingHandler, requestMetadata.underlyingArray, UnsafeMutableRawPointer(bitPattern: tag))
+    let error = cgrpc_handler_request_call(underlyingHandler,
+                                           try requestMetadata.getUnderlyingArrayAndTransferFieldOwnership(),
+                                           UnsafeMutableRawPointer(bitPattern: tag))
     if error != GRPC_CALL_OK {
       throw CallError.callError(grpcCallError: error)
     }
@@ -100,7 +102,7 @@ public class Handler {
                            completion: ((Bool) -> Void)? = nil) throws {
     try call.perform(OperationGroup(
       call: call,
-      operations: [.sendInitialMetadata(initialMetadata)],
+      operations: [.sendInitialMetadata(initialMetadata.copy())],
       completion: completion != nil
         ? { operationGroup in completion?(operationGroup.success) }
         : nil))
@@ -113,7 +115,7 @@ public class Handler {
     try call.perform(OperationGroup(
       call: call,
       operations: [
-        .sendInitialMetadata(initialMetadata),
+        .sendInitialMetadata(initialMetadata.copy()),
         .receiveMessage
     ]) { operationGroup in
       if operationGroup.success {
@@ -134,7 +136,7 @@ public class Handler {
       operations: [
         .sendMessage(messageBuffer),
         .receiveCloseOnServer,
-        .sendStatusFromServer(status.code, status.message, status.trailingMetadata)
+        .sendStatusFromServer(status.code, status.message, status.trailingMetadata.copy())
     ]) { _ in
       completion?()
       self.shutdown()
@@ -148,7 +150,7 @@ public class Handler {
       call: call,
       operations: [
         .receiveCloseOnServer,
-        .sendStatusFromServer(status.code, status.message, status.trailingMetadata)
+        .sendStatusFromServer(status.code, status.message, status.trailingMetadata.copy())
     ]) { _ in
       completion?()
       self.shutdown()

+ 33 - 13
Sources/SwiftGRPC/Core/Metadata.swift

@@ -20,25 +20,42 @@ import Foundation // for String.Encoding
 
 /// Metadata sent with gRPC messages
 public class Metadata: CustomStringConvertible {
+  public enum Error: Swift.Error {
+    /// Field ownership can only be transferred once. Likewise, it is not advisable to write to a metadata array whose
+    /// fields we do not own.
+    case doesNotOwnFields
+  }
+  
   /// Pointer to underlying C representation
-  let underlyingArray: UnsafeMutableRawPointer
+  fileprivate let underlyingArray: UnsafeMutableRawPointer
+  /// Ownership of the fields inside metadata arrays provided by `grpc_op_recv_initial_metadata` and
+  /// `grpc_op_recv_status_on_client` is retained by the gRPC library. Similarly, passing metadata to gRPC for sending
+  /// to the client for sending/receiving also transfers ownership. However, before we have passed that metadata to
+  /// gRPC, we are still responsible for releasing its fields. This variable tracks that.
+  fileprivate var ownsFields: Bool
 
-  init(underlyingArray: UnsafeMutableRawPointer) {
+  init(underlyingArray: UnsafeMutableRawPointer, ownsFields: Bool) {
     self.underlyingArray = underlyingArray
+    self.ownsFields = ownsFields
   }
 
   public init() {
     underlyingArray = cgrpc_metadata_array_create()
+    ownsFields = true
   }
 
-  public init(_ pairs: [String: String]) {
+  public init(_ pairs: [String: String]) throws {
     underlyingArray = cgrpc_metadata_array_create()
+    ownsFields = true
     for (key, value) in pairs {
-      add(key: key, value: value)
+      try add(key: key, value: value)
     }
   }
 
   deinit {
+    if ownsFields {
+      cgrpc_metadata_array_unref_fields(underlyingArray)
+    }
     cgrpc_metadata_array_destroy(underlyingArray)
   }
 
@@ -64,7 +81,10 @@ public class Metadata: CustomStringConvertible {
     return String(cString: valueData, encoding: String.Encoding.utf8)
   }
   
-  public func add(key: String, value: String) {
+  public func add(key: String, value: String) throws {
+    if !ownsFields {
+      throw Error.doesNotOwnFields
+    }
     cgrpc_metadata_array_append_metadata(underlyingArray, key, value)
   }
   
@@ -79,14 +99,14 @@ public class Metadata: CustomStringConvertible {
   }
   
   public func copy() -> Metadata {
-    let copy = Metadata()
-    for index in 0..<count() {
-      let keyData = cgrpc_metadata_array_copy_key_at_index(underlyingArray, index)!
-      defer { cgrpc_free_copied_string(keyData) }
-      let valueData = cgrpc_metadata_array_copy_value_at_index(underlyingArray, index)!
-      defer { cgrpc_free_copied_string(valueData) }
-      cgrpc_metadata_array_append_metadata(copy.underlyingArray, keyData, valueData)
+    return Metadata(underlyingArray: cgrpc_metadata_array_copy(underlyingArray), ownsFields: true)
+  }
+  
+  func getUnderlyingArrayAndTransferFieldOwnership() throws -> UnsafeMutableRawPointer {
+    if !ownsFields {
+      throw Error.doesNotOwnFields
     }
-    return copy
+    ownsFields = false
+    return underlyingArray
   }
 }

+ 9 - 7
Sources/SwiftGRPC/Core/OperationGroup.swift

@@ -54,18 +54,18 @@ class OperationGroup {
   ///
   /// - Parameter: operation: the operation to observe
   /// - Returns: the observer
-  private func underlyingObserverForOperation(operation: Operation) -> UnsafeMutableRawPointer {
+  private func underlyingObserverForOperation(operation: Operation) throws -> UnsafeMutableRawPointer {
     let underlyingObserver: UnsafeMutableRawPointer
     switch operation {
     case .sendInitialMetadata(let metadata):
-      underlyingObserver = cgrpc_observer_create_send_initial_metadata(metadata.underlyingArray)!
+      underlyingObserver = cgrpc_observer_create_send_initial_metadata(try metadata.getUnderlyingArrayAndTransferFieldOwnership())!
     case .sendMessage(let message):
       underlyingObserver = cgrpc_observer_create_send_message()!
       cgrpc_observer_send_message_set_message(underlyingObserver, message.underlyingByteBuffer)
     case .sendCloseFromClient:
       underlyingObserver = cgrpc_observer_create_send_close_from_client()!
     case .sendStatusFromServer(let statusCode, let statusMessage, let metadata):
-      underlyingObserver = cgrpc_observer_create_send_status_from_server(metadata.underlyingArray)!
+      underlyingObserver = cgrpc_observer_create_send_status_from_server(try metadata.getUnderlyingArrayAndTransferFieldOwnership())!
       cgrpc_observer_send_status_from_server_set_status(underlyingObserver, Int32(statusCode.rawValue))
       cgrpc_observer_send_status_from_server_set_status_details(underlyingObserver, statusMessage)
     case .receiveInitialMetadata:
@@ -85,7 +85,7 @@ class OperationGroup {
   /// - Parameter operations: an array of operations
   init(call: Call,
        operations: [Operation],
-       completion: ((OperationGroup) -> Void)? = nil) {
+       completion: ((OperationGroup) -> Void)? = nil) throws {
     self.call = call
     self.operations = operations
     self.completion = completion
@@ -98,7 +98,7 @@ class OperationGroup {
     underlyingOperations = cgrpc_operations_create()
     cgrpc_operations_reserve_space_for_operations(underlyingOperations, Int32(operations.count))
     for operation in operations {
-      let underlyingObserver = underlyingObserverForOperation(operation: operation)
+      let underlyingObserver = try underlyingObserverForOperation(operation: operation)
       underlyingObservers.append(underlyingObserver)
       cgrpc_operations_add_operation(underlyingOperations, underlyingObserver)
     }
@@ -143,7 +143,8 @@ class OperationGroup {
       switch operation {
       case .receiveInitialMetadata:
         cachedInitialMetadata = Metadata(
-          underlyingArray: cgrpc_observer_recv_initial_metadata_get_metadata(underlyingObservers[i]))
+          underlyingArray: cgrpc_observer_recv_initial_metadata_get_metadata(underlyingObservers[i]),
+          ownsFields: false)
         return cachedInitialMetadata!
       default:
         continue
@@ -196,7 +197,8 @@ class OperationGroup {
       switch operation {
       case .receiveStatusOnClient:
         cachedTrailingMetadata = Metadata(
-          underlyingArray: cgrpc_observer_recv_status_on_client_get_metadata(underlyingObservers[i]))
+          underlyingArray: cgrpc_observer_recv_status_on_client_get_metadata(underlyingObservers[i]),
+          ownsFields: false)
         return cachedTrailingMetadata!
       default:
         continue

+ 9 - 9
Tests/SwiftGRPCTests/GRPCTests.swift

@@ -178,7 +178,7 @@ func callUnaryIndividual(channel: Channel, message: Data, shouldSucceed: Bool) t
   let sem = DispatchSemaphore(value: 0)
   let method = hello
   let call = channel.makeCall(method)
-  let metadata = Metadata(initialClientMetadata)
+  let metadata = try Metadata(initialClientMetadata)
   try call.start(.unary, metadata: metadata, message: message) {
     response in
     // verify the basic response from the server
@@ -224,7 +224,7 @@ func callUnaryIndividual(channel: Channel, message: Data, shouldSucceed: Bool) t
 
 func callServerStream(channel: Channel) throws {
   let message = evenClientText.data(using: .utf8)
-  let metadata = Metadata(initialClientMetadata)
+  let metadata = try Metadata(initialClientMetadata)
 
   let sem = DispatchSemaphore(value: 0)
   let method = helloServerStream
@@ -266,7 +266,7 @@ let clientPing = "ping"
 let serverPong = "pong"
 
 func callBiDiStream(channel: Channel) throws {
-  let metadata = Metadata(initialClientMetadata)
+  let metadata = try Metadata(initialClientMetadata)
 
   let sem = DispatchSemaphore(value: 0)
   let method = helloBiDiStream
@@ -353,7 +353,7 @@ func handleUnary(requestHandler: Handler) throws {
   XCTAssertEqual(requestHandler.method, hello)
   let initialMetadata = requestHandler.requestMetadata
   verify_metadata(initialMetadata, expected: initialClientMetadata)
-  let initialMetadataToSend = Metadata(initialServerMetadata)
+  let initialMetadataToSend = try Metadata(initialServerMetadata)
   let receiveSem = DispatchSemaphore(value: 0)
   var inputMessage: Data?
   try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
@@ -377,7 +377,7 @@ func handleUnary(requestHandler: Handler) throws {
   let replyMessage = (inputMessage == nil || inputMessage!.count < sizeThresholdForReturningDataVerbatim)
     ? serverText.data(using: .utf8)!
     : inputMessage!
-  let trailingMetadataToSend = Metadata(trailingServerMetadata)
+  let trailingMetadataToSend = try Metadata(trailingServerMetadata)
   if let inputMessage = inputMessage,
     inputMessage.count >= sizeThresholdForReturningDataVerbatim
       || inputMessage == evenClientText.data(using: .utf8)! {
@@ -398,7 +398,7 @@ func handleServerStream(requestHandler: Handler) throws {
   let initialMetadata = requestHandler.requestMetadata
   verify_metadata(initialMetadata, expected: initialClientMetadata)
 
-  let initialMetadataToSend = Metadata(initialServerMetadata)
+  let initialMetadataToSend = try Metadata(initialServerMetadata)
   try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
     if let messageData = $0 {
       let messageString = String(data: messageData, encoding: .utf8)
@@ -416,7 +416,7 @@ func handleServerStream(requestHandler: Handler) throws {
     requestHandler.call.messageQueueEmpty.wait()
   }
 
-  let trailingMetadataToSend = Metadata(trailingServerMetadata)
+  let trailingMetadataToSend = try Metadata(trailingServerMetadata)
   try requestHandler.sendStatus(ServerStatus(
     // We need to return status OK here, as it seems like the server might never send out the last few messages once it
     // has been asked to send a non-OK status. Alternatively, we could send a non-OK status here, but then we would need
@@ -432,7 +432,7 @@ func handleBiDiStream(requestHandler: Handler) throws {
   let initialMetadata = requestHandler.requestMetadata
   verify_metadata(initialMetadata, expected: initialClientMetadata)
 
-  let initialMetadataToSend = Metadata(initialServerMetadata)
+  let initialMetadataToSend = try Metadata(initialServerMetadata)
   let sendMetadataSem = DispatchSemaphore(value: 0)
   try requestHandler.sendMetadata(initialMetadata: initialMetadataToSend) { _ in
     _ = sendMetadataSem.signal()
@@ -463,7 +463,7 @@ func handleBiDiStream(requestHandler: Handler) throws {
     requestHandler.call.messageQueueEmpty.wait()
   }
 
-  let trailingMetadataToSend = Metadata(trailingServerMetadata)
+  let trailingMetadataToSend = try Metadata(trailingServerMetadata)
   let sem = DispatchSemaphore(value: 0)
   try requestHandler.sendStatus(ServerStatus(
     // We need to return status OK here, as it seems like the server might never send out the last few messages once it