|
|
@@ -169,7 +169,7 @@ internal class ChannelTransport<Request, Response> {
|
|
|
switch result {
|
|
|
case let .success(mux):
|
|
|
mux.createStreamChannel(promise: streamPromise) { stream in
|
|
|
- logger.trace("created http/2 stream")
|
|
|
+ logger.trace("created http/2 stream", source: "GRPC")
|
|
|
|
|
|
return stream.pipeline.addHandlers([
|
|
|
_GRPCClientChannelHandler(callType: callType, logger: logger),
|
|
|
@@ -262,7 +262,7 @@ extension ChannelTransport: ClientCallOutbound {
|
|
|
///
|
|
|
/// Does not have to be called from the event loop.
|
|
|
internal func cancel(promise: EventLoopPromise<Void>?) {
|
|
|
- self.logger.info("rpc cancellation requested")
|
|
|
+ self.logger.info("rpc cancellation requested", source: "GRPC")
|
|
|
|
|
|
if self.eventLoop.inEventLoop {
|
|
|
self.handleError(GRPCError.RPCCancelledByClient().captureContext(), promise: promise)
|
|
|
@@ -371,7 +371,7 @@ extension ChannelTransport {
|
|
|
self.logger.debug("buffering request part", metadata: [
|
|
|
"request_part": "\(part.name)",
|
|
|
"call_state": "\(self.describeCallState())",
|
|
|
- ])
|
|
|
+ ], source: "GRPC")
|
|
|
self.requestBuffer.append(BufferedRequest(message: part, promise: promise))
|
|
|
if flush {
|
|
|
self.requestBuffer.mark()
|
|
|
@@ -379,7 +379,11 @@ extension ChannelTransport {
|
|
|
|
|
|
// We have an active stream, just pass the write and promise through.
|
|
|
case let .active(stream):
|
|
|
- self.logger.debug("writing request part", metadata: ["request_part": "\(part.name)"])
|
|
|
+ self.logger.debug(
|
|
|
+ "writing request part",
|
|
|
+ metadata: ["request_part": "\(part.name)"],
|
|
|
+ source: "GRPC"
|
|
|
+ )
|
|
|
stream.write(part, promise: promise)
|
|
|
if flush {
|
|
|
stream.flush()
|
|
|
@@ -390,7 +394,7 @@ extension ChannelTransport {
|
|
|
self.logger.debug("dropping request part", metadata: [
|
|
|
"request_part": "\(part.name)",
|
|
|
"call_state": "\(self.describeCallState())",
|
|
|
- ])
|
|
|
+ ], source: "GRPC")
|
|
|
promise?.fail(ChannelError.ioOnClosedChannel)
|
|
|
}
|
|
|
}
|
|
|
@@ -521,7 +525,11 @@ extension ChannelTransport: ClientCallInbound {
|
|
|
preconditionFailure("Received response part in 'buffering' state")
|
|
|
|
|
|
case .active:
|
|
|
- self.logger.debug("received response part", metadata: ["response_part": "\(part.name)"])
|
|
|
+ self.logger.debug(
|
|
|
+ "received response part",
|
|
|
+ metadata: ["response_part": "\(part.name)"],
|
|
|
+ source: "GRPC"
|
|
|
+ )
|
|
|
|
|
|
switch part {
|
|
|
case let .initialMetadata(metadata):
|
|
|
@@ -557,7 +565,7 @@ extension ChannelTransport: ClientCallInbound {
|
|
|
self.logger.debug("dropping response part", metadata: [
|
|
|
"response_part": "\(part.name)",
|
|
|
"call_state": "\(self.describeCallState())",
|
|
|
- ])
|
|
|
+ ], source: "GRPC")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -579,7 +587,8 @@ extension ChannelTransport: ClientCallInbound {
|
|
|
|
|
|
self.logger.debug(
|
|
|
"unbuffering request part",
|
|
|
- metadata: ["request_part": "\(request.message.name)"]
|
|
|
+ metadata: ["request_part": "\(request.message.name)"],
|
|
|
+ source: "GRPC"
|
|
|
)
|
|
|
stream.write(request.message, promise: request.promise)
|
|
|
if shouldFlush {
|
|
|
@@ -587,7 +596,7 @@ extension ChannelTransport: ClientCallInbound {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- self.logger.debug("request buffer drained")
|
|
|
+ self.logger.debug("request buffer drained", source: "GRPC")
|
|
|
self.state = .active(stream)
|
|
|
|
|
|
case .active:
|
|
|
@@ -620,7 +629,7 @@ extension ChannelTransport {
|
|
|
private func startTimer() {
|
|
|
assert(self.stopwatch == nil)
|
|
|
self.stopwatch = Stopwatch()
|
|
|
- self.logger.debug("starting rpc")
|
|
|
+ self.logger.debug("starting rpc", source: "GRPC")
|
|
|
}
|
|
|
|
|
|
private func stopTimer(status: GRPCStatus) {
|
|
|
@@ -632,7 +641,7 @@ extension ChannelTransport {
|
|
|
"duration_ms": "\(millis)",
|
|
|
"status_code": "\(status.code.rawValue)",
|
|
|
"status_message": "\(status.message ?? "nil")",
|
|
|
- ])
|
|
|
+ ], source: "GRPC")
|
|
|
self.stopwatch = nil
|
|
|
}
|
|
|
}
|