Browse Source

Ensure we're on the event loop when sending streaming responses (#1054)

Motivation:

We expect to send streamed responses from the event loop on the server.
Previously this was enforced by accident as responses were sent by
appending a callback to write the response to a `Channel` future.
However, since the implementation changed we lost this enforcement and
it's possible for users to run into assertion failures if they send
responses from off of the event loop.

Modifications:

Ensure `sendResponse` and `sendResponses` are called on the event loop
in the server streaming call contexts.

Result:

Responses can safely be sent from a streaming response context from off
of the event loop.
George Barnett 5 years ago
parent
commit
5b2b7eccda
1 changed files with 21 additions and 1 deletions
  1. 21 1
      Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift

+ 21 - 1
Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift

@@ -140,13 +140,33 @@ internal final class _StreamingResponseCallContext<Request, Response>:
     promise: EventLoopPromise<Void>?
   ) {
     let compress = compression.isEnabled(callDefault: self.compressionEnabled)
-    self._sendResponse(message, .init(compress: compress, flush: true), promise)
+    if self.eventLoop.inEventLoop {
+      self._sendResponse(message, .init(compress: compress, flush: true), promise)
+    } else {
+      self.eventLoop.execute {
+        self._sendResponse(message, .init(compress: compress, flush: true), promise)
+      }
+    }
   }
 
   override func sendResponses<Messages: Sequence>(
     _ messages: Messages,
     compression: Compression = .deferToCallDefault,
     promise: EventLoopPromise<Void>?
+  ) where Response == Messages.Element {
+    if self.eventLoop.inEventLoop {
+      self._sendResponses(messages, compression: compression, promise: promise)
+    } else {
+      self.eventLoop.execute {
+        self._sendResponses(messages, compression: compression, promise: promise)
+      }
+    }
+  }
+
+  private func _sendResponses<Messages: Sequence>(
+    _ messages: Messages,
+    compression: Compression,
+    promise: EventLoopPromise<Void>?
   ) where Response == Messages.Element {
     let compress = compression.isEnabled(callDefault: self.compressionEnabled)
     var iterator = messages.makeIterator()