BaseClientCall.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import SwiftProtobuf
  /// Base class supplying the shared boilerplate for the four kinds of gRPC call objects handed to
  /// framework users.
  ///
  /// Every call runs on its own multiplexed (HTTP/2 stream) channel created from the given
  /// connection. The handlers installed on that stream channel are arranged as follows:
  ///
  ///                              ┌───────────────────────────┐
  ///                              │ GRPCClientChannelHandler  │
  ///                              └─▲───────────────────────┬─┘
  ///     GRPCClientResponsePart<T1> │                       │ GRPCClientRequestPart<T2>
  ///                              ┌─┴───────────────────────▼─┐
  ///                              │      GRPCClientCodec      │
  ///                              └─▲───────────────────────┬─┘
  ///      RawGRPCClientResponsePart │                       │ RawGRPCClientRequestPart
  ///                              ┌─┴───────────────────────▼─┐
  ///                              │ HTTP1ToRawGRPCClientCodec │
  ///                              └─▲───────────────────────┬─┘
  ///         HTTPClientResponsePart │                       │ HTTPClientRequestPart
  ///                              ┌─┴───────────────────────▼─┐
  ///                              │  HTTP2ToHTTP1ClientCodec  │
  ///                              └─▲───────────────────────┬─┘
  ///                     HTTP2Frame │                       │ HTTP2Frame
  ///                                |                       |
  ///
  /// Note: everything below the `HTTP2ToHTTP1ClientCodec` is the "main" pipeline provided by the
  /// channel in `GRPCClientConnection`.
  ///
  /// Setting a call up involves:
  /// - creating an HTTP/2 stream for the call to execute on,
  /// - configuring the NIO channel handlers for that stream, and
  /// - scheduling a call timeout, if one is provided.
  ///
  /// Most of the functionality exposed to framework users comes from this class's conformance to
  /// `ClientCall`.
  open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
    /// The `GRPCClientConnection` whose HTTP/2 channel and multiplexer back this call.
    internal let connection: GRPCClientConnection

    /// Promise for the HTTP/2 stream channel this call executes on.
    internal let streamPromise: EventLoopPromise<Channel>

    /// The client channel handler, which tracks the state for reading and writing messages on the
    /// stream. It also owns the promises behind the futures this class exposes to the user
    /// (`initialMetadata` and `status`).
    internal let clientChannelHandler: GRPCClientChannelHandler<RequestMessage, ResponseMessage>

    /// Sets up a gRPC call.
    ///
    /// The following happens during initialization:
    /// - the promises for the stream channel, the initial metadata and the status are created on the
    ///   connection channel's event loop,
    /// - a failure to create the stream is forwarded to the client channel handler as an error,
    /// - a new HTTP/2 stream is created and configured using the channel and multiplexer provided by
    ///   `connection`,
    /// - a timeout is scheduled if one is set in `callOptions`.
    ///
    /// - Parameters:
    ///   - connection: connection containing the HTTP/2 channel and multiplexer to use for this call.
    ///   - path: path for this RPC method. This initializer does not read it.
    ///   - callOptions: options to use when configuring this call; only `timeout` is read here.
    ///   - responseObserver: observer for received messages.
    init(
      connection: GRPCClientConnection,
      path: String,
      callOptions: CallOptions,
      responseObserver: ResponseObserver<ResponseMessage>
    ) {
      // Every promise owned by this call lives on the connection channel's event loop.
      let eventLoop = connection.channel.eventLoop

      self.connection = connection
      self.streamPromise = eventLoop.makePromise(of: Channel.self)
      self.clientChannelHandler = GRPCClientChannelHandler(
        initialMetadataPromise: eventLoop.makePromise(of: HTTPHeaders.self),
        statusPromise: eventLoop.makePromise(of: GRPCStatus.self),
        responseObserver: responseObserver)

      // If the stream cannot be created, the handler must be told so that it can deal with the
      // promises it owns.
      self.streamPromise.futureResult.whenFailure {
        self.clientChannelHandler.observeError($0)
      }

      self.createStreamChannel()
      self.setTimeout(callOptions.timeout)
    }
  }
  // MARK: - ClientCall

  extension BaseClientCall: ClientCall {
    /// A future for the HTTP/2 stream channel this call executes on.
    public var subchannel: EventLoopFuture<Channel> {
      return self.streamPromise.futureResult
    }

    /// A future for the initial metadata; its promise is owned by the client channel handler.
    public var initialMetadata: EventLoopFuture<HTTPHeaders> {
      let metadataPromise = self.clientChannelHandler.initialMetadataPromise
      return metadataPromise.futureResult
    }

    /// A future for the status of the call; its promise is owned by the client channel handler.
    public var status: EventLoopFuture<GRPCStatus> {
      let statusPromise = self.clientChannelHandler.statusPromise
      return statusPromise.futureResult
    }

    /// A future for the trailing metadata, taken from the call's status once it is available.
    ///
    // Workaround for: https://bugs.swift.org/browse/SR-10128
    // Once resolved this can become a default implementation on `ClientCall`.
    public var trailingMetadata: EventLoopFuture<HTTPHeaders> {
      return self.status.map { status in
        status.trailingMetadata
      }
    }

    /// Cancels the call by closing its stream channel.
    ///
    /// The close is issued from the connection channel's event loop and only once the stream
    /// channel has been created successfully. Nothing is closed if stream creation failed.
    public func cancel() {
      let eventLoop = self.connection.channel.eventLoop
      eventLoop.execute {
        self.subchannel.whenSuccess { stream in
          stream.close(mode: .all, promise: nil)
        }
      }
    }
  }
  117. extension BaseClientCall {
  /// Creates an HTTP/2 stream channel for this call and installs the call's handlers on it.
  ///
  /// The work is dispatched onto the connection channel's event loop. `streamPromise` (surfaced as
  /// `subchannel`) is handed to the multiplexer as the promise for the new stream channel.
  ///
  /// - Important: This should only ever be called once.
  private func createStreamChannel() {
    let connection = self.connection

    connection.channel.eventLoop.execute {
      connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (streamChannel, streamID) -> EventLoopFuture<Void> in
        // Handlers are added in pipeline order, from the HTTP/2 frames up to the gRPC messages.
        let http2ToHTTP1 = HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: connection.httpProtocol)
        let http1ToRawGRPC = HTTP1ToRawGRPCClientCodec()
        let grpcCodec = GRPCClientCodec<RequestMessage, ResponseMessage>()

        return streamChannel.pipeline.addHandlers(
          http2ToHTTP1,
          http1ToRawGRPC,
          grpcCodec,
          self.clientChannelHandler)
      }
    }
  }
  /// Writes the request head to the stream once `subchannel` becomes available.
  ///
  /// - Important: This should only ever be called once.
  ///
  /// - Parameters:
  ///   - requestHead: The request head to send.
  ///   - promise: A promise to fulfill once the request head has been sent, or `nil`.
  internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise<Void>?) {
    let headPart: GRPCClientRequestPart<RequestMessage> = .head(requestHead)
    self.writeAndFlushOnStream(headPart, promise: promise)
  }
  /// Writes the request head to the stream once `subchannel` becomes available.
  ///
  /// Convenience over `sendHead(_:promise:)` which creates the promise on the connection channel's
  /// event loop and returns its future.
  ///
  /// - Important: This should only ever be called once.
  ///
  /// - Parameter requestHead: The request head to send.
  /// - Returns: A future which will be succeeded once the request head has been sent.
  internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
    let headSent: EventLoopPromise<Void> = self.connection.channel.eventLoop.makePromise()
    self.sendHead(requestHead, promise: headSent)
    return headSent.futureResult
  }
  /// Writes the given message to the stream once `subchannel` becomes available.
  ///
  /// - Note: The name is underscore-prefixed so that classes conforming to
  ///   `StreamingRequestClientCall` can use the unprefixed name.
  /// - Parameters:
  ///   - message: The message to send.
  ///   - promise: A promise to fulfill when the message reaches the network, or `nil`.
  internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
    let messagePart: GRPCClientRequestPart<RequestMessage> = .message(message)
    self.writeAndFlushOnStream(messagePart, promise: promise)
  }
  /// Writes the given message to the stream once `subchannel` becomes available.
  ///
  /// Convenience over `_sendMessage(_:promise:)` which creates the promise on the connection
  /// channel's event loop and returns its future.
  ///
  /// - Note: The name is underscore-prefixed so that classes conforming to
  ///   `StreamingRequestClientCall` can use the unprefixed name.
  /// - Parameter message: The message to send.
  /// - Returns: A future which will be fulfilled when the message reaches the network.
  internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
    let messageSent: EventLoopPromise<Void> = self.connection.channel.eventLoop.makePromise()
    self._sendMessage(message, promise: messageSent)
    return messageSent.futureResult
  }
  /// Writes `end` to the stream once `subchannel` becomes available.
  ///
  /// - Note: The name is underscore-prefixed so that classes conforming to
  ///   `StreamingRequestClientCall` can use the unprefixed name.
  /// - Important: This should only ever be called once.
  /// - Parameter promise: A promise to succeed once the end has been sent, or `nil`.
  internal func _sendEnd(promise: EventLoopPromise<Void>?) {
    let endPart: GRPCClientRequestPart<RequestMessage> = .end
    self.writeAndFlushOnStream(endPart, promise: promise)
  }
  /// Writes `end` to the stream once `subchannel` becomes available.
  ///
  /// Convenience over `_sendEnd(promise:)` which creates the promise on the connection channel's
  /// event loop and returns its future.
  ///
  /// - Note: The name is underscore-prefixed so that classes conforming to
  ///   `StreamingRequestClientCall` can use the unprefixed name.
  /// - Important: This should only ever be called once.
  /// - Returns: A future which will be succeeded once the end has been sent.
  internal func _sendEnd() -> EventLoopFuture<Void> {
    let endSent: EventLoopPromise<Void> = self.connection.channel.eventLoop.makePromise()
    self._sendEnd(promise: endSent)
    return endSent.futureResult
  }
  /// Writes and flushes the given request part on the HTTP/2 stream channel used by this call, once
  /// that channel is available.
  ///
  /// The `sendX` methods funnel through here so that a failed write (e.g. due to a closed
  /// connection) is always reported to the client channel handler, which in turn owns the futures
  /// handed to the user.
  ///
  /// - Parameters:
  ///   - request: The request part to write.
  ///   - promise: A promise to complete with the result of the write. When `nil`, a promise is
  ///     created internally so that failures can still be observed.
  private func writeAndFlushOnStream(_ request: GRPCClientRequestPart<RequestMessage>, promise: EventLoopPromise<Void>?) {
    // A promise is always required: if the write fails then it _must_ be observed by the handler
    // to ensure that any futures given to the user are fulfilled.
    let writePromise: EventLoopPromise<Void>
    if let promise = promise {
      writePromise = promise
    } else {
      writePromise = self.connection.channel.eventLoop.makePromise()
    }

    writePromise.futureResult.whenFailure {
      self.clientChannelHandler.observeError($0)
    }

    let stream = self.subchannel
    // No stream means no write: propagate the stream creation failure to the write promise.
    stream.cascadeFailure(to: writePromise)
    stream.whenSuccess { streamChannel in
      streamChannel.writeAndFlush(NIOAny(request), promise: writePromise)
    }
  }
  /// Creates a client-side timeout for this call.
  ///
  /// Unless `timeout` is `.infinite`, a task is scheduled on the connection channel's event loop
  /// which reports `GRPCError.client(.deadlineExceeded(timeout))` to the client channel handler
  /// once the timeout elapses.
  ///
  /// The scheduled task is cancelled as soon as the call's status future completes. Without this,
  /// every finished call would leave a timer pending on the event loop for the remainder of its
  /// timeout, and would report a spurious deadline error to the handler after the call had ended.
  ///
  /// - Important: This should only ever be called once.
  ///
  /// - Parameter timeout: The timeout to apply to this call; nothing is scheduled for `.infinite`.
  private func setTimeout(_ timeout: GRPCTimeout) {
    guard timeout != .infinite else { return }

    let scheduledTimeout = self.connection.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
      self?.clientChannelHandler.observeError(GRPCError.client(.deadlineExceeded(timeout)))
    }

    // The call is over once its status is known, so the timeout no longer serves a purpose.
    // Cancelling a task which has already run has no effect, so this is also safe when the status
    // was completed by the timeout itself.
    self.clientChannelHandler.statusPromise.futureResult.whenComplete { _ in
      scheduledTimeout.cancel()
    }
  }
  /// Builds the `HTTPRequestHead` for a call with this signature.
  ///
  /// The method is `GET` for cacheable calls and `POST` otherwise. Headers are added in this order:
  /// the custom metadata from `callOptions`, `host`, `content-type`, `te`, `user-agent`, the
  /// accept-encoding header and, unless the timeout is infinite, the timeout header.
  ///
  /// - Parameters:
  ///   - path: path for this RPC method; used as the request URI.
  ///   - host: the address of the host we are connected to.
  ///   - callOptions: options to use when configuring this call.
  /// - Returns: `HTTPRequestHead` configured for this call.
  internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
    let method: HTTPMethod
    if callOptions.cacheable {
      method = .GET
    } else {
      method = .POST
    }

    var head = HTTPRequestHead(version: HTTPVersion(major: 2, minor: 0), method: method, uri: path)

    // User-provided metadata goes first, ahead of the headers set by the framework.
    for (name, value) in callOptions.customMetadata {
      head.headers.add(name: name, value: value)
    }

    // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
    head.headers.add(name: "host", value: host)
    head.headers.add(name: "content-type", value: "application/grpc")

    // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
    head.headers.add(name: "te", value: "trailers")

    //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
    head.headers.add(name: "user-agent", value: "grpc-swift-nio")

    head.headers.add(name: GRPCHeaderName.acceptEncoding, value: CompressionMechanism.acceptEncodingHeader)

    // An infinite timeout is expressed by omitting the timeout header altogether.
    guard callOptions.timeout != .infinite else {
      return head
    }

    head.headers.add(name: GRPCHeaderName.timeout, value: String(describing: callOptions.timeout))
    return head
  }
  239. }