|
|
@@ -165,13 +165,11 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
|
|
|
private let stats: StatsWithLock
|
|
|
|
|
|
- /// Has a stop been requested - if it has don't submit any more
|
|
|
- /// requests and when all existing requests are complete signal
|
|
|
- /// using `stopComplete`
|
|
|
- private var stopRequested = false
|
|
|
/// Succeeds after a stop has been requested and all outstanding requests have completed.
|
|
|
private var stopComplete: EventLoopPromise<Void>
|
|
|
- private var numberOfOutstandingRequests = 0
|
|
|
+
|
|
|
+ private let running: NIOAtomic<Bool> = .makeAtomic(value: false)
|
|
|
+ private let outstanding: NIOAtomic<Int> = .makeAtomic(value: 0)
|
|
|
|
|
|
private var requestMaker: RequestMakerType
|
|
|
|
|
|
@@ -205,50 +203,39 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
)
|
|
|
}
|
|
|
|
|
|
- /// Launch as many requests as allowed on the channel.
|
|
|
- /// This must be called from the connection eventLoop.
|
|
|
+ /// Launch as many requests as allowed on the channel. Must only be called once.
|
|
|
private func launchRequests() {
|
|
|
- self.eventLoop.preconditionInEventLoop()
|
|
|
-
|
|
|
- while self.canMakeRequest {
|
|
|
- self.makeRequestAndRepeat()
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /// Returns if it is permissible to make another request - ie we've not been asked to stop, and we're not at the limit of outstanding requests.
|
|
|
- private var canMakeRequest: Bool {
|
|
|
- self.eventLoop.assertInEventLoop()
|
|
|
- return !self.stopRequested
|
|
|
- && self.numberOfOutstandingRequests < self.maxPermittedOutstandingRequests
|
|
|
- }
|
|
|
-
|
|
|
- /// If there is spare permitted capacity make a request and repeat when it is done.
|
|
|
- private func makeRequestAndRepeat() {
|
|
|
- self.eventLoop.preconditionInEventLoop()
|
|
|
- // Check for capacity.
|
|
|
- if !self.canMakeRequest {
|
|
|
- return
|
|
|
- }
|
|
|
- self.numberOfOutstandingRequests += 1
|
|
|
- let resultStatus = self.requestMaker.makeRequest()
|
|
|
+ // The plan here is:
|
|
|
+ // - store the max number of outstanding requests in an atomic
|
|
|
+ // - start that many requests asynchronously
|
|
|
+ // - when a request finishes it will either start a new request or decrement the
|
|
|
+ // the atomic counter (if we've been told to stop)
|
|
|
+ // - if the counter drops to zero we're finished.
|
|
|
+ let exchangedRunning = self.running.compareAndExchange(expected: false, desired: true)
|
|
|
+ precondition(exchangedRunning, "launchRequests should only be called once")
|
|
|
+
|
|
|
+ // We only decrement the outstanding count when running has been changed back to false.
|
|
|
+ let exchangedOutstanding = self.outstanding.compareAndExchange(
|
|
|
+ expected: 0,
|
|
|
+ desired: self.maxPermittedOutstandingRequests
|
|
|
+ )
|
|
|
+ precondition(exchangedOutstanding, "launchRequests should only be called once")
|
|
|
|
|
|
- // Wait for the request to complete.
|
|
|
- resultStatus.whenSuccess { status in
|
|
|
- self.requestCompleted(status: status)
|
|
|
+ for _ in 0 ..< self.maxPermittedOutstandingRequests {
|
|
|
+ self.requestMaker.makeRequest().whenComplete { _ in
|
|
|
+ self.makeRequest()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// Call when a request has completed.
|
|
|
- /// Records stats and attempts to make more requests if there is available capacity.
|
|
|
- private func requestCompleted(status: GRPCStatus) {
|
|
|
- self.eventLoop.preconditionInEventLoop()
|
|
|
- self.numberOfOutstandingRequests -= 1
|
|
|
- if self.stopRequested, self.numberOfOutstandingRequests == 0 {
|
|
|
+ private func makeRequest() {
|
|
|
+ if self.running.load() {
|
|
|
+ self.requestMaker.makeRequest().whenComplete { _ in
|
|
|
+ self.makeRequest()
|
|
|
+ }
|
|
|
+ } else if self.outstanding.sub(1) == 1 {
|
|
|
self.stopIsComplete()
|
|
|
- } else {
|
|
|
- // Try scheduling another request.
|
|
|
- self.makeRequestAndRepeat()
|
|
|
- }
|
|
|
+ } // else we're no longer running but not all RPCs have finished.
|
|
|
}
|
|
|
|
|
|
/// Get stats for sending to the driver.
|
|
|
@@ -261,18 +248,10 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
|
|
|
/// Start sending requests to the server.
|
|
|
func start() {
|
|
|
- if self.eventLoop.inEventLoop {
|
|
|
- self.launchRequests()
|
|
|
- } else {
|
|
|
- self.eventLoop.execute {
|
|
|
- self.launchRequests()
|
|
|
- }
|
|
|
- }
|
|
|
+ self.launchRequests()
|
|
|
}
|
|
|
|
|
|
private func stopIsComplete() {
|
|
|
- assert(self.stopRequested)
|
|
|
- assert(self.numberOfOutstandingRequests == 0)
|
|
|
// Close the connection then signal done.
|
|
|
self.channel.close().cascade(to: self.stopComplete)
|
|
|
}
|
|
|
@@ -280,13 +259,8 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
/// Stop sending requests to the server.
|
|
|
/// - returns: A future which can be waited on to signal when all activity has ceased.
|
|
|
func stop() -> EventLoopFuture<Void> {
|
|
|
- self.eventLoop.execute {
|
|
|
- self.stopRequested = true
|
|
|
- self.requestMaker.requestStop()
|
|
|
- if self.numberOfOutstandingRequests == 0 {
|
|
|
- self.stopIsComplete()
|
|
|
- }
|
|
|
- }
|
|
|
+ self.requestMaker.requestStop()
|
|
|
+ self.running.store(false)
|
|
|
return self.stopComplete.futureResult
|
|
|
}
|
|
|
}
|