|
|
@@ -62,7 +62,7 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
target: serverTargets[channelNumber % serverTargets.count],
|
|
|
requestMessage: requestMessage,
|
|
|
config: config,
|
|
|
- eventLoopGroup: eventLoopGroup
|
|
|
+ eventLoop: eventLoopGroup.next()
|
|
|
)
|
|
|
}
|
|
|
|
|
|
@@ -159,7 +159,8 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
|
|
|
/// Class to manage a channel. Repeatedly makes requests on that channel and records what happens.
|
|
|
private class ChannelRepeater {
|
|
|
- private let connection: ClientConnection
|
|
|
+ private let channel: GRPCChannel
|
|
|
+ private let eventLoop: EventLoop
|
|
|
private let maxPermittedOutstandingRequests: Int
|
|
|
|
|
|
private let stats: StatsWithLock
|
|
|
@@ -174,18 +175,25 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
|
|
|
private var requestMaker: RequestMakerType
|
|
|
|
|
|
- init(target: HostAndPort,
|
|
|
- requestMessage: Grpc_Testing_SimpleRequest,
|
|
|
- config: Grpc_Testing_ClientConfig,
|
|
|
- eventLoopGroup: EventLoopGroup) {
|
|
|
+ init(
|
|
|
+ target: ConnectionTarget,
|
|
|
+ requestMessage: Grpc_Testing_SimpleRequest,
|
|
|
+ config: Grpc_Testing_ClientConfig,
|
|
|
+ eventLoop: EventLoop
|
|
|
+ ) {
|
|
|
+ self.eventLoop = eventLoop
|
|
|
+ // 'try!' is fine; it'll only throw if we can't make an SSL context
|
|
|
// TODO: Support TLS if requested.
|
|
|
- self.connection = ClientConnection.insecure(group: eventLoopGroup)
|
|
|
- .connect(host: target.host, port: target.port)
|
|
|
+ self.channel = try! GRPCChannelPool.with(
|
|
|
+ target: target,
|
|
|
+ transportSecurity: .plaintext,
|
|
|
+ eventLoopGroup: eventLoop
|
|
|
+ )
|
|
|
|
|
|
let logger = Logger(label: "ChannelRepeater")
|
|
|
- let client = Grpc_Testing_BenchmarkServiceClient(channel: self.connection)
|
|
|
+ let client = Grpc_Testing_BenchmarkServiceClient(channel: self.channel)
|
|
|
self.maxPermittedOutstandingRequests = Int(config.outstandingRpcsPerChannel)
|
|
|
- self.stopComplete = self.connection.eventLoop.makePromise()
|
|
|
+ self.stopComplete = eventLoop.makePromise()
|
|
|
self.stats = StatsWithLock()
|
|
|
|
|
|
self.requestMaker = RequestMakerType(
|
|
|
@@ -200,7 +208,7 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
/// Launch as many requests as allowed on the channel.
|
|
|
/// This must be called from the connection eventLoop.
|
|
|
private func launchRequests() {
|
|
|
- self.connection.eventLoop.preconditionInEventLoop()
|
|
|
+ self.eventLoop.preconditionInEventLoop()
|
|
|
|
|
|
while self.canMakeRequest {
|
|
|
self.makeRequestAndRepeat()
|
|
|
@@ -209,14 +217,14 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
|
|
|
/// Returns if it is permissible to make another request - ie we've not been asked to stop, and we're not at the limit of outstanding requests.
|
|
|
private var canMakeRequest: Bool {
|
|
|
- self.connection.eventLoop.assertInEventLoop()
|
|
|
+ self.eventLoop.assertInEventLoop()
|
|
|
return !self.stopRequested
|
|
|
&& self.numberOfOutstandingRequests < self.maxPermittedOutstandingRequests
|
|
|
}
|
|
|
|
|
|
/// If there is spare permitted capacity make a request and repeat when it is done.
|
|
|
private func makeRequestAndRepeat() {
|
|
|
- self.connection.eventLoop.preconditionInEventLoop()
|
|
|
+ self.eventLoop.preconditionInEventLoop()
|
|
|
// Check for capacity.
|
|
|
if !self.canMakeRequest {
|
|
|
return
|
|
|
@@ -233,7 +241,7 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
/// Call when a request has completed.
|
|
|
/// Records stats and attempts to make more requests if there is available capacity.
|
|
|
private func requestCompleted(status: GRPCStatus) {
|
|
|
- self.connection.eventLoop.preconditionInEventLoop()
|
|
|
+ self.eventLoop.preconditionInEventLoop()
|
|
|
self.numberOfOutstandingRequests -= 1
|
|
|
if self.stopRequested, self.numberOfOutstandingRequests == 0 {
|
|
|
self.stopIsComplete()
|
|
|
@@ -253,10 +261,10 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
|
|
|
/// Start sending requests to the server.
|
|
|
func start() {
|
|
|
- if self.connection.eventLoop.inEventLoop {
|
|
|
+ if self.eventLoop.inEventLoop {
|
|
|
self.launchRequests()
|
|
|
} else {
|
|
|
- self.connection.eventLoop.execute {
|
|
|
+ self.eventLoop.execute {
|
|
|
self.launchRequests()
|
|
|
}
|
|
|
}
|
|
|
@@ -266,13 +274,13 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
assert(self.stopRequested)
|
|
|
assert(self.numberOfOutstandingRequests == 0)
|
|
|
// Close the connection then signal done.
|
|
|
- self.connection.close().cascade(to: self.stopComplete)
|
|
|
+ self.channel.close().cascade(to: self.stopComplete)
|
|
|
}
|
|
|
|
|
|
/// Stop sending requests to the server.
|
|
|
/// - returns: A future which can be waited on to signal when all activity has ceased.
|
|
|
func stop() -> EventLoopFuture<Void> {
|
|
|
- self.connection.eventLoop.execute {
|
|
|
+ self.eventLoop.execute {
|
|
|
self.stopRequested = true
|
|
|
self.requestMaker.requestStop()
|
|
|
if self.numberOfOutstandingRequests == 0 {
|