|
|
@@ -20,8 +20,8 @@ import GRPC
|
|
|
import Logging
|
|
|
import NIO
|
|
|
|
|
|
-/// Client to make a series of asynchronous unary calls.
|
|
|
-final class AsyncUnaryQPSClient: QPSClient {
|
|
|
+/// Client to make a series of asynchronous calls.
|
|
|
+final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
|
|
|
private let eventLoopGroup: MultiThreadedEventLoopGroup
|
|
|
private let threadCount: Int
|
|
|
|
|
|
@@ -32,7 +32,7 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
private var statsPeriodStart: DispatchTime
|
|
|
private var cpuStatsPeriodStart: CPUTime
|
|
|
|
|
|
- /// Initialise a client to send unary requests.
|
|
|
+ /// Initialise a client to send requests.
|
|
|
/// - parameters:
|
|
|
/// - config: Config from the driver specifying how the client should behave.
|
|
|
init(config: Grpc_Testing_ClientConfig) throws {
|
|
|
@@ -51,7 +51,7 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
self.statsPeriodStart = grpcTimeNow()
|
|
|
self.cpuStatsPeriodStart = getResourceUsage()
|
|
|
|
|
|
- let requestMessage = try AsyncUnaryQPSClient
|
|
|
+ let requestMessage = try AsyncQPSClient
|
|
|
.makeClientRequest(payloadConfig: config.payloadConfig)
|
|
|
|
|
|
// Start the requested number of channels.
|
|
|
@@ -158,12 +158,9 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
/// Class to manage a channel. Repeatedly makes requests on that channel and records what happens.
|
|
|
private class ChannelRepeater {
|
|
|
private let connection: ClientConnection
|
|
|
- private let client: Grpc_Testing_BenchmarkServiceClient
|
|
|
- private let requestMessage: Grpc_Testing_SimpleRequest
|
|
|
- private let logger = Logger(label: "ChannelRepeater")
|
|
|
private let maxPermittedOutstandingRequests: Int
|
|
|
|
|
|
- private var stats: StatsWithLock
|
|
|
+ private let stats: StatsWithLock
|
|
|
|
|
|
/// Has a stop been requested - if it has don't submit any more
|
|
|
/// requests and when all existing requests are complete signal
|
|
|
@@ -173,6 +170,8 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
private var stopComplete: EventLoopPromise<Void>
|
|
|
private var numberOfOutstandingRequests = 0
|
|
|
|
|
|
+ private var requestMaker: RequestMakerType
|
|
|
+
|
|
|
init(target: HostAndPort,
|
|
|
requestMessage: Grpc_Testing_SimpleRequest,
|
|
|
config: Grpc_Testing_ClientConfig,
|
|
|
@@ -180,17 +179,27 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
// TODO: Support TLS if requested.
|
|
|
self.connection = ClientConnection.insecure(group: eventLoopGroup)
|
|
|
.connect(host: target.host, port: target.port)
|
|
|
- self.client = Grpc_Testing_BenchmarkServiceClient(channel: self.connection)
|
|
|
- self.requestMessage = requestMessage
|
|
|
+
|
|
|
+ let logger = Logger(label: "ChannelRepeater")
|
|
|
+ let client = Grpc_Testing_BenchmarkServiceClient(channel: self.connection)
|
|
|
self.maxPermittedOutstandingRequests = Int(config.outstandingRpcsPerChannel)
|
|
|
self.stopComplete = self.connection.eventLoop.makePromise()
|
|
|
self.stats = StatsWithLock()
|
|
|
+
|
|
|
+ self.requestMaker = RequestMakerType(
|
|
|
+ config: config,
|
|
|
+ client: client,
|
|
|
+ requestMessage: requestMessage,
|
|
|
+ logger: logger,
|
|
|
+ stats: self.stats
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
/// Launch as many requests as allowed on the channel.
|
|
|
/// This must be called from the connection eventLoop.
|
|
|
private func launchRequests() {
|
|
|
- precondition(self.connection.eventLoop.inEventLoop)
|
|
|
+ self.connection.eventLoop.preconditionInEventLoop()
|
|
|
+
|
|
|
while self.canMakeRequest {
|
|
|
self.makeRequestAndRepeat()
|
|
|
}
|
|
|
@@ -198,40 +207,32 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
|
|
|
/// Returns if it is permissible to make another request - ie we've not been asked to stop, and we're not at the limit of outstanding requests.
|
|
|
private var canMakeRequest: Bool {
|
|
|
+ self.connection.eventLoop.assertInEventLoop()
|
|
|
return !self.stopRequested
|
|
|
&& self.numberOfOutstandingRequests < self.maxPermittedOutstandingRequests
|
|
|
}
|
|
|
|
|
|
/// If there is spare permitted capacity make a request and repeat when it is done.
|
|
|
private func makeRequestAndRepeat() {
|
|
|
+ self.connection.eventLoop.preconditionInEventLoop()
|
|
|
// Check for capacity.
|
|
|
if !self.canMakeRequest {
|
|
|
return
|
|
|
}
|
|
|
- let startTime = grpcTimeNow()
|
|
|
self.numberOfOutstandingRequests += 1
|
|
|
- let result = self.client.unaryCall(self.requestMessage)
|
|
|
+ let resultStatus = self.requestMaker.makeRequest()
|
|
|
|
|
|
// Wait for the request to complete.
|
|
|
- result.status.whenSuccess { status in
|
|
|
- self.requestCompleted(status: status, startTime: startTime)
|
|
|
+ resultStatus.whenSuccess { status in
|
|
|
+ self.requestCompleted(status: status)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/// Call when a request has completed.
|
|
|
/// Records stats and attempts to make more requests if there is available capacity.
|
|
|
- private func requestCompleted(status: GRPCStatus, startTime: DispatchTime) {
|
|
|
- precondition(self.connection.eventLoop.inEventLoop)
|
|
|
+ private func requestCompleted(status: GRPCStatus) {
|
|
|
+ self.connection.eventLoop.preconditionInEventLoop()
|
|
|
self.numberOfOutstandingRequests -= 1
|
|
|
- if status.isOk {
|
|
|
- let endTime = grpcTimeNow()
|
|
|
- self.recordLatency(endTime - startTime)
|
|
|
- } else {
|
|
|
- self.logger.error(
|
|
|
- "Bad status from unary request",
|
|
|
- metadata: ["status": "\(status)"]
|
|
|
- )
|
|
|
- }
|
|
|
if self.stopRequested, self.numberOfOutstandingRequests == 0 {
|
|
|
self.stopIsComplete()
|
|
|
} else {
|
|
|
@@ -240,10 +241,6 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func recordLatency(_ latency: Nanoseconds) {
|
|
|
- self.stats.add(latency: Double(latency.value))
|
|
|
- }
|
|
|
-
|
|
|
/// Get stats for sending to the driver.
|
|
|
/// - parameters:
|
|
|
/// - reset: Should the stats reset after copying.
|
|
|
@@ -275,6 +272,7 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
func stop() -> EventLoopFuture<Void> {
|
|
|
self.connection.eventLoop.execute {
|
|
|
self.stopRequested = true
|
|
|
+ self.requestMaker.requestStop()
|
|
|
if self.numberOfOutstandingRequests == 0 {
|
|
|
self.stopIsComplete()
|
|
|
}
|
|
|
@@ -291,9 +289,9 @@ final class AsyncUnaryQPSClient: QPSClient {
|
|
|
func makeAsyncClient(config: Grpc_Testing_ClientConfig) throws -> QPSClient {
|
|
|
switch config.rpcType {
|
|
|
case .unary:
|
|
|
- return try AsyncUnaryQPSClient(config: config)
|
|
|
+ return try AsyncQPSClient<AsyncUnaryRequestMaker>(config: config)
|
|
|
case .streaming:
|
|
|
- throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
|
|
|
+ return try AsyncQPSClient<AsyncPingPongRequestMaker>(config: config)
|
|
|
case .streamingFromClient:
|
|
|
throw GRPCStatus(code: .unimplemented, message: "Client Type not implemented")
|
|
|
case .streamingFromServer:
|