Browse Source

Add interceptor pipeline timeouts (#1006)

Motivation:

Some RPCs set a deadline, we should support that!

Modifications:

- Add a deadline to the interceptor pipeline.
- Fixed a bug in ClientTransport where we may leave some buffered writes
  behind

Result:

Calls using the interceptor pipeline may timeout.
George Barnett 5 years ago
parent
commit
5eef2f8355

+ 46 - 1
Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

@@ -71,6 +71,9 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   /// The details of the call.
   internal let details: CallDetails
 
+  /// A task for closing the RPC in case of a timeout.
+  private var scheduledClose: Scheduled<Void>?
+
   /// The contexts associated with the interceptors stored in this pipeline. Context will be removed
   /// once the RPC has completed. Contexts are ordered from outbound to inbound, that is, the tail
   /// is first and the head is last.
@@ -165,6 +168,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
     )
 
     self.contexts = contexts
+    self.setupDeadline()
   }
 
   /// Emit a response part message into the interceptor pipeline.
@@ -223,7 +227,48 @@ extension ClientInterceptorPipeline {
   internal func close() {
     self.eventLoop.assertInEventLoop()
 
-    // TODO: make sure the transport is closed (in case a user interceptor emits an error).
+    // Grab the head, we'll use it to cancel the transport. This is most likely already closed,
+    // but there's nothing to stop an interceptor from emitting its own error and leaving the
+    // transport open.
+    let head = self._head
     self.contexts = nil
+
+    // Cancel the timeout.
+    self.scheduledClose?.cancel()
+    self.scheduledClose = nil
+
+    // Cancel the transport.
+    head?.invokeCancel(promise: nil)
+  }
+
+  /// Sets up a deadline for the pipeline.
+  private func setupDeadline() {
+    if self.eventLoop.inEventLoop {
+      self._setupDeadline()
+    } else {
+      self.eventLoop.execute {
+        self._setupDeadline()
+      }
+    }
+  }
+
+  /// Sets up a deadline for the pipeline.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  private func _setupDeadline() {
+    self.eventLoop.assertInEventLoop()
+
+    let timeLimit = self.details.options.timeLimit
+    let deadline = timeLimit.makeDeadline()
+
+    // There's no point scheduling this.
+    if deadline == .distantFuture {
+      return
+    }
+
+    self.scheduledClose = self.eventLoop.scheduleTask(deadline: deadline) {
+      // When the error hits the tail we'll call 'close()', this will cancel the transport if
+      // necessary.
+      self.read(.error(GRPCError.RPCTimedOut(timeLimit)))
+    }
   }
 }

+ 28 - 12
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -636,24 +636,40 @@ extension ClientTransport {
       "request_parts": "\(self.writeBuffer.count)",
     ], source: "GRPC")
 
