2
0
Эх сурвалжийг харах

Merge pull request #188 from MrMage/even-more-improvements

Two important fixes (and possibly more improvements)
Tim Burks 7 жил өмнө
parent
commit
dd4d1cb64d

+ 3 - 1
Sources/CgRPC/shim/call.c

@@ -21,7 +21,9 @@
 #include <assert.h>
 
 void cgrpc_call_destroy(cgrpc_call *call) {
-  //grpc_call_destroy(call->call);
+  if (call->call) {
+    grpc_call_unref(call->call);
+  }
   free(call);
 }
 

+ 22 - 0
Sources/CgRPC/shim/cgrpc.h

@@ -91,6 +91,22 @@ typedef enum grpc_completion_type {
   GRPC_OP_COMPLETE
 } grpc_completion_type;
 
+/** Connectivity state of a channel. */
+typedef enum grpc_connectivity_state {
+  /** channel has just been initialized */
+  GRPC_CHANNEL_INIT = -1,
+  /** channel is idle */
+  GRPC_CHANNEL_IDLE,
+  /** channel is connecting */
+  GRPC_CHANNEL_CONNECTING,
+  /** channel is ready for work */
+  GRPC_CHANNEL_READY,
+  /** channel has seen a failure but expects to recover */
+  GRPC_CHANNEL_TRANSIENT_FAILURE,
+  /** channel has seen a failure that it cannot recover from */
+  GRPC_CHANNEL_SHUTDOWN
+} grpc_connectivity_state;
+
 typedef struct grpc_event {
   /** The type of the completion. */
   grpc_completion_type type;
@@ -110,6 +126,9 @@ void grpc_shutdown(void);
 const char *grpc_version_string(void);
 const char *grpc_g_stands_for(void);
 
+void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
+void grpc_completion_queue_destroy(cgrpc_completion_queue *cq);
+
 // helper
 void cgrpc_free_copied_string(char *string);
 
@@ -126,6 +145,9 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
                                       double timeout);
 cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);
 
+grpc_connectivity_state cgrpc_channel_check_connectivity_state(
+    cgrpc_channel *channel, int try_to_connect);
+
 // server support
 cgrpc_server *cgrpc_server_create(const char *address);
 cgrpc_server *cgrpc_server_create_secure(const char *address,

+ 5 - 2
Sources/CgRPC/shim/channel.c

@@ -71,8 +71,6 @@ void cgrpc_channel_destroy(cgrpc_channel *c) {
   c->channel = NULL;
 
   grpc_completion_queue_shutdown(c->completion_queue);
-  cgrpc_completion_queue_drain(c->completion_queue);
-  grpc_completion_queue_destroy(c->completion_queue);
   free(c);
 }
 
@@ -85,6 +83,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
   // create call
   host_slice = grpc_slice_from_copied_string(host);
   gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
+  // The resulting call will have a retain call of +1. We'll release it in `cgrpc_call_destroy()`.
   grpc_call *channel_call = grpc_channel_create_call(channel->channel,
                                                      NULL,
                                                      GRPC_PROPAGATE_DEFAULTS,
@@ -102,3 +101,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
 cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
   return channel->completion_queue;
 }
+
+grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
+  return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
+}

+ 10 - 5
Sources/CgRPC/shim/handler.c

