Browse Source

Fix a few memory leaks related to unreleased slices in observers and metadata arrays.

Daniel Alm 7 years ago
parent
commit
e3e2bfa3aa

+ 2 - 0
Sources/CgRPC/shim/cgrpc.h

@@ -191,10 +191,12 @@ void cgrpc_operations_add_operation(cgrpc_operations *call, cgrpc_observer *obse
 // metadata support
 cgrpc_metadata_array *cgrpc_metadata_array_create(void);
 void cgrpc_metadata_array_destroy(cgrpc_metadata_array *array);
+void cgrpc_metadata_array_unref_fields(cgrpc_metadata_array *array);
 size_t cgrpc_metadata_array_get_count(cgrpc_metadata_array *array);
 char *cgrpc_metadata_array_copy_key_at_index(cgrpc_metadata_array *array, size_t index);
 char *cgrpc_metadata_array_copy_value_at_index(cgrpc_metadata_array *array, size_t index);
 void cgrpc_metadata_array_move_metadata(cgrpc_metadata_array *dest, cgrpc_metadata_array *src);
+cgrpc_metadata_array *cgrpc_metadata_array_copy(cgrpc_metadata_array *src);
 void cgrpc_metadata_array_append_metadata(cgrpc_metadata_array *metadata, const char *key, const char *value);
 
 // mutex support

+ 5 - 4
Sources/CgRPC/shim/channel.c

@@ -72,24 +72,25 @@ void cgrpc_channel_destroy(cgrpc_channel *c) {
   free(c);
 }
 
-grpc_slice host_slice;
-
 cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
                                       const char *method,
                                       const char *host,
                                       double timeout) {
   // create call
-  host_slice = grpc_slice_from_copied_string(host);
+  grpc_slice host_slice = grpc_slice_from_copied_string(host);
+  grpc_slice method_slice = grpc_slice_from_copied_string(method);
   gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
   // The resulting call will have a retain call of +1. We'll release it in `cgrpc_call_destroy()`.
   grpc_call *channel_call = grpc_channel_create_call(channel->channel,
                                                      NULL,
                                                      GRPC_PROPAGATE_DEFAULTS,
                                                      channel->completion_queue,
-                                                     grpc_slice_from_copied_string(method),
+                                                     method_slice,
                                                      &host_slice,
                                                      deadline,
                                                      NULL);
+  grpc_slice_unref(host_slice);
+  grpc_slice_unref(method_slice);
   cgrpc_call *call = (cgrpc_call *) malloc(sizeof(cgrpc_call));
   memset(call, 0, sizeof(cgrpc_call));
   call->call = channel_call;

+ 0 - 1
Sources/CgRPC/shim/internal.h

@@ -96,7 +96,6 @@ typedef struct {
   grpc_metadata_array trailing_metadata_recv;
   grpc_status_code server_status;
   grpc_slice server_details;
-  size_t server_details_capacity;
 } cgrpc_observer_recv_status_on_client;
 
 typedef struct {

+ 22 - 0
Sources/CgRPC/shim/metadata.c

@@ -28,6 +28,13 @@ cgrpc_metadata_array *cgrpc_metadata_array_create() {
   return metadata;
 }
 
+void cgrpc_metadata_array_unref_fields(cgrpc_metadata_array *array) {
+  for (size_t i = 0; i < array->count; i++) {
+    grpc_slice_unref(array->metadata[i].key);
+    grpc_slice_unref(array->metadata[i].value);
+  }
+}
+
 void cgrpc_metadata_array_destroy(cgrpc_metadata_array *array) {
   grpc_metadata_array_destroy(array);
   gpr_free(array);
@@ -64,6 +71,21 @@ void cgrpc_metadata_array_move_metadata(cgrpc_metadata_array *destination,
   source->metadata = NULL;
 }
 
+cgrpc_metadata_array *cgrpc_metadata_array_copy(cgrpc_metadata_array *src) {
+  cgrpc_metadata_array *dst = cgrpc_metadata_array_create();
+  if (src->count > 0) {
+    dst->capacity = src->count;
+    dst->metadata = gpr_malloc(dst->capacity * sizeof(grpc_metadata));
+    dst->count = src->count;
+    for (size_t i = 0; i < src->count; i++) {
+      dst->metadata[i].key = grpc_slice_ref(src->metadata[i].key);
+      dst->metadata[i].value = grpc_slice_ref(src->metadata[i].value);
+      dst->metadata[i].flags = src->metadata[i].flags;
+    }
+  }
+  return dst;
+}
+
 void cgrpc_metadata_array_append_metadata(cgrpc_metadata_array *metadata, const char *key, const char *value) {
   if (metadata->count >= metadata->capacity) {
     size_t new_capacity = 2 * metadata->capacity;

+ 2 - 1
Sources/CgRPC/shim/observers.c

@@ -127,7 +127,6 @@ void cgrpc_observer_apply(cgrpc_observer *observer, grpc_op *op) {
       grpc_metadata_array_init(&(obs->trailing_metadata_recv));
       obs->server_status = GRPC_STATUS_OK;
       obs->server_details = grpc_slice_from_copied_string("");
-      obs->server_details_capacity = 0;
       op->data.recv_status_on_client.trailing_metadata = &(obs->trailing_metadata_recv);
       op->data.recv_status_on_client.status = &(obs->server_status);
       op->data.recv_status_on_client.status_details = &(obs->server_details);
@@ -167,6 +166,7 @@ void cgrpc_observer_destroy(cgrpc_observer *observer) {
     }
     case GRPC_OP_SEND_STATUS_FROM_SERVER: {
       cgrpc_observer_send_status_from_server *obs = (cgrpc_observer_send_status_from_server *) observer;
+      grpc_slice_unref(obs->status_details);
       free(obs);
       break;
     }
@@ -185,6 +185,7 @@ void cgrpc_observer_destroy(cgrpc_observer *observer) {
     case GRPC_OP_RECV_STATUS_ON_CLIENT: {
       cgrpc_observer_recv_status_on_client *obs = (cgrpc_observer_recv_status_on_client *) observer;
       grpc_metadata_array_destroy(&(obs->trailing_metadata_recv));
+      grpc_slice_unref(obs->server_details);
       free(obs);
       break;
     }

+ 7 - 5
Sources/SwiftGRPC/Core/Handler.swift

@@ -81,7 +81,9 @@ public class Handler {
   /// Fills the handler properties with information about the received request
   ///
   func requestCall(tag: Int) throws {
-    let error = cgrpc_handler_request_call(underlyingHandler, requestMetadata.underlyingArray, UnsafeMutableRawPointer(bitPattern: tag))
+    let error = cgrpc_handler_request_call(underlyingHandler,
+                                           try requestMetadata.getUnderlyingArrayAndTransferFieldOwnership(),
+                                           UnsafeMutableRawPointer(bitPattern: tag))
     if error != GRPC_CALL_OK {
       throw CallError.callError(grpcCallError: error)
     }
@@ -100,7 +102,7 @@ public class Handler {
                            completion: ((Bool) -> Void)? = nil) throws {
     try call.perform(OperationGroup(
       call: call,
-      operations: [.sendInitialMetadata(initialMetadata)],
+      operations: [.sendInitialMetadata(initialMetadata.copy())],
       completion: completion != nil
         ? { operationGroup in completion?(operationGroup.success) }
         : nil))
@@ -113,7 +115,7 @@ public class Handler {
     try call.perform(OperationGroup(
       call: call,
       operations: [
-        .sendInitialMetadata(initialMetadata),
+        .sendInitialMetadata(initialMetadata.copy()),
         .receiveMessage
     ]) { operationGroup in
       if operationGroup.success {
@@ -134,7 +136,7 @@ public class Handler {
       operations: [
         .sendMessage(messageBuffer),
         .receiveCloseOnServer,
-        .sendStatusFromServer(status.code, status.message, status.trailingMetadata)
+        .sendStatusFromServer(status.code, status.message, status.trailingMetadata.copy())
     ]) { _ in
       completion?()
       self.shutdown()
@@ -148,7 +150,7 @@ public class Handler {
       call: call,
       operations: [
         .receiveCloseOnServer,
-        .sendStatusFromServer(status.code, status.message, status.trailingMetadata)
+        .sendStatusFromServer(status.code, status.message, status.trailingMetadata.copy())
     ]) { _ in
       completion?()
       self.shutdown()

+ 32 - 12
Sources/SwiftGRPC/Core/Metadata.swift

@@ -20,25 +20,42 @@ import Foundation // for String.Encoding
 
 /// Metadata sent with gRPC messages
 public class Metadata: CustomStringConvertible {
+  public enum Error: Swift.Error {
+    /// Field ownership can only be transferred once. Likewise, it is not advisable to write to a metadata array whose
+    /// fields we do not own.
+    case doesNotOwnFields
+  }
+  
   /// Pointer to underlying C representation
-  let underlyingArray: UnsafeMutableRawPointer
+  fileprivate let underlyingArray: UnsafeMutableRawPointer
+  /// Ownership of the fields inside metadata arrays provided by `grpc_op_recv_initial_metadata` and
+  /// `grpc_op_recv_status_on_client` is retained by the gRPC library. Similarly, passing metadata to gRPC for sending
+  /// to the client for sending/receiving also transfers ownership. However, before we have passed that metadata to
+  /// gRPC, we are still responsible for releasing its fields. This variable tracks that.
+  fileprivate var ownsFields: Bool
 
-  init(underlyingArray: UnsafeMutableRawPointer) {
+  init(underlyingArray: UnsafeMutableRawPointer, ownsFields: Bool) {
     self.underlyingArray = underlyingArray
+    self.ownsFields = ownsFields
   }
 
   public init() {
     underlyingArray = cgrpc_metadata_array_create()
+    ownsFields = true
   }
 
   public init(_ pairs: [String: String]) {
     underlyingArray = cgrpc_metadata_array_create()
+    ownsFields = true
     for (key, value) in pairs {
-      add(key: key, value: value)
+      try! add(key: key, value: value)
     }
   }
 
   deinit {
+    if ownsFields {
+      cgrpc_metadata_array_unref_fields(underlyingArray)
+    }
     cgrpc_metadata_array_destroy(underlyingArray)
   }
 
@@ -64,7 +81,10 @@ public class Metadata: CustomStringConvertible {
     return String(cString: valueData, encoding: String.Encoding.utf8)
   }
   
-  public func add(key: String, value: String) {
+  public func add(key: String, value: String) throws {
+    if !ownsFields {
+      throw Error.doesNotOwnFields
+    }
     cgrpc_metadata_array_append_metadata(underlyingArray, key, value)
   }
   
@@ -79,14 +99,14 @@ public class Metadata: CustomStringConvertible {
   }
   
   public func copy() -> Metadata {
-    let copy = Metadata()
-    for index in 0..<count() {
-      let keyData = cgrpc_metadata_array_copy_key_at_index(underlyingArray, index)!
-      defer { cgrpc_free_copied_string(keyData) }
-      let valueData = cgrpc_metadata_array_copy_value_at_index(underlyingArray, index)!
-      defer { cgrpc_free_copied_string(valueData) }
-      cgrpc_metadata_array_append_metadata(copy.underlyingArray, keyData, valueData)
+    return Metadata(underlyingArray: cgrpc_metadata_array_copy(underlyingArray), ownsFields: true)
+  }
+  
+  func getUnderlyingArrayAndTransferFieldOwnership() throws -> UnsafeMutableRawPointer {
+    if !ownsFields {
+      throw Error.doesNotOwnFields
     }
-    return copy
+    ownsFields = false
+    return underlyingArray
   }
 }

+ 9 - 7
Sources/SwiftGRPC/Core/OperationGroup.swift

@@ -54,18 +54,18 @@ class OperationGroup {
   ///
   /// - Parameter: operation: the operation to observe
   /// - Returns: the observer
-  private func underlyingObserverForOperation(operation: Operation) -> UnsafeMutableRawPointer {
+  private func underlyingObserverForOperation(operation: Operation) throws -> UnsafeMutableRawPointer {
     let underlyingObserver: UnsafeMutableRawPointer
     switch operation {
     case .sendInitialMetadata(let metadata):
-      underlyingObserver = cgrpc_observer_create_send_initial_metadata(metadata.underlyingArray)!
+      underlyingObserver = cgrpc_observer_create_send_initial_metadata(try metadata.getUnderlyingArrayAndTransferFieldOwnership())!
     case .sendMessage(let message):
       underlyingObserver = cgrpc_observer_create_send_message()!
       cgrpc_observer_send_message_set_message(underlyingObserver, message.underlyingByteBuffer)
     case .sendCloseFromClient:
       underlyingObserver = cgrpc_observer_create_send_close_from_client()!
     case .sendStatusFromServer(let statusCode, let statusMessage, let metadata):
-      underlyingObserver = cgrpc_observer_create_send_status_from_server(metadata.underlyingArray)!
+      underlyingObserver = cgrpc_observer_create_send_status_from_server(try metadata.getUnderlyingArrayAndTransferFieldOwnership())!
       cgrpc_observer_send_status_from_server_set_status(underlyingObserver, Int32(statusCode.rawValue))
       cgrpc_observer_send_status_from_server_set_status_details(underlyingObserver, statusMessage)
     case .receiveInitialMetadata:
@@ -85,7 +85,7 @@ class OperationGroup {
   /// - Parameter operations: an array of operations
   init(call: Call,
        operations: [Operation],
-       completion: ((OperationGroup) -> Void)? = nil) {
+       completion: ((OperationGroup) -> Void)? = nil) throws {
     self.call = call
     self.operations = operations
     self.completion = completion
@@ -98,7 +98,7 @@ class OperationGroup {
     underlyingOperations = cgrpc_operations_create()
     cgrpc_operations_reserve_space_for_operations(underlyingOperations, Int32(operations.count))
     for operation in operations {
-      let underlyingObserver = underlyingObserverForOperation(operation: operation)
+      let underlyingObserver = try underlyingObserverForOperation(operation: operation)
       underlyingObservers.append(underlyingObserver)
       cgrpc_operations_add_operation(underlyingOperations, underlyingObserver)
     }
@@ -143,7 +143,8 @@ class OperationGroup {
       switch operation {
       case .receiveInitialMetadata:
         cachedInitialMetadata = Metadata(
-          underlyingArray: cgrpc_observer_recv_initial_metadata_get_metadata(underlyingObservers[i]))
+          underlyingArray: cgrpc_observer_recv_initial_metadata_get_metadata(underlyingObservers[i]),
+          ownsFields: false)
         return cachedInitialMetadata!
       default:
         continue
@@ -196,7 +197,8 @@ class OperationGroup {
       switch operation {
       case .receiveStatusOnClient:
         cachedTrailingMetadata = Metadata(
-          underlyingArray: cgrpc_observer_recv_status_on_client_get_metadata(underlyingObservers[i]))
+          underlyingArray: cgrpc_observer_recv_status_on_client_get_metadata(underlyingObservers[i]),
+          ownsFields: false)
         return cachedTrailingMetadata!
       default:
         continue