| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- /*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import Foundation
- import NIO
- import NIOHTTP1
- import NIOHTTP2
- import SwiftProtobuf
- /// This class provides much of the boilerplate for the four types of gRPC call objects returned to framework
- /// users.
- ///
- /// Each call will be configured on a multiplexed channel on the given connection. The multiplexed
- /// channel will be configured as such:
- ///
- /// ┌───────────────────────────┐
- /// │ GRPCClientChannelHandler │
- /// └─▲───────────────────────┬─┘
- /// GRPCClientResponsePart<T1>│ │GRPCClientRequestPart<T2>
- /// ┌─┴───────────────────────▼─┐
- /// │ GRPCClientCodec │
- /// └─▲───────────────────────┬─┘
- /// RawGRPCClientResponsePart│ │RawGRPCClientRequestPart
- /// ┌─┴───────────────────────▼─┐
- /// │ HTTP1ToRawGRPCClientCodec │
- /// └─▲───────────────────────┬─┘
- /// HTTPClientResponsePart│ │HTTPClientRequestPart
- /// ┌─┴───────────────────────▼─┐
- /// │ HTTP2ToHTTP1ClientCodec │
- /// └─▲───────────────────────┬─┘
- /// HTTP2Frame│ │HTTP2Frame
- /// | |
- ///
- /// Note: below the `HTTP2ToHTTP1ClientCodec` is the "main" pipeline provided by the channel in
- /// `GRPCClientConnection`.
- ///
- /// Setup includes:
- /// - creation of an HTTP/2 stream for the call to execute on,
- /// - configuration of the NIO channel handlers for the stream, and
- /// - setting a call timeout, if one is provided.
- ///
- /// This class also provides much of the framework user facing functionality via conformance to `ClientCall`.
- open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
- /// The underlying `GRPCClientConnection` providing the HTTP/2 channel and multiplexer.
- internal let connection: GRPCClientConnection
- /// Promise for an HTTP/2 stream to execute the call on.
- internal let streamPromise: EventLoopPromise<Channel>
- /// Client channel handler. Handles internal state for reading/writing messages to the channel.
- /// The handler also owns the promises for the futures that this class surfaces to the user (such as
- /// `initialMetadata` and `status`).
- internal let clientChannelHandler: GRPCClientChannelHandler<RequestMessage, ResponseMessage>
- /// Sets up a gRPC call.
- ///
- /// A number of actions are performed:
- /// - a new HTTP/2 stream is created and configured using the channel and multiplexer provided by `client`,
- /// - a callback is registered on the new stream (`subchannel`) to send the request head,
- /// - a timeout is scheduled if one is set in the `callOptions`.
- ///
- /// - Parameters:
- /// - connection: connection containing the HTTP/2 channel and multiplexer to use for this call.
- /// - path: path for this RPC method.
- /// - callOptions: options to use when configuring this call.
- /// - responseObserver: observer for received messages.
- init(
- connection: GRPCClientConnection,
- path: String,
- callOptions: CallOptions,
- responseObserver: ResponseObserver<ResponseMessage>
- ) {
- self.connection = connection
- self.streamPromise = connection.channel.eventLoop.makePromise()
- self.clientChannelHandler = GRPCClientChannelHandler(
- initialMetadataPromise: connection.channel.eventLoop.makePromise(),
- statusPromise: connection.channel.eventLoop.makePromise(),
- responseObserver: responseObserver)
- self.streamPromise.futureResult.whenFailure { error in
- self.clientChannelHandler.observeError(error)
- }
- self.createStreamChannel()
- self.setTimeout(callOptions.timeout)
- }
- }
- extension BaseClientCall: ClientCall {
- public var subchannel: EventLoopFuture<Channel> {
- return self.streamPromise.futureResult
- }
- public var initialMetadata: EventLoopFuture<HTTPHeaders> {
- return self.clientChannelHandler.initialMetadataPromise.futureResult
- }
- public var status: EventLoopFuture<GRPCStatus> {
- return self.clientChannelHandler.statusPromise.futureResult
- }
- // Workaround for: https://bugs.swift.org/browse/SR-10128
- // Once resolved this can become a default implementation on `ClientCall`.
- public var trailingMetadata: EventLoopFuture<HTTPHeaders> {
- return status.map { $0.trailingMetadata }
- }
- public func cancel() {
- self.connection.channel.eventLoop.execute {
- self.subchannel.whenSuccess { channel in
- channel.close(mode: .all, promise: nil)
- }
- }
- }
- }
- extension BaseClientCall {
- /// Creates and configures an HTTP/2 stream channel. `subchannel` will contain the stream channel when it is created.
- ///
- /// - Important: This should only ever be called once.
- private func createStreamChannel() {
- self.connection.channel.eventLoop.execute {
- self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
- subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.httpProtocol),
- HTTP1ToRawGRPCClientCodec(),
- GRPCClientCodec<RequestMessage, ResponseMessage>(),
- self.clientChannelHandler)
- }
- }
- }
- /// Send the request head once `subchannel` becomes available.
- ///
- /// - Important: This should only ever be called once.
- ///
- /// - Parameters:
- /// - requestHead: The request head to send.
- /// - promise: A promise to fulfill once the request head has been sent.
- internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise<Void>?) {
- self.writeAndFlushOnStream(.head(requestHead), promise: promise)
- }
- /// Send the request head once `subchannel` becomes available.
- ///
- /// - Important: This should only ever be called once.
- ///
- /// - Parameter requestHead: The request head to send.
- /// - Returns: A future which will be succeeded once the request head has been sent.
- internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
- let promise = connection.channel.eventLoop.makePromise(of: Void.self)
- self.sendHead(requestHead, promise: promise)
- return promise.futureResult
- }
- /// Send the given message once `subchannel` becomes available.
- ///
- /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
- /// - Parameters:
- /// - message: The message to send.
- /// - promise: A promise to fulfil when the message reaches the network.
- internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
- self.writeAndFlushOnStream(.message(message), promise: promise)
- }
- /// Send the given message once `subchannel` becomes available.
- ///
- /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
- /// - Returns: A future which will be fullfilled when the message reaches the network.
- internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
- let promise = connection.channel.eventLoop.makePromise(of: Void.self)
- self._sendMessage(message, promise: promise)
- return promise.futureResult
- }
- /// Send `end` once `subchannel` becomes available.
- ///
- /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
- /// - Important: This should only ever be called once.
- /// - Parameter promise: A promise to succeed once then end has been sent.
- internal func _sendEnd(promise: EventLoopPromise<Void>?) {
- self.writeAndFlushOnStream(.end, promise: promise)
- }
- /// Send `end` once `subchannel` becomes available.
- ///
- /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
- /// - Important: This should only ever be called once.
- ///- Returns: A future which will be succeeded once the end has been sent.
- internal func _sendEnd() -> EventLoopFuture<Void> {
- let promise = connection.channel.eventLoop.makePromise(of: Void.self)
- self._sendEnd(promise: promise)
- return promise.futureResult
- }
- /// Writes the given request on the future `Channel` for the HTTP/2 stream used to make this call.
- ///
- /// This method is intended to be used by the `sendX` methods in order to ensure that they fail
- /// futures associated with this call should the write fail (e.g. due to a closed connection).
- private func writeAndFlushOnStream(_ request: GRPCClientRequestPart<RequestMessage>, promise: EventLoopPromise<Void>?) {
- // We need to use a promise here; if the write fails then it _must_ be observed by the handler
- // to ensure that any futures given to the user are fulfilled.
- let promise = promise ?? self.connection.channel.eventLoop.makePromise()
- promise.futureResult.whenFailure { error in
- self.clientChannelHandler.observeError(error)
- }
- self.subchannel.cascadeFailure(to: promise)
- self.subchannel.whenSuccess { channel in
- channel.writeAndFlush(NIOAny(request), promise: promise)
- }
- }
- /// Creates a client-side timeout for this call.
- ///
- /// - Important: This should only ever be called once.
- private func setTimeout(_ timeout: GRPCTimeout) {
- if timeout == .infinite { return }
- self.connection.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
- self?.clientChannelHandler.observeError(GRPCError.client(.deadlineExceeded(timeout)))
- }
- }
- /// Makes a new `HTTPRequestHead` for a call with this signature.
- ///
- /// - Parameters:
- /// - path: path for this RPC method.
- /// - host: the address of the host we are connected to.
- /// - callOptions: options to use when configuring this call.
- /// - Returns: `HTTPRequestHead` configured for this call.
- internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
- let method: HTTPMethod = callOptions.cacheable ? .GET : .POST
- var requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: method, uri: path)
- callOptions.customMetadata.forEach { name, value in
- requestHead.headers.add(name: name, value: value)
- }
- // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
- requestHead.headers.add(name: "host", value: host)
- requestHead.headers.add(name: "content-type", value: "application/grpc")
- // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
- requestHead.headers.add(name: "te", value: "trailers")
- //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
- requestHead.headers.add(name: "user-agent", value: "grpc-swift-nio")
- requestHead.headers.add(name: GRPCHeaderName.acceptEncoding, value: CompressionMechanism.acceptEncodingHeader)
- if callOptions.timeout != .infinite {
- requestHead.headers.add(name: GRPCHeaderName.timeout, value: String(describing: callOptions.timeout))
- }
- return requestHead
- }
- }
|