@@ -33,12 +33,10 @@ cgrpc_handler *cgrpc_handler_create_with_server(cgrpc_server *server) {
 
 void cgrpc_handler_destroy(cgrpc_handler *h) {
   grpc_completion_queue_shutdown(h->completion_queue);
-  cgrpc_completion_queue_drain(h->completion_queue);
-  grpc_completion_queue_destroy(h->completion_queue);
   grpc_metadata_array_destroy(&(h->request_metadata_recv));
   grpc_call_details_destroy(&(h->call_details));
   if (h->server_call) {
-    //grpc_call_destroy(h->server_call);
+    grpc_call_unref(h->server_call);
   }
   free(h);
 }
@@ -67,6 +65,10 @@ cgrpc_call *cgrpc_handler_get_call(cgrpc_handler *h) {
   cgrpc_call *call = (cgrpc_call *) malloc(sizeof(cgrpc_call));
   memset(call, 0, sizeof(cgrpc_call));
   call->call = h->server_call;
+  if (call->call) {
+    // This retain will be balanced by `cgrpc_call_destroy()`.
+    grpc_call_ref(call->call);
+  }
   return call;
 }
 
@@ -77,6 +79,11 @@ cgrpc_completion_queue *cgrpc_handler_get_completion_queue(cgrpc_handler *h) {
 grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h,
                                            cgrpc_metadata_array *metadata,
                                            long tag) {
+  if (h->server_call != NULL) {
+    return GRPC_CALL_OK;
+  }
+  // This fills `h->server_call` with a call with retain count of +1.
+  // We'll release that retain in `cgrpc_handler_destroy()`.
   return grpc_server_request_call(h->server->server,
                                   &(h->server_call),
                                   &(h->call_details),
@@ -85,5 +92,3 @@ grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h,
                                   h->server->completion_queue,
                                   cgrpc_create_tag(tag));
 }
-
-

+ 0 - 2
Sources/CgRPC/shim/server.c

@@ -77,8 +77,6 @@ void cgrpc_server_destroy(cgrpc_server *server) {
   server->server = NULL;
 
   grpc_completion_queue_shutdown(server->completion_queue);
-  cgrpc_completion_queue_drain(server->completion_queue);
-  grpc_completion_queue_destroy(server->completion_queue);
 }
 
 void cgrpc_server_start(cgrpc_server *server) {

+ 4 - 0
Sources/SwiftGRPC/Core/Channel.swift

@@ -31,6 +31,10 @@ public class Channel {
 
   /// Default host to use for new calls
   public var host: String
+  
+  public var connectivityState: ConnectivityState? {
+    return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
+  }
 
   /// Initializes a gRPC channel
   ///

+ 5 - 0
Sources/SwiftGRPC/Core/CompletionQueue.swift

@@ -77,6 +77,11 @@ class CompletionQueue {
     self.underlyingCompletionQueue = underlyingCompletionQueue
     self.name = name
   }
+  
+  deinit {
+    cgrpc_completion_queue_drain(underlyingCompletionQueue)
+    grpc_completion_queue_destroy(underlyingCompletionQueue)
+  }
 
   /// Waits for an operation group to complete
   ///

+ 32 - 0
Sources/SwiftGRPC/Core/ConnectivityState.swift

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if SWIFT_PACKAGE
+  import CgRPC
+#endif
+import Foundation
+
+public enum ConnectivityState: Int32, Error {
+  case initializing = -1
+  case idle
+  case connecting
+  case ready
+  case transient_failure
+  case shutdown
+  
+  static func fromCEnum(_ connectivityState: grpc_connectivity_state) -> ConnectivityState? {
+    return ConnectivityState(rawValue: connectivityState.rawValue)
+  }
+}

+ 1 - 1
Sources/SwiftGRPC/Core/Handler.swift

@@ -32,7 +32,7 @@ public class Handler {
   /// A Call object that can be used to respond to the request
   public private(set) lazy var call: Call = {
     Call(underlyingCall: cgrpc_handler_get_call(self.underlyingHandler),
-         owned: false,
+         owned: true,
          completionQueue: self.completionQueue)
   }()
 

+ 3 - 3
Sources/SwiftGRPC/Core/Metadata.swift

@@ -69,13 +69,13 @@ public class Metadata: CustomStringConvertible {
   }
   
   public var description: String {
-    var result = ""
+    var lines: [String] = []
     for i in 0..<count() {
       let key = self.key(i)
       let value = self.value(i)
-      result += (key ?? "(nil)") + ":" + (value ?? "(nil)") + "\n"
+      lines.append((key ?? "(nil)") + ":" + (value ?? "(nil)"))
     }
-    return result
+    return lines.joined(separator: "\n")
   }
   
   public func copy() -> Metadata {

+ 5 - 5
Sources/SwiftGRPC/Core/Server.swift

@@ -66,8 +66,7 @@ public class Server {
     cgrpc_server_start(underlyingServer)
     // run the server on a new background thread
     dispatchQueue.async {
-      var running = true
-      while running {
+      spinloop: while true {
         do {
           let handler = Handler(underlyingServer: self.underlyingServer)
           try handler.requestCall(tag: Server.handlerCallTag)
@@ -97,16 +96,17 @@ public class Server {
                 }
               }
             } else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
-              running = false // exit the loop
+              break spinloop
             }
           } else if event.type == .queueTimeout {
             // everything is fine
+            continue
           } else if event.type == .queueShutdown {
-            running = false
+            break spinloop
           }
         } catch {
           print("server call error: \(error)")
-          running = false
+          break spinloop
         }
       }
       self.onCompletion?()

+ 10 - 6
Sources/SwiftGRPC/Runtime/ServiceServer.swift

@@ -22,6 +22,8 @@ open class ServiceServer {
   public let address: String
   public let server: Server
 
+  public var shouldLogRequests = true
+
   /// Create a server that accepts insecure connections.
   public init(address: String) {
     gRPC.initialize()
@@ -58,13 +60,15 @@ open class ServiceServer {
         return
       }
 
-      let unwrappedHost = handler.host ?? "(nil)"
       let unwrappedMethod = handler.method ?? "(nil)"
-      let unwrappedCaller = handler.caller ?? "(nil)"
-      print("Server received request to " + unwrappedHost
-        + " calling " + unwrappedMethod
-        + " from " + unwrappedCaller
-        + " with " + handler.requestMetadata.description)
+      if strongSelf.shouldLogRequests == true {
+        let unwrappedHost = handler.host ?? "(nil)"
+        let unwrappedCaller = handler.caller ?? "(nil)"
+        print("Server received request to " + unwrappedHost
+          + " calling " + unwrappedMethod
+          + " from " + unwrappedCaller
+          + " with " + handler.requestMetadata.description)
+      }
       
       do {
         if !(try strongSelf.handleMethod(unwrappedMethod, handler: handler, queue: queue)) {

+ 17 - 0
Tests/SwiftGRPCTests/EchoTests.swift

@@ -22,6 +22,7 @@ class EchoTests: BasicEchoTestCase {
   static var allTests: [(String, (EchoTests) -> () throws -> Void)] {
     return [
       ("testUnary", testUnary),
+      ("testUnaryLotsOfRequests", testUnaryLotsOfRequests),
       ("testClientStreaming", testClientStreaming),
       ("testClientStreamingLotsOfMessages", testClientStreamingLotsOfMessages),
       ("testServerStreaming", testServerStreaming),
@@ -48,6 +49,22 @@ extension EchoTests {
     XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text)
     XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text)
   }
+  
+  func testUnaryLotsOfRequests() {
+    // No need to spam the log with 50k lines.
+    server.shouldLogRequests = false
+    // Sending that many requests at once can sometimes trip things up, it seems.
+    client.timeout = 5.0
+    let clockStart = clock()
+    let numberOfRequests = 50_000
+    for i in 0..<numberOfRequests {
+      if i % 1_000 == 0 && i > 0 {
+        print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
+      }
+      XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text)
+    }
+    print("total time for \(numberOfRequests) requests: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
+  }
 }
 
 extension EchoTests {

+ 137 - 70
Tests/SwiftGRPCTests/GRPCTests.swift

@@ -19,12 +19,21 @@ import Foundation
 import XCTest
 
 class gRPCTests: XCTestCase {
+  // We have seen this test flake out in rare cases fairly often due to race conditions.
+  // To detect such rare errors, we run the tests several times.
+  // (By now, all known errors should have been fixed, but we'd still like to detect new ones.)
+  let testRepetitions = 10
+  
   func testConnectivity() {
-    runTest(useSSL: false)
+    for _ in 0..<testRepetitions {
+      runTest(useSSL: false)
+    }
   }
 
   func testConnectivitySecure() {
-    runTest(useSSL: true)
+    for _ in 0..<testRepetitions {
+      runTest(useSSL: true)
+    }
   }
 
   static var allTests: [(String, (gRPCTests) -> () throws -> Void)] {
@@ -37,7 +46,8 @@ class gRPCTests: XCTestCase {
 
 let address = "localhost:8085"
 let host = "example.com"
-let clientText = "hello, server!"
+let evenClientText = "hello, server!"
+let oddClientText = "hello, server, please fail!"
 let serverText = "hello, client!"
 let initialClientMetadata =
   [
@@ -75,11 +85,12 @@ let helloServerStream = "/hello.server-stream"
 let helloBiDiStream = "/hello.bidi-stream"
 
 // Return code/message for unary test
-let oddStatusCode = StatusCode.ok
 let oddStatusMessage = "OK"
+let evenStatusMessage = "some other status message"
 
-let evenStatusCode = StatusCode.notFound
-let eventStatusMessage = "Not Found"
+// Parsing very large messages as String is very inefficient,
+// so we avoid it anything above this threshold.
+let sizeThresholdForReturningDataVerbatim = 10_000
 
 func runTest(useSSL: Bool) {
   gRPC.initialize()
@@ -141,50 +152,78 @@ func runClient(useSSL: Bool) throws {
   }
 
   channel.host = host
-  try callUnary(channel: channel)
-  try callServerStream(channel: channel)
-  try callBiDiStream(channel: channel)
+  let largeMessage = Data(repeating: 88 /* 'X' */, count: 4_000_000)
+  for _ in 0..<10 {
+    // Send several calls to each server we spin up, to ensure that each individual server can handle many requests.
+    try callUnary(channel: channel)
+    try callServerStream(channel: channel)
+    try callBiDiStream(channel: channel)
+  }
+  // Test sending a large message.
+  try callUnaryIndividual(channel: channel, message: largeMessage, shouldSucceed: true)
+  try callUnaryIndividual(channel: channel, message: largeMessage, shouldSucceed: true)
 }
 
 func callUnary(channel: Channel) throws {
-  let message = clientText.data(using: .utf8)
-
+  let evenMessage = evenClientText.data(using: .utf8)!
+  let oddMessage = oddClientText.data(using: .utf8)!
   for i in 0..<steps {
-    let sem = DispatchSemaphore(value: 0)
-    let method = hello
-    let call = channel.makeCall(method)
-    let metadata = Metadata(initialClientMetadata)
-    try call.start(.unary, metadata: metadata, message: message) {
-      response in
-      // verify the basic response from the server
-      XCTAssertEqual(response.statusCode, (i % 2  == 0) ? evenStatusCode : oddStatusCode)
-      XCTAssertEqual(response.statusMessage, (i % 2  == 0) ? eventStatusMessage : oddStatusMessage)
-
-      // verify the message from the server
-      if (i % 2) == 0 {
-        let resultData = response.resultData!
-        let messageString = String(data: resultData, encoding: .utf8)
-        XCTAssertEqual(messageString, serverText)
-      }
+    try callUnaryIndividual(channel: channel,
+                            message: (i % 2) == 0 ? evenMessage : oddMessage,
+                            shouldSucceed: (i % 2) == 0)
+  }
+}
 
-      // verify the initial metadata from the server
-      let initialMetadata = response.initialMetadata!
+func callUnaryIndividual(channel: Channel, message: Data, shouldSucceed: Bool) throws {
+  let sem = DispatchSemaphore(value: 0)
+  let method = hello
+  let call = channel.makeCall(method)
+  let metadata = Metadata(initialClientMetadata)
+  try call.start(.unary, metadata: metadata, message: message) {
+    response in
+    // verify the basic response from the server
+    XCTAssertEqual(response.statusCode, .ok)
+    XCTAssertEqual(response.statusMessage, shouldSucceed ? evenStatusMessage : oddStatusMessage)
+    
+    //print("response.resultData?.count", response.resultData?.count)
+    
+    // verify the message from the server
+    if shouldSucceed {
+      if let resultData = response.resultData {
+        if resultData.count >= sizeThresholdForReturningDataVerbatim {
+          XCTAssertEqual(message, resultData)
+        } else {
+          let messageString = String(data: resultData, encoding: .utf8)
+          XCTAssertEqual(messageString, serverText)
+        }
+      } else {
+        XCTFail("callUnary response missing")
+      }
+    }
+    
+    // verify the initial metadata from the server
+    if let initialMetadata = response.initialMetadata {
       verify_metadata(initialMetadata, expected: initialServerMetadata)
-
-      // verify the trailing metadata from the server
-      let trailingMetadata = response.trailingMetadata!
+    } else {
+      XCTFail("callUnary initial metadata missing")
+    }
+    
+    // verify the trailing metadata from the server
+    if let trailingMetadata = response.trailingMetadata {
       verify_metadata(trailingMetadata, expected: trailingServerMetadata)
-
-      // report completion
-      sem.signal()
+    } else {
+      XCTFail("callUnary trailing metadata missing")
     }
-    // wait for the call to complete
-    _ = sem.wait()
+    
+    // report completion
+    sem.signal()
   }
+  // wait for the call to complete
+  _ = sem.wait()
 }
 
 func callServerStream(channel: Channel) throws {
-  let message = clientText.data(using: .utf8)
+  let message = evenClientText.data(using: .utf8)
   let metadata = Metadata(initialClientMetadata)
 
   let sem = DispatchSemaphore(value: 0)
@@ -197,8 +236,11 @@ func callServerStream(channel: Channel) throws {
     XCTAssertEqual(response.statusMessage, "Custom Status Message ServerStreaming")
 
     // verify the trailing metadata from the server
-    let trailingMetadata = response.trailingMetadata!
-    verify_metadata(trailingMetadata, expected: trailingServerMetadata)
+    if let trailingMetadata = response.trailingMetadata {
+      verify_metadata(trailingMetadata, expected: trailingServerMetadata)
+    } else {
+      XCTFail("callServerStream trailing metadata missing")
+    }
 
     sem.signal() // signal call is finished
   }
@@ -224,29 +266,31 @@ let clientPing = "ping"
 let serverPong = "pong"
 
 func callBiDiStream(channel: Channel) throws {
-  let message = clientPing.data(using: .utf8)
   let metadata = Metadata(initialClientMetadata)
 
   let sem = DispatchSemaphore(value: 0)
   let method = helloBiDiStream
   let call = channel.makeCall(method)
-  try call.start(.bidiStreaming, metadata: metadata, message: message) {
+  try call.start(.bidiStreaming, metadata: metadata, message: nil) {
     response in
 
     XCTAssertEqual(response.statusCode, .ok)
     XCTAssertEqual(response.statusMessage, "Custom Status Message BiDi")
 
     // verify the trailing metadata from the server
-    let trailingMetadata = response.trailingMetadata!
-    verify_metadata(trailingMetadata, expected: trailingServerMetadata)
+    if let trailingMetadata = response.trailingMetadata {
+      verify_metadata(trailingMetadata, expected: trailingServerMetadata)
+    } else {
+      XCTFail("callBiDiStream trailing metadata missing")
+    }
 
     sem.signal() // signal call is finished
   }
 
   // Send pings
+  let message = clientPing.data(using: .utf8)!
   for _ in 0..<steps {
-    let message = clientPing.data(using: .utf8)
-    try call.sendMessage(data: message!) { (err) in
+    try call.sendMessage(data: message) { err in
       XCTAssertNil(err)
     }
     call.messageQueueEmpty.wait()
@@ -277,14 +321,13 @@ func callBiDiStream(channel: Channel) throws {
 }
 
 func runServer(server: Server) throws -> DispatchSemaphore {
-  var requestCount = 0
   let sem = DispatchSemaphore(value: 0)
   server.run { requestHandler in
     do {
       if let method = requestHandler.method {
         switch method {
         case hello:
-          try handleUnary(requestHandler: requestHandler, requestCount: requestCount)
+          try handleUnary(requestHandler: requestHandler)
         case helloServerStream:
           try handleServerStream(requestHandler: requestHandler)
         case helloBiDiStream:
@@ -293,8 +336,6 @@ func runServer(server: Server) throws -> DispatchSemaphore {
           XCTFail("Invalid method \(method)")
         }
       }
-
-      requestCount += 1
     } catch {
       XCTFail("error \(error)")
     }
@@ -307,27 +348,45 @@ func runServer(server: Server) throws -> DispatchSemaphore {
   return sem
 }
 
-func handleUnary(requestHandler: Handler, requestCount: Int) throws {
+func handleUnary(requestHandler: Handler) throws {
   XCTAssertEqual(requestHandler.host, host)
   XCTAssertEqual(requestHandler.method, hello)
   let initialMetadata = requestHandler.requestMetadata
   verify_metadata(initialMetadata, expected: initialClientMetadata)
   let initialMetadataToSend = Metadata(initialServerMetadata)
-  try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) { messageData in
-    let messageString = String(data: messageData!, encoding: .utf8)
-    XCTAssertEqual(messageString, clientText)
+  let receiveSem = DispatchSemaphore(value: 0)
+  var inputMessage: Data?
+  try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
+    if let messageData = $0 {
+      inputMessage = messageData
+      if messageData.count < sizeThresholdForReturningDataVerbatim {
+        let messageString = String(data: messageData, encoding: .utf8)!
+        XCTAssertTrue(messageString == evenClientText || messageString == oddClientText,
+                      "handleUnary unexpected message string \(messageString)")
+      }
+    } else {
+      XCTFail("handleUnary message missing")
+    }
+    receiveSem.signal()
   }
-
-  if (requestCount % 2) == 0 {
-    let replyMessage = serverText
-    let trailingMetadataToSend = Metadata(trailingServerMetadata)
-    try requestHandler.sendResponse(message: replyMessage.data(using: .utf8)!,
-                                    status: ServerStatus(code: evenStatusCode,
-                                                         message: eventStatusMessage,
+  receiveSem.wait()
+
+  // We need to return status OK in both cases, as it seems like the server might never send out the last few messages
+  // once it has been asked to send a non-OK status. Alternatively, we could send a non-OK status here, but then we
+  // would need to sleep for a few milliseconds before sending the non-OK status.
+  let replyMessage = (inputMessage == nil || inputMessage!.count < sizeThresholdForReturningDataVerbatim)
+    ? serverText.data(using: .utf8)!
+    : inputMessage!
+  let trailingMetadataToSend = Metadata(trailingServerMetadata)
+  if let inputMessage = inputMessage,
+    inputMessage.count >= sizeThresholdForReturningDataVerbatim
+      || inputMessage == evenClientText.data(using: .utf8)! {
+    try requestHandler.sendResponse(message: replyMessage,
+                                    status: ServerStatus(code: .ok,
+                                                         message: evenStatusMessage,
                                                          trailingMetadata: trailingMetadataToSend))
   } else {
-    let trailingMetadataToSend = Metadata(trailingServerMetadata)
-    try requestHandler.sendStatus(ServerStatus(code: oddStatusCode,
+    try requestHandler.sendStatus(ServerStatus(code: .ok,
                                                message: oddStatusMessage,
                                                trailingMetadata: trailingMetadataToSend))
   }
@@ -340,14 +399,18 @@ func handleServerStream(requestHandler: Handler) throws {
   verify_metadata(initialMetadata, expected: initialClientMetadata)
 
   let initialMetadataToSend = Metadata(initialServerMetadata)
-  try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) { messageData in
-    let messageString = String(data: messageData!, encoding: .utf8)
-    XCTAssertEqual(messageString, clientText)
+  try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
+    if let messageData = $0 {
+      let messageString = String(data: messageData, encoding: .utf8)
+      XCTAssertEqual(messageString, evenClientText)
+    } else {
+      XCTFail("handleServerStream message missing")
+    }
   }
 
-  let replyMessage = serverText
+  let replyMessage = serverText.data(using: .utf8)!
   for _ in 0..<steps {
-    try requestHandler.call.sendMessage(data: replyMessage.data(using: .utf8)!) { error in
+    try requestHandler.call.sendMessage(data: replyMessage) { error in
       XCTAssertNil(error)
     }
     requestHandler.call.messageQueueEmpty.wait()
@@ -380,8 +443,12 @@ func handleBiDiStream(requestHandler: Handler) throws {
   for _ in 0..<steps {
     let receiveSem = DispatchSemaphore(value: 0)
     try requestHandler.call.receiveMessage { callStatus in
-      let messageString = String(data: callStatus.resultData!, encoding: .utf8)
-      XCTAssertEqual(messageString, clientPing)
+      if let messageData = callStatus.resultData {
+        let messageString = String(data: messageData, encoding: .utf8)
+        XCTAssertEqual(messageString, clientPing)
+      } else {
+        XCTFail("handleBiDiStream message empty")
+      }
       receiveSem.signal()
     }
     _ = receiveSem.wait()

+ 2 - 0
Tests/SwiftGRPCTests/ServerCancellingTests.swift

@@ -51,6 +51,8 @@ class ServerCancellingTests: BasicEchoTestCase {
   }
   
   override func makeProvider() -> Echo_EchoProvider { return CancellingProvider() }
+  
+  override var defaultTimeout: TimeInterval { return 5.0 }
 }
 
 extension ServerCancellingTests {