|
|
@@ -14,6 +14,7 @@
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
|
|
|
+import Atomics
|
|
|
import BenchmarkUtils
|
|
|
import Foundation
|
|
|
import GRPC
|
|
|
@@ -168,8 +169,8 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
/// Succeeds after a stop has been requested and all outstanding requests have completed.
|
|
|
private var stopComplete: EventLoopPromise<Void>
|
|
|
|
|
|
- private let running: NIOAtomic<Bool> = .makeAtomic(value: false)
|
|
|
- private let outstanding: NIOAtomic<Int> = .makeAtomic(value: 0)
|
|
|
+ private let running = ManagedAtomic<Bool>(false)
|
|
|
+ private let outstanding = ManagedAtomic<Int>(0)
|
|
|
|
|
|
private var requestMaker: RequestMakerType
|
|
|
|
|
|
@@ -211,15 +212,20 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
// - when a request finishes it will either start a new request or decrement the
|
|
|
// the atomic counter (if we've been told to stop)
|
|
|
// - if the counter drops to zero we're finished.
|
|
|
- let exchangedRunning = self.running.compareAndExchange(expected: false, desired: true)
|
|
|
- precondition(exchangedRunning, "launchRequests should only be called once")
|
|
|
+ let exchangedRunning = self.running.compareExchange(
|
|
|
+ expected: false,
|
|
|
+ desired: true,
|
|
|
+ ordering: .relaxed
|
|
|
+ )
|
|
|
+ precondition(exchangedRunning.exchanged, "launchRequests should only be called once")
|
|
|
|
|
|
// We only decrement the outstanding count when running has been changed back to false.
|
|
|
- let exchangedOutstanding = self.outstanding.compareAndExchange(
|
|
|
+ let exchangedOutstanding = self.outstanding.compareExchange(
|
|
|
expected: 0,
|
|
|
- desired: self.maxPermittedOutstandingRequests
|
|
|
+ desired: self.maxPermittedOutstandingRequests,
|
|
|
+ ordering: .relaxed
|
|
|
)
|
|
|
- precondition(exchangedOutstanding, "launchRequests should only be called once")
|
|
|
+ precondition(exchangedOutstanding.exchanged, "launchRequests should only be called once")
|
|
|
|
|
|
for _ in 0 ..< self.maxPermittedOutstandingRequests {
|
|
|
self.requestMaker.makeRequest().whenComplete { _ in
|
|
|
@@ -229,11 +235,11 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
}
|
|
|
|
|
|
private func makeRequest() {
|
|
|
- if self.running.load() {
|
|
|
+ if self.running.load(ordering: .relaxed) {
|
|
|
self.requestMaker.makeRequest().whenComplete { _ in
|
|
|
self.makeRequest()
|
|
|
}
|
|
|
- } else if self.outstanding.sub(1) == 1 {
|
|
|
+ } else if self.outstanding.loadThenWrappingDecrement(ordering: .relaxed) == 1 {
|
|
|
self.stopIsComplete()
|
|
|
} // else we're no longer running but not all RPCs have finished.
|
|
|
}
|
|
|
@@ -260,7 +266,7 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
/// - returns: A future which can be waited on to signal when all activity has ceased.
|
|
|
func stop() -> EventLoopFuture<Void> {
|
|
|
self.requestMaker.requestStop()
|
|
|
- self.running.store(false)
|
|
|
+ self.running.store(false, ordering: .relaxed)
|
|
|
return self.stopComplete.futureResult
|
|
|
}
|
|
|
}
|