|
|
@@ -29,8 +29,8 @@ import SwiftProtobuf
|
|
|
///
|
|
|
/// This class also provides much of the framework user facing functionality via conformance to `ClientCall`.
|
|
|
open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
|
|
|
- /// The underlying `GRPCClient` providing the HTTP/2 channel and multiplexer.
|
|
|
- internal let client: GRPCClient
|
|
|
+ /// The underlying `GRPCClientConnection` providing the HTTP/2 channel and multiplexer.
|
|
|
+ internal let connection: GRPCClientConnection
|
|
|
|
|
|
/// Promise for an HTTP/2 stream to execute the call on.
|
|
|
internal let streamPromise: EventLoopPromise<Channel>
|
|
|
@@ -48,21 +48,21 @@ open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
|
|
|
/// - a timeout is scheduled if one is set in the `callOptions`.
|
|
|
///
|
|
|
/// - Parameters:
|
|
|
- /// - client: client containing the HTTP/2 channel and multiplexer to use for this call.
|
|
|
+ /// - connection: connection containing the HTTP/2 channel and multiplexer to use for this call.
|
|
|
/// - path: path for this RPC method.
|
|
|
/// - callOptions: options to use when configuring this call.
|
|
|
/// - responseObserver: observer for received messages.
|
|
|
init(
|
|
|
- client: GRPCClient,
|
|
|
+ connection: GRPCClientConnection,
|
|
|
path: String,
|
|
|
callOptions: CallOptions,
|
|
|
responseObserver: ResponseObserver<ResponseMessage>
|
|
|
) {
|
|
|
- self.client = client
|
|
|
- self.streamPromise = client.channel.eventLoop.makePromise()
|
|
|
+ self.connection = connection
|
|
|
+ self.streamPromise = connection.channel.eventLoop.makePromise()
|
|
|
self.clientChannelHandler = GRPCClientChannelHandler(
|
|
|
- initialMetadataPromise: client.channel.eventLoop.makePromise(),
|
|
|
- statusPromise: client.channel.eventLoop.makePromise(),
|
|
|
+ initialMetadataPromise: connection.channel.eventLoop.makePromise(),
|
|
|
+ statusPromise: connection.channel.eventLoop.makePromise(),
|
|
|
responseObserver: responseObserver)
|
|
|
|
|
|
self.createStreamChannel()
|
|
|
@@ -90,7 +90,7 @@ extension BaseClientCall: ClientCall {
|
|
|
}
|
|
|
|
|
|
public func cancel() {
|
|
|
- self.client.channel.eventLoop.execute {
|
|
|
+ self.connection.channel.eventLoop.execute {
|
|
|
self.subchannel.whenSuccess { channel in
|
|
|
channel.close(mode: .all, promise: nil)
|
|
|
}
|
|
|
@@ -103,9 +103,9 @@ extension BaseClientCall {
|
|
|
///
|
|
|
/// - Important: This should only ever be called once.
|
|
|
private func createStreamChannel() {
|
|
|
- self.client.channel.eventLoop.execute {
|
|
|
- self.client.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
|
|
|
- subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.client.httpProtocol),
|
|
|
+ self.connection.channel.eventLoop.execute {
|
|
|
+ self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
|
|
|
+ subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.httpProtocol),
|
|
|
HTTP1ToRawGRPCClientCodec(),
|
|
|
GRPCClientCodec<RequestMessage, ResponseMessage>(),
|
|
|
self.clientChannelHandler)
|
|
|
@@ -133,7 +133,7 @@ extension BaseClientCall {
|
|
|
/// - Parameter requestHead: The request head to send.
|
|
|
/// - Returns: A future which will be succeeded once the request head has been sent.
|
|
|
internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
|
|
|
- let promise = client.channel.eventLoop.makePromise(of: Void.self)
|
|
|
+ let promise = connection.channel.eventLoop.makePromise(of: Void.self)
|
|
|
self.sendHead(requestHead, promise: promise)
|
|
|
return promise.futureResult
|
|
|
}
|
|
|
@@ -155,7 +155,7 @@ extension BaseClientCall {
|
|
|
/// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
|
|
|
/// - Returns: A future which will be fullfilled when the message reaches the network.
|
|
|
internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
|
|
|
- let promise = client.channel.eventLoop.makePromise(of: Void.self)
|
|
|
+ let promise = connection.channel.eventLoop.makePromise(of: Void.self)
|
|
|
self._sendMessage(message, promise: promise)
|
|
|
return promise.futureResult
|
|
|
}
|
|
|
@@ -177,7 +177,7 @@ extension BaseClientCall {
|
|
|
/// - Important: This should only ever be called once.
|
|
|
///- Returns: A future which will be succeeded once the end has been sent.
|
|
|
internal func _sendEnd() -> EventLoopFuture<Void> {
|
|
|
- let promise = client.channel.eventLoop.makePromise(of: Void.self)
|
|
|
+ let promise = connection.channel.eventLoop.makePromise(of: Void.self)
|
|
|
self._sendEnd(promise: promise)
|
|
|
return promise.futureResult
|
|
|
}
|
|
|
@@ -188,7 +188,7 @@ extension BaseClientCall {
|
|
|
private func setTimeout(_ timeout: GRPCTimeout) {
|
|
|
if timeout == .infinite { return }
|
|
|
|
|
|
- self.client.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
|
|
|
+ self.connection.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
|
|
|
self?.clientChannelHandler.observeError(.client(.deadlineExceeded(timeout)))
|
|
|
}
|
|
|
}
|