Browse Source

Fix crash during Channel with subscription is destroying (#328)

* Fix crash during Channel destroying

* Add channel crash test case

* Fix review issues

* Fix review issues

* Fix review issues
slavabulgakov 7 years ago
parent
commit
2fd06dcdd6
2 changed files with 54 additions and 0 deletions
  1. 13 0
      Sources/SwiftGRPC/Core/Channel.swift
  2. 41 0
      Tests/SwiftGRPCTests/ChannelCrashTests.swift

+ 13 - 0
Sources/SwiftGRPC/Core/Channel.swift

@@ -132,6 +132,8 @@ private extension Channel {
     private let underlyingCompletionQueue: UnsafeMutableRawPointer
     private let callback: (ConnectivityState) -> Void
     private var lastState: ConnectivityState
+    private var hasBeenShutdown = false
+    private let stateMutex: Mutex = Mutex()
 
     init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) {
       self.underlyingChannel = underlyingChannel
@@ -151,11 +153,19 @@ private extension Channel {
 
       spinloopThreadQueue.async {
         while true  {
+          guard (self.stateMutex.synchronize{ !self.hasBeenShutdown }) else {
+            return
+          }
+            
           guard let underlyingState = self.lastState.underlyingState else { return }
 
           let deadline: TimeInterval = 0.2
           cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
           let event = self.completionQueue.wait(timeout: deadline)
+          
+          guard (self.stateMutex.synchronize{ !self.hasBeenShutdown }) else {
+            return
+          }
 
           switch event.type {
           case .complete:
@@ -178,6 +188,9 @@ private extension Channel {
     }
 
     func shutdown() {
+      stateMutex.synchronize {
+        hasBeenShutdown = true
+      }
       completionQueue.shutdown()
     }
   }

+ 41 - 0
Tests/SwiftGRPCTests/ChannelCrashTests.swift

@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import XCTest
+@testable import SwiftGRPC
+
+class ChannelCrashTests: BasicEchoTestCase {
+    override var defaultTimeout: TimeInterval { return 0.4 }
+    
+    func testDanglingConnectivityObserversDontCrash() {
+        let completionHandlerExpectation = expectation(description: "completion handler called")
+        
+        client?.channel.subscribe { connectivityState in
+            print("ConnectivityState: \(connectivityState)")
+        }
+        
+        let request = Echo_EchoRequest(text: "foo bar baz foo bar baz")
+        _ = try! client!.expand(request) { callResult in
+            print("callResult.statusCode: \(callResult.statusCode)")
+            completionHandlerExpectation.fulfill()
+        }
+        
+        DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(300)) {
+            self.client = nil // Deallocating the client
+        }
+
+        waitForExpectations(timeout: 0.5)
+    }
+}