-    while self.state.isUnbuffering, let write = self.writeBuffer.popFirst() {
-      self.logger.debug("unbuffering request part", metadata: [
-        "request_part": "\(write.request.name)",
-      ], source: "GRPC")
+    // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
+    // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
+    // may miss more buffered writes.
+    while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
+      // Pull out as many writes as possible.
+      while let write = self.writeBuffer.popFirst() {
+        self.logger.debug("unbuffering request part", metadata: [
+          "request_part": "\(write.request.name)",
+        ], source: "GRPC")
+
+        if !shouldFlush {
+          shouldFlush = self.shouldFlush(after: write.request)
+        }
+
+        self.write(write.request, to: channel, promise: write.promise, flush: false)
+      }
 
-      if !shouldFlush {
-        shouldFlush = self.shouldFlush(after: write.request)
+      // Okay, flush now.
+      if shouldFlush {
+        shouldFlush = false
+        channel.flush()
       }
-      self.write(write.request, to: channel, promise: write.promise, flush: false)
     }
 
-    // Okay, flush now.
-    if shouldFlush {
-      channel.flush()
+    if self.writeBuffer.isEmpty {
+      self.logger.debug("request buffer drained", source: "GRPC")
+    } else {
+      self.logger.notice(
+        "unbuffering aborted",
+        metadata: ["call_state": self.stateForLogging],
+        source: "GRPC"
+      )
     }
 
-    self.logger.debug("request buffer drained", source: "GRPC")
-
     // We're unbuffered. What now?
     self.act(on: self.state.unbuffered())
   }

+ 112 - 9
Tests/GRPCTests/ClientInterceptorPipelineTests.swift

@@ -20,24 +20,26 @@ import NIOHPACK
 import XCTest
 
 class ClientInterceptorPipelineTests: GRPCTestCase {
+  override func setUp() {
+    super.setUp()
+    self.embeddedEventLoop = EmbeddedEventLoop()
+  }
+
+  private var embeddedEventLoop: EmbeddedEventLoop!
+
   private func makePipeline<Request, Response>(
     requests: Request.Type = Request.self,
     responses: Response.Type = Response.self,
+    details: CallDetails? = nil,
     interceptors: [ClientInterceptor<Request, Response>] = [],
     errorDelegate: ClientErrorDelegate? = nil,
-    onCancel: @escaping (EventLoopPromise<Void>?) -> Void = { _ in XCTFail("Unexpected cancel") },
+    onCancel: @escaping (EventLoopPromise<Void>?) -> Void = { _ in },
     onRequestPart: @escaping (ClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void,
     onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
   ) -> ClientInterceptorPipeline<Request, Response> {
     return ClientInterceptorPipeline(
-      eventLoop: EmbeddedEventLoop(),
-      details: CallDetails(
-        type: .unary,
-        path: "ignored",
-        authority: "ignored",
-        scheme: "ignored",
-        options: .init(logger: self.clientLogger)
-      ),
+      eventLoop: self.embeddedEventLoop,
+      details: details ?? self.makeCallDetails(),
       interceptors: interceptors,
       errorDelegate: errorDelegate,
       onCancel: onCancel,
@@ -46,6 +48,16 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     )
   }
 
+  private func makeCallDetails(timeLimit: TimeLimit = .none) -> CallDetails {
+    return CallDetails(
+      type: .unary,
+      path: "ignored",
+      authority: "ignored",
+      scheme: "ignored",
+      options: CallOptions(timeLimit: timeLimit, logger: self.clientLogger)
+    )
+  }
+
   func testEmptyPipeline() throws {
     var requestParts: [ClientRequestPart<String>] = []
     var responseParts: [ClientResponsePart<String>] = []
@@ -115,6 +127,97 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     pipeline.read(.metadata([:]))
   }
 
+  func testPipelineWithTimeout() throws {
+    var cancelled = false
+    var timedOut = false
+
+    class FailOnCancel<Request, Response>: ClientInterceptor<Request, Response> {
+      override func cancel(
+        promise: EventLoopPromise<Void>?,
+        context: ClientInterceptorContext<Request, Response>
+      ) {
+        XCTFail("Unexpected cancellation")
+        context.cancel(promise: promise)
+      }
+    }
+
+    let deadline = NIODeadline.uptimeNanoseconds(100)
+    let pipeline = self.makePipeline(
+      requests: String.self,
+      responses: String.self,
+      details: self.makeCallDetails(timeLimit: .deadline(deadline)),
+      interceptors: [FailOnCancel()],
+      onCancel: { promise in
+        assertThat(cancelled, .is(false))
+        cancelled = true
+        // We don't expect a promise: this cancellation is fired by the pipeline.
+        assertThat(promise, .is(.nil()))
+      },
+      onRequestPart: { _, _ in
+        XCTFail("Unexpected request part")
+      },
+      onResponsePart: { part in
+        assertThat(part.error, .is(.instanceOf(GRPCError.RPCTimedOut.self)))
+        assertThat(timedOut, .is(false))
+        timedOut = true
+      }
+    )
+
+    // Trigger the timeout.
+    self.embeddedEventLoop.advanceTime(to: deadline)
+    assertThat(timedOut, .is(true))
+
+    // We'll receive a cancellation; we only get this 'onCancel' callback. We'll fail in the
+    // interceptor if a cancellation is received.
+    assertThat(cancelled, .is(true))
+
+    // Pipeline should be torn down. Writes and cancellation should fail.
+    let p1 = pipeline.eventLoop.makePromise(of: Void.self)
+    pipeline.write(.end, promise: p1)
+    assertThat(try p1.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
+
+    let p2 = pipeline.eventLoop.makePromise(of: Void.self)
+    pipeline.cancel(promise: p2)
+    assertThat(try p2.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
+
+    // Reads should be ignored too. (We'll fail in `onRequestPart` if this goes through.)
+    pipeline.read(.metadata([:]))
+  }
+
+  func testTimeoutIsCancelledOnCompletion() throws {
+    let deadline = NIODeadline.uptimeNanoseconds(100)
+    var cancellations = 0
+
+    let pipeline = self.makePipeline(
+      requests: String.self,
+      responses: String.self,
+      details: self.makeCallDetails(timeLimit: .deadline(deadline)),
+      onCancel: { promise in
+        assertThat(cancellations, .is(0))
+        cancellations += 1
+        // We don't expect a promise: this cancellation is fired by the pipeline.
+        assertThat(promise, .is(.nil()))
+      },
+      onRequestPart: { _, _ in
+        XCTFail("Unexpected request part")
+      },
+      onResponsePart: { part in
+        // We only expect the end.
+        assertThat(part.end, .is(.notNil()))
+      }
+    )
+
+    // Read the end part.
+    pipeline.read(.end(.ok, [:]))
+    // Just a single cancellation.
+    assertThat(cancellations, .is(1))
+
+    // Pass the deadline.
+    self.embeddedEventLoop.advanceTime(to: deadline)
+    // We should still have just the one cancellation.
+    assertThat(cancellations, .is(1))
+  }
+
   func testPipelineWithInterceptor() throws {
     // We're not testing much here, just that the interceptors are in the right order, from outbound
     // to inbound.

+ 18 - 0
Tests/GRPCTests/XCTestHelpers.swift

@@ -124,6 +124,24 @@ struct Matcher<Value> {
     }
   }
 
+  /// Matches if the value is `nil`.
+  static func `nil`<Value>() -> Matcher<Value?> {
+    return .init { actual in
+      actual == nil
+        ? .match
+        : .noMatch(actual: String(describing: actual), expected: "nil")
+    }
+  }
+
+  /// Matches if the value is not `nil`.
+  static func notNil<Value>() -> Matcher<Value?> {
+    return .init { actual in
+      actual != nil
+        ? .match
+        : .noMatch(actual: "nil", expected: "not nil")
+    }
+  }
+
   // MARK: Type
 
   /// Checks that the actual value is an instance of the given type.

+ 2 - 0
Tests/GRPCTests/XCTestManifests.swift

@@ -116,6 +116,8 @@ extension ClientInterceptorPipelineTests {
         ("testErrorDelegateIsCalled", testErrorDelegateIsCalled),
         ("testPipelineWhenClosed", testPipelineWhenClosed),
         ("testPipelineWithInterceptor", testPipelineWithInterceptor),
+        ("testPipelineWithTimeout", testPipelineWithTimeout),
+        ("testTimeoutIsCancelledOnCompletion", testTimeoutIsCancelledOnCompletion),
     ]
 }