BaseClientCall.swift 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  /// Shared boilerplate for the four kinds of gRPC call objects returned to framework users.
  ///
  /// Each call is configured on a multiplexed channel on the given connection. The multiplexed
  /// channel is configured as such:
  ///
  ///              ┌──────────────────────────────┐
  ///              │ ClientResponseChannelHandler │
  ///              └────────────▲─────────────────┘
  ///                           │      ┌─────────────────────────────┐
  ///                           │      │ ClientRequestChannelHandler │
  ///                           │      └────────────────┬────────────┘
  /// GRPCClientResponsePart<T1>│                       │GRPCClientRequestPart<T2>
  ///                         ┌─┴───────────────────────▼─┐
  ///                         │      GRPCClientCodec      │
  ///                         └─▲───────────────────────┬─┘
  ///  RawGRPCClientResponsePart│                       │RawGRPCClientRequestPart
  ///                         ┌─┴───────────────────────▼─┐
  ///                         │ HTTP1ToRawGRPCClientCodec │
  ///                         └─▲───────────────────────┬─┘
  ///     HTTPClientResponsePart│                       │HTTPClientRequestPart
  ///                         ┌─┴───────────────────────▼─┐
  ///                         │  HTTP2ToHTTP1ClientCodec  │
  ///                         └─▲───────────────────────┬─┘
  ///                 HTTP2Frame│                       │HTTP2Frame
  ///                           |                       |
  ///
  /// Note: below the `HTTP2ToHTTP1ClientCodec` is the "main" pipeline provided by the channel in
  /// `ClientConnection`.
  ///
  /// Setup includes:
  /// - creation of an HTTP/2 stream for the call to execute on,
  /// - configuration of the NIO channel handlers for the stream, and
  /// - setting a call timeout, if one is provided.
  ///
  /// The class also supplies most of the user-facing functionality through its conformance to
  /// `ClientCall`.
  open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
    /// Logger used for all diagnostics emitted by this call.
    internal let logger: Logger

    /// The underlying `ClientConnection` providing the HTTP/2 channel and multiplexer.
    internal let connection: ClientConnection

    /// Promise for the HTTP/2 stream channel the call executes on.
    internal let streamPromise: EventLoopPromise<Channel>

    /// Channel handler for responses.
    internal let responseHandler: ClientResponseChannelHandler<ResponseMessage>

    /// Channel handler for requests.
    internal let requestHandler: ClientRequestChannelHandler<RequestMessage>

    // Note: documentation for the following properties is inherited from the `ClientCall` protocol.
    public let subchannel: EventLoopFuture<Channel>
    public let initialMetadata: EventLoopFuture<HTTPHeaders>
    public let trailingMetadata: EventLoopFuture<HTTPHeaders>
    public let status: EventLoopFuture<GRPCStatus>

    /// Sets up a gRPC call.
    ///
    /// A new HTTP/2 stream is requested from the multiplexer provided by `connection`, and the
    /// channel associated with that stream is configured with the supplied request and response
    /// handlers. Note that the request head will be sent automatically from the request handler
    /// when the channel becomes active.
    ///
    /// - Parameters:
    ///   - connection: connection containing the HTTP/2 channel and multiplexer to use for this call.
    ///   - responseHandler: a channel handler for receiving responses.
    ///   - requestHandler: a channel handler for sending requests.
    ///   - logger: the logger this call writes its diagnostics to.
    init(
      connection: ClientConnection,
      responseHandler: ClientResponseChannelHandler<ResponseMessage>,
      requestHandler: ClientRequestChannelHandler<RequestMessage>,
      logger: Logger
    ) {
      let streamPromise: EventLoopPromise<Channel> = connection.channel.eventLoop.makePromise()

      self.logger = logger
      self.connection = connection
      self.requestHandler = requestHandler
      self.responseHandler = responseHandler
      self.streamPromise = streamPromise

      // The public futures are views onto the stream promise and the response handler's promises.
      self.subchannel = streamPromise.futureResult
      self.initialMetadata = responseHandler.initialMetadataPromise.futureResult
      self.trailingMetadata = responseHandler.trailingMetadataPromise.futureResult
      self.status = responseHandler.statusPromise.futureResult

      // If no stream could be created, report the failure to the response handler.
      self.subchannel.whenFailure { error in
        self.logger.error("failed to create http/2 stream", metadata: [MetadataKey.error: "\(error)"])
        self.responseHandler.observeError(.unknown(error, origin: .client))
      }

      self.createStreamChannel()
      self.responseHandler.scheduleTimeout(eventLoop: connection.eventLoop)
    }

    /// Creates and configures an HTTP/2 stream channel. The `self.subchannel` future will hold the
    /// stream channel once it has been created.
    ///
    /// If the connection's multiplexer future fails, `streamPromise` is failed with the same error.
    private func createStreamChannel() {
      self.connection.multiplexer.whenComplete { result in
        switch result {
        case .failure(let error):
          self.logger.error("failed to get http/2 multiplexer", metadata: [MetadataKey.error: "\(error)"])
          self.streamPromise.fail(error)

        case .success(let multiplexer):
          multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
            // Handlers are listed in the order they are passed to the stream channel's pipeline.
            let http2ToHTTP1 = HTTP2ToHTTP1ClientCodec(
              streamID: streamID,
              httpProtocol: self.connection.configuration.httpProtocol
            )
            let http1ToRawGRPC = HTTP1ToRawGRPCClientCodec(logger: self.logger)
            let grpcCodec = GRPCClientCodec<RequestMessage, ResponseMessage>(logger: self.logger)

            return subchannel.pipeline.addHandlers(
              http2ToHTTP1,
              http1ToRawGRPC,
              grpcCodec,
              self.requestHandler,
              self.responseHandler
            )
          }
        }
      }
    }
  }
  extension BaseClientCall: ClientCall {
    /// Cancels the call.
    ///
    /// The work is submitted to the connection channel's event loop. If the stream channel was
    /// created, a `GRPCClientUserEvent.cancelled` inbound user event is fired on its pipeline;
    /// if stream creation failed there is nothing to cancel and the failure is only logged.
    public func cancel() {
      self.logger.info("cancelling call")

      let eventLoop = self.connection.channel.eventLoop
      eventLoop.execute {
        // Exactly one of the two callbacks below runs, depending on how `subchannel` resolves.
        self.subchannel.whenSuccess { channel in
          self.logger.debug("firing .cancelled event")
          channel.pipeline.fireUserInboundEventTriggered(GRPCClientUserEvent.cancelled)
        }

        self.subchannel.whenFailure { error in
          self.logger.debug(
            "cancelling call will no-op because no http/2 stream creation",
            metadata: [MetadataKey.error: "\(error)"]
          )
        }
      }
    }
  }
  /// Makes a request head.
  ///
  /// - Parameter path: The path of the gRPC call, e.g. "/serviceName/methodName".
  /// - Parameter host: The host serving the call.
  /// - Parameter callOptions: Options used when making this call.
  /// - Parameter requestID: The request ID used for this call. If `callOptions` specifies a
  ///     non-nil `requestIDHeader` then this request ID will be added to the headers with the
  ///     specified header name.
  /// - Returns: An HTTP/2 request head whose method is `GET` when `callOptions.cacheable` is set
  ///     and `POST` otherwise.
  internal func makeRequestHead(path: String, host: String, callOptions: CallOptions, requestID: String) -> HTTPRequestHead {
    // Headers present on every call, in the order they are added.
    var headers = HTTPHeaders([
      ("content-type", "application/grpc"),
      // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
      ("te", "trailers"),
      //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
      ("user-agent", "grpc-swift-nio"),
      // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
      ("host", host),
      (GRPCHeaderName.acceptEncoding, CompressionMechanism.acceptEncodingHeader),
    ])

    // A timeout header is only sent for calls which have a finite timeout.
    if callOptions.timeout != .infinite {
      headers.add(name: GRPCHeaderName.timeout, value: String(describing: callOptions.timeout))
    }

    // User-provided metadata is appended after the standard headers.
    headers.add(contentsOf: callOptions.customMetadata)

    if let requestIDHeaderName = callOptions.requestIDHeader {
      headers.add(name: requestIDHeaderName, value: requestID)
    }

    let method: HTTPMethod
    if callOptions.cacheable {
      method = .GET
    } else {
      method = .POST
    }

    return HTTPRequestHead(
      version: HTTPVersion(major: 2, minor: 0),
      method: method,
      uri: path,
      headers: headers
    )
  }