Browse Source

Merge pull request #168 from MrMage/improve-example-tests

Improve the example tests
Tim Burks 7 years ago
parent
commit
2676eda57c

+ 1 - 1
Examples/Echo/EchoProvider.swift

@@ -63,9 +63,9 @@ class EchoProvider: Echo_EchoProvider {
     while true {
       do {
         let request = try session.receive()
-        count += 1
         var response = Echo_EchoResponse()
         response.text = "Swift echo update (\(count)): \(request.text)"
+        count += 1
         let sem = DispatchSemaphore(value: 0)
         try session.send(response) { _ in sem.signal() }
         _ = sem.wait(timeout: DispatchTime.distantFuture)

+ 1 - 1
Examples/Echo/PackageManager/Makefile

@@ -10,7 +10,7 @@ test:	all
 	./Echo collect | tee -a test.out
 	./Echo update | tee -a test.out
 	kill -9 `cat echo.pid`
-	diff test.out test.gold
+	diff -u test.out test.gold
 	
 project:
 	swift package generate-xcodeproj

+ 21 - 57
Examples/Echo/PackageManager/Sources/main.swift

@@ -135,38 +135,23 @@ Group {
       sem.signal()
     }
 
-    let sendCountMutex = Mutex()
-    var sendCount = 0
-
     let parts = message.components(separatedBy: " ")
     for part in parts {
       var requestMessage = Echo_EchoRequest()
       requestMessage.text = part
       print("collect sending: " + part)
-      try collectCall.send(requestMessage) {
-        error in
-        sendCountMutex.synchronize {
-          sendCount = sendCount + 1
-        }
-	if let error = error {	
+      try collectCall.send(requestMessage) { error in
+        if let error = error {	
           print("collect send error \(error)")
-	}
-      }
-    }
-    // don't close until all sends have completed
-    var waiting = true
-    while (waiting) {
-      sendCountMutex.synchronize {
-        if sendCount == parts.count {
-          waiting = false
         }
       }
     }
+    collectCall.waitForSendOperationsToFinish()
     let responseMessage = try collectCall.closeAndReceive()
     print("collect received: \(responseMessage.text)")
     _ = sem.wait(timeout: DispatchTime.distantFuture)
     if let statusCode = callResult?.statusCode {
-      print("collect completed with status \(statusCode)")
+      print("collect completed with code \(statusCode)")
     }
   }
 
@@ -181,58 +166,37 @@ Group {
       sem.signal()
     }
 
-    let responsesMutex = Mutex()
-    var responses : [String] = []
-
-    DispatchQueue.global().async {
-      var running = true
-      while running {
-        do {
-          let responseMessage = try updateCall.receive()
-          responsesMutex.synchronize {
-            responses.append("update received: \(responseMessage.text)")
-          }
-        } catch ClientError.endOfStream {
-          running = false
-        } catch (let error) {
-          responsesMutex.synchronize {
-            responses.append("update receive error: \(error)")
-          }
-        }
-      }
-    }
-
     let parts = message.components(separatedBy: " ")
     for part in parts {
       var requestMessage = Echo_EchoRequest()
       requestMessage.text = part
       print("update sending: " + requestMessage.text)
-      try updateCall.send(requestMessage) {
-        error in
-        if let error = error {
-          print("update send error: \(error)")
+      try updateCall.send(requestMessage) { error in
+        if let error = error {	
+          print("update send error \(error)")
         }
       }
     }
-
-    // don't close until last update is received
-    var waiting = true
-    while (waiting) {
-      responsesMutex.synchronize {
-        if responses.count == parts.count {
-          waiting = false
-        }
+    updateCall.waitForSendOperationsToFinish()
+    
+    try updateCall.closeSend()
+    
+    while true {
+      do {
+        let responseMessage = try updateCall.receive()
+        print("update received: \(responseMessage.text)")
+      } catch ClientError.endOfStream {
+        break
+      } catch (let error) {
+        print("update receive error: \(error)")
+        break
       }
     }
-    try updateCall.closeSend()
 
     _ = sem.wait(timeout: DispatchTime.distantFuture)
 
-    for response in responses {
-      print(response)
-    }
     if let statusCode = callResult?.statusCode {
-      print("update completed with status \(statusCode)")
+      print("update completed with code \(statusCode)")
     }
   }
 

+ 6 - 6
Examples/Echo/PackageManager/test.gold

@@ -14,14 +14,14 @@ collect sending: 1
 collect sending: 2
 collect sending: 3
 collect received: Swift echo collect: Testing 1 2 3
-collect completed with status ok
+collect completed with code ok
 calling update
 update sending: Testing
 update sending: 1
 update sending: 2
 update sending: 3
-update received: Swift echo update (1): Testing
-update received: Swift echo update (2): 1
-update received: Swift echo update (3): 2
-update received: Swift echo update (4): 3
-update completed with status ok
+update received: Swift echo update (0): Testing
+update received: Swift echo update (1): 1
+update received: Swift echo update (2): 2
+update received: Swift echo update (3): 3
+update completed with code ok

+ 21 - 19
Sources/gRPC/Call.swift

@@ -157,6 +157,8 @@ public class Call {
   /// A queue of pending messages to send over the call
   private var messageQueue: [(dataToSend: Data, completion: (Error?) -> Void)] = []
 
+  public let messageQueueEmpty = DispatchGroup()
+  
   /// True if a message write operation is underway
   private var writing: Bool
 
@@ -254,6 +256,7 @@ public class Call {
   /// Parameter data: the message data to send
   /// - Throws: `CallError` if fails to call. `CallWarning` if blocked.
   public func sendMessage(data: Data, completion: @escaping (Error?) -> Void) throws {
+    messageQueueEmpty.enter()
     try sendMutex.synchronize {
       if writing {
         if (Call.messageQueueMaxLength > 0) && // if max length is <= 0, consider it infinite
@@ -272,29 +275,28 @@ public class Call {
   private func sendWithoutBlocking(data: Data, completion: @escaping (Error?) -> Void) throws {
     try perform(OperationGroup(call: self,
                                operations: [.sendMessage(ByteBuffer(data: data))]) { operationGroup in
-        if operationGroup.success {
-          self.messageDispatchQueue.async {
-            self.sendMutex.synchronize {
-              // if there are messages pending, send the next one
-              if self.messageQueue.count > 0 {
-                let (nextMessage, nextCompletionHandler) = self.messageQueue.removeFirst()
-                do {
-                  try self.sendWithoutBlocking(data: nextMessage, completion: nextCompletionHandler)
-                } catch (let callError) {
-                  nextCompletionHandler(callError)
-                }
-              } else {
-                // otherwise, we are finished writing
-                self.writing = false
+        // TODO(timburks, danielalm): Is the `async` dispatch here needed, and/or should we call the completion handler
+        // and leave `messageQueueEmpty` in the `async` block as well?
+        self.messageDispatchQueue.async {
+          // Always enqueue the next message, even if sending this one failed. This ensures that all send completion
+          // handlers are called eventually.
+          self.sendMutex.synchronize {
+            // if there are messages pending, send the next one
+            if self.messageQueue.count > 0 {
+              let (nextMessage, nextCompletionHandler) = self.messageQueue.removeFirst()
+              do {
+                try self.sendWithoutBlocking(data: nextMessage, completion: nextCompletionHandler)
+              } catch (let callError) {
+                nextCompletionHandler(callError)
               }
+            } else {
+              // otherwise, we are finished writing
+              self.writing = false
             }
           }
-          completion(nil)
-        } else {
-          // if the event failed, shut down
-          self.writing = false
-          completion(CallError.unknown)
         }
+        completion(operationGroup.success ? nil : CallError.unknown)
+        self.messageQueueEmpty.leave()
     })
   }
 

+ 5 - 0
Sources/gRPC/GenCodeSupport/ClientCall.swift

@@ -20,6 +20,9 @@ import SwiftProtobuf
 
 public protocol ClientCall: class {
   static var method: String { get }
+  
+  /// Cancel the call.
+  func cancel()
 }
 
 open class ClientCallBase: ClientCall {
@@ -31,4 +34,6 @@ open class ClientCallBase: ClientCall {
   public init(_ channel: Channel) {
     call = channel.makeCall(type(of: self).method)
   }
+  
+  public func cancel() { call.cancel() }
 }

+ 7 - 3
Sources/gRPC/GenCodeSupport/ClientCallBidirectionalStreaming.swift

@@ -19,6 +19,8 @@ import Foundation
 import SwiftProtobuf
 
 public protocol ClientCallBidirectionalStreaming: ClientCall {
+  func waitForSendOperationsToFinish()
+  
   // TODO: Move the other, message type-dependent, methods into this protocol. At the moment, this is not possible,
   // as the protocol would then have an associated type requirement (and become pretty much unusable in the process).
 }
@@ -81,9 +83,9 @@ open class ClientCallBidirectionalStreamingBase<InputType: Message, OutputType:
     }
     _ = sem.wait(timeout: DispatchTime.distantFuture)
   }
-
-  public func cancel() {
-    call.cancel()
+  
+  public func waitForSendOperationsToFinish() {
+    call.messageQueueEmpty.wait()
   }
 }
 
@@ -123,5 +125,7 @@ open class ClientCallBidirectionalStreamingTestStub<InputType: Message, OutputTy
 
   open func closeSend() throws {}
 
+  open func waitForSendOperationsToFinish() {}
+
   open func cancel() {}
 }

+ 5 - 4
Sources/gRPC/GenCodeSupport/ClientCallClientStreaming.swift

@@ -19,8 +19,7 @@ import Foundation
 import SwiftProtobuf
 
 public protocol ClientCallClientStreaming: ClientCall {
-  /// Cancel the call.
-  func cancel()
+  func waitForSendOperationsToFinish()
 
   // TODO: Move the other, message type-dependent, methods into this protocol. At the moment, this is not possible,
   // as the protocol would then have an associated type requirement (and become pretty much unusable in the process).
@@ -73,8 +72,8 @@ open class ClientCallClientStreamingBase<InputType: Message, OutputType: Message
     return returnResponse
   }
 
-  public func cancel() {
-    call.cancel()
+  public func waitForSendOperationsToFinish() {
+    call.messageQueueEmpty.wait()
   }
 }
 
@@ -100,5 +99,7 @@ open class ClientCallClientStreamingTestStub<InputType: Message, OutputType: Mes
     return output!
   }
 
+  open func waitForSendOperationsToFinish() {}
+  
   open func cancel() {}
 }

+ 0 - 7
Sources/gRPC/GenCodeSupport/ClientCallServerStreaming.swift

@@ -19,9 +19,6 @@ import Foundation
 import SwiftProtobuf
 
 public protocol ClientCallServerStreaming: ClientCall {
-  /// Cancel the call.
-  func cancel()
-
   // TODO: Move the other, message type-dependent, methods into this protocol. At the moment, this is not possible,
   // as the protocol would then have an associated type requirement (and become pretty much unusable in the process).
 }
@@ -70,10 +67,6 @@ open class ClientCallServerStreamingBase<InputType: Message, OutputType: Message
     }
     return returnResponse
   }
-
-  public func cancel() {
-    call.cancel()
-  }
 }
 
 /// Simple fake implementation of ClientCallServerStreamingBase that returns a previously-defined set of results.

+ 1 - 8
Sources/gRPC/GenCodeSupport/ClientCallUnary.swift

@@ -18,10 +18,7 @@ import Dispatch
 import Foundation
 import SwiftProtobuf
 
-public protocol ClientCallUnary: ClientCall {
-  /// Cancel the call.
-  func cancel()
-}
+public protocol ClientCallUnary: ClientCall {}
 
 open class ClientCallUnaryBase<InputType: Message, OutputType: Message>: ClientCallBase, ClientCallUnary {
   /// Run the call. Blocks until the reply is received.
@@ -59,8 +56,4 @@ open class ClientCallUnaryBase<InputType: Message, OutputType: Message>: ClientC
     }
     return self
   }
-
-  public func cancel() {
-    call.cancel()
-  }
 }