BaseClientCall.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. /// This class provides much of the boilerplate for the four types of gRPC call objects returned to framework
  22. /// users.
  23. ///
  24. /// Each call will be configured on a multiplexed channel on the given connection. The multiplexed
  25. /// channel will be configured as such:
  26. ///
  27. /// ┌───────────────────────────┐
  28. /// │ GRPCClientChannelHandler │
  29. /// └─▲───────────────────────┬─┘
  30. /// GRPCClientResponsePart<T1>│ │GRPCClientRequestPart<T2>
  31. /// ┌─┴───────────────────────▼─┐
  32. /// │ GRPCClientCodec │
  33. /// └─▲───────────────────────┬─┘
  34. /// RawGRPCClientResponsePart│ │RawGRPCClientRequestPart
  35. /// ┌─┴───────────────────────▼─┐
  36. /// │ HTTP1ToRawGRPCClientCodec │
  37. /// └─▲───────────────────────┬─┘
  38. /// HTTPClientResponsePart│ │HTTPClientRequestPart
  39. /// ┌─┴───────────────────────▼─┐
  40. /// │ HTTP2ToHTTP1ClientCodec │
  41. /// └─▲───────────────────────┬─┘
  42. /// HTTP2Frame│ │HTTP2Frame
  43. /// | |
  44. ///
  45. /// Note: below the `HTTP2ToHTTP1ClientCodec` is the "main" pipeline provided by the channel in
  46. /// `GRPCClientConnection`.
  47. ///
  48. /// Setup includes:
  49. /// - creation of an HTTP/2 stream for the call to execute on,
  50. /// - configuration of the NIO channel handlers for the stream, and
  51. /// - setting a call timeout, if one is provided.
  52. ///
  53. /// This class also provides much of the framework user facing functionality via conformance to `ClientCall`.
  54. open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
  55. /// The underlying `GRPCClientConnection` providing the HTTP/2 channel and multiplexer.
  56. internal let connection: GRPCClientConnection
  57. /// Promise for an HTTP/2 stream to execute the call on.
  58. internal let streamPromise: EventLoopPromise<Channel>
  59. /// Client channel handler. Handles internal state for reading/writing messages to the channel.
  60. /// The handler also owns the promises for the futures that this class surfaces to the user (such as
  61. /// `initialMetadata` and `status`).
  62. internal let clientChannelHandler: GRPCClientChannelHandler<RequestMessage, ResponseMessage>
  63. /// Sets up a gRPC call.
  64. ///
  65. /// A number of actions are performed:
  66. /// - a new HTTP/2 stream is created and configured using the channel and multiplexer provided by `client`,
  67. /// - a callback is registered on the new stream (`subchannel`) to send the request head,
  68. /// - a timeout is scheduled if one is set in the `callOptions`.
  69. ///
  70. /// - Parameters:
  71. /// - connection: connection containing the HTTP/2 channel and multiplexer to use for this call.
  72. /// - path: path for this RPC method.
  73. /// - callOptions: options to use when configuring this call.
  74. /// - responseObserver: observer for received messages.
  75. init(
  76. connection: GRPCClientConnection,
  77. path: String,
  78. callOptions: CallOptions,
  79. responseObserver: ResponseObserver<ResponseMessage>,
  80. errorDelegate: ClientErrorDelegate?
  81. ) {
  82. self.connection = connection
  83. self.streamPromise = connection.channel.eventLoop.makePromise()
  84. self.clientChannelHandler = GRPCClientChannelHandler(
  85. initialMetadataPromise: connection.channel.eventLoop.makePromise(),
  86. statusPromise: connection.channel.eventLoop.makePromise(),
  87. responseObserver: responseObserver,
  88. errorDelegate: errorDelegate)
  89. self.streamPromise.futureResult.whenFailure { error in
  90. self.clientChannelHandler.observeError(.unknown(error, origin: .client))
  91. }
  92. self.createStreamChannel()
  93. self.setTimeout(callOptions.timeout)
  94. }
  95. }
  96. extension BaseClientCall: ClientCall {
  97. public var subchannel: EventLoopFuture<Channel> {
  98. return self.streamPromise.futureResult
  99. }
  100. public var initialMetadata: EventLoopFuture<HTTPHeaders> {
  101. return self.clientChannelHandler.initialMetadataPromise.futureResult
  102. }
  103. public var status: EventLoopFuture<GRPCStatus> {
  104. return self.clientChannelHandler.statusPromise.futureResult
  105. }
  106. // Workaround for: https://bugs.swift.org/browse/SR-10128
  107. // Once resolved this can become a default implementation on `ClientCall`.
  108. public var trailingMetadata: EventLoopFuture<HTTPHeaders> {
  109. return status.map { $0.trailingMetadata }
  110. }
  111. public func cancel() {
  112. self.connection.channel.eventLoop.execute {
  113. self.subchannel.whenSuccess { channel in
  114. channel.pipeline.fireUserInboundEventTriggered(GRPCClientUserEvent.cancelled)
  115. }
  116. }
  117. }
  118. }
  119. extension BaseClientCall {
  120. /// Creates and configures an HTTP/2 stream channel. `subchannel` will contain the stream channel when it is created.
  121. ///
  122. /// - Important: This should only ever be called once.
  123. private func createStreamChannel() {
  124. self.connection.channel.eventLoop.execute {
  125. self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
  126. subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.httpProtocol),
  127. HTTP1ToRawGRPCClientCodec(),
  128. GRPCClientCodec<RequestMessage, ResponseMessage>(),
  129. self.clientChannelHandler)
  130. }
  131. }
  132. }
  133. /// Send the request head once `subchannel` becomes available.
  134. ///
  135. /// - Important: This should only ever be called once.
  136. ///
  137. /// - Parameters:
  138. /// - requestHead: The request head to send.
  139. /// - promise: A promise to fulfill once the request head has been sent.
  140. internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise<Void>?) {
  141. self.writeAndFlushOnStream(.head(requestHead), promise: promise)
  142. }
  143. /// Send the request head once `subchannel` becomes available.
  144. ///
  145. /// - Important: This should only ever be called once.
  146. ///
  147. /// - Parameter requestHead: The request head to send.
  148. /// - Returns: A future which will be succeeded once the request head has been sent.
  149. internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
  150. let promise = connection.channel.eventLoop.makePromise(of: Void.self)
  151. self.sendHead(requestHead, promise: promise)
  152. return promise.futureResult
  153. }
  154. /// Send the given message once `subchannel` becomes available.
  155. ///
  156. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  157. /// - Parameters:
  158. /// - message: The message to send.
  159. /// - promise: A promise to fulfil when the message reaches the network.
  160. internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
  161. self.writeAndFlushOnStream(.message(message), promise: promise)
  162. }
  163. /// Send the given message once `subchannel` becomes available.
  164. ///
  165. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  166. /// - Returns: A future which will be fullfilled when the message reaches the network.
  167. internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
  168. let promise = connection.channel.eventLoop.makePromise(of: Void.self)
  169. self._sendMessage(message, promise: promise)
  170. return promise.futureResult
  171. }
  172. /// Send `end` once `subchannel` becomes available.
  173. ///
  174. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  175. /// - Important: This should only ever be called once.
  176. /// - Parameter promise: A promise to succeed once then end has been sent.
  177. internal func _sendEnd(promise: EventLoopPromise<Void>?) {
  178. self.writeAndFlushOnStream(.end, promise: promise)
  179. }
  180. /// Send `end` once `subchannel` becomes available.
  181. ///
  182. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  183. /// - Important: This should only ever be called once.
  184. ///- Returns: A future which will be succeeded once the end has been sent.
  185. internal func _sendEnd() -> EventLoopFuture<Void> {
  186. let promise = connection.channel.eventLoop.makePromise(of: Void.self)
  187. self._sendEnd(promise: promise)
  188. return promise.futureResult
  189. }
  190. /// Writes the given request on the future `Channel` for the HTTP/2 stream used to make this call.
  191. ///
  192. /// This method is intended to be used by the `sendX` methods in order to ensure that they fail
  193. /// futures associated with this call should the write fail (e.g. due to a closed connection).
  194. private func writeAndFlushOnStream(_ request: GRPCClientRequestPart<RequestMessage>, promise: EventLoopPromise<Void>?) {
  195. // We need to use a promise here; if the write fails then it _must_ be observed by the handler
  196. // to ensure that any futures given to the user are fulfilled.
  197. let promise = promise ?? self.connection.channel.eventLoop.makePromise()
  198. promise.futureResult.whenFailure { error in
  199. self.clientChannelHandler.observeError(.unknown(error, origin: .client))
  200. }
  201. self.subchannel.cascadeFailure(to: promise)
  202. self.subchannel.whenSuccess { channel in
  203. channel.writeAndFlush(NIOAny(request), promise: promise)
  204. }
  205. }
  206. /// Creates a client-side timeout for this call.
  207. ///
  208. /// - Important: This should only ever be called once.
  209. private func setTimeout(_ timeout: GRPCTimeout) {
  210. if timeout == .infinite { return }
  211. self.connection.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  212. self?.subchannel.whenSuccess { stream in
  213. stream.pipeline.fireUserInboundEventTriggered(GRPCClientUserEvent.timedOut(timeout))
  214. }
  215. }
  216. }
  217. /// Makes a new `HTTPRequestHead` for a call with this signature.
  218. ///
  219. /// - Parameters:
  220. /// - path: path for this RPC method.
  221. /// - host: the address of the host we are connected to.
  222. /// - callOptions: options to use when configuring this call.
  223. /// - Returns: `HTTPRequestHead` configured for this call.
  224. internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
  225. let method: HTTPMethod = callOptions.cacheable ? .GET : .POST
  226. var requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: method, uri: path)
  227. callOptions.customMetadata.forEach { name, value in
  228. requestHead.headers.add(name: name, value: value)
  229. }
  230. // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
  231. requestHead.headers.add(name: "host", value: host)
  232. requestHead.headers.add(name: "content-type", value: "application/grpc")
  233. // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  234. requestHead.headers.add(name: "te", value: "trailers")
  235. //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
  236. requestHead.headers.add(name: "user-agent", value: "grpc-swift-nio")
  237. requestHead.headers.add(name: GRPCHeaderName.acceptEncoding, value: CompressionMechanism.acceptEncodingHeader)
  238. if callOptions.timeout != .infinite {
  239. requestHead.headers.add(name: GRPCHeaderName.timeout, value: String(describing: callOptions.timeout))
  240. }
  241. return requestHead
  242. }
  243. }