BaseClientCall.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. /// This class provides much of the boilerplate for the four types of gRPC call objects returned to framework
  22. /// users.
  23. ///
  24. /// Setup includes:
  25. /// - creation of an HTTP/2 stream for the call to execute on,
  26. /// - configuration of the NIO channel handlers for the stream, and
  27. /// - setting a call timeout, if one is provided.
  28. ///
  29. /// This class also provides much of the framework user facing functionality via conformance to `ClientCall`.
  30. open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
  31. /// The underlying `GRPCClient` providing the HTTP/2 channel and multiplexer.
  32. internal let client: GRPCClient
  33. /// Promise for an HTTP/2 stream to execute the call on.
  34. internal let streamPromise: EventLoopPromise<Channel>
  35. /// Client channel handler. Handles internal state for reading/writing messages to the channel.
  36. /// The handler also owns the promises for the futures that this class surfaces to the user (such as
  37. /// `initialMetadata` and `status`).
  38. internal let clientChannelHandler: GRPCClientChannelHandler<RequestMessage, ResponseMessage>
  39. /// Sets up a gRPC call.
  40. ///
  41. /// A number of actions are performed:
  42. /// - a new HTTP/2 stream is created and configured using the channel and multiplexer provided by `client`,
  43. /// - a callback is registered on the new stream (`subchannel`) to send the request head,
  44. /// - a timeout is scheduled if one is set in the `callOptions`.
  45. ///
  46. /// - Parameters:
  47. /// - client: client containing the HTTP/2 channel and multiplexer to use for this call.
  48. /// - path: path for this RPC method.
  49. /// - callOptions: options to use when configuring this call.
  50. /// - responseObserver: observer for received messages.
  51. init(
  52. client: GRPCClient,
  53. path: String,
  54. callOptions: CallOptions,
  55. responseObserver: ResponseObserver<ResponseMessage>
  56. ) {
  57. self.client = client
  58. self.streamPromise = client.channel.eventLoop.newPromise()
  59. self.clientChannelHandler = GRPCClientChannelHandler(
  60. initialMetadataPromise: client.channel.eventLoop.newPromise(),
  61. statusPromise: client.channel.eventLoop.newPromise(),
  62. responseObserver: responseObserver)
  63. self.createStreamChannel()
  64. self.setTimeout(callOptions.timeout)
  65. }
  66. }
  67. extension BaseClientCall: ClientCall {
  68. public var subchannel: EventLoopFuture<Channel> {
  69. return self.streamPromise.futureResult
  70. }
  71. public var initialMetadata: EventLoopFuture<HTTPHeaders> {
  72. return self.clientChannelHandler.initialMetadataPromise.futureResult
  73. }
  74. public var status: EventLoopFuture<GRPCStatus> {
  75. return self.clientChannelHandler.statusPromise.futureResult
  76. }
  77. // Workaround for: https://bugs.swift.org/browse/SR-10128
  78. // TODO: Make this a default implementation on `ClientCall` when SR-10128 is resolved.
  79. public var trailingMetadata: EventLoopFuture<HTTPHeaders> {
  80. return status.map { $0.trailingMetadata }
  81. }
  82. public func cancel() {
  83. self.client.channel.eventLoop.execute {
  84. self.subchannel.whenSuccess { channel in
  85. channel.close(mode: .all, promise: nil)
  86. }
  87. }
  88. }
  89. }
  90. extension BaseClientCall {
  91. /// Creates and configures an HTTP/2 stream channel. `subchannel` will contain the stream channel when it is created.
  92. ///
  93. /// - Important: This should only ever be called once.
  94. private func createStreamChannel() {
  95. self.client.channel.eventLoop.execute {
  96. self.client.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
  97. subchannel.pipeline.addHandlers([HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: .http),
  98. HTTP1ToRawGRPCClientCodec(),
  99. GRPCClientCodec<RequestMessage, ResponseMessage>(),
  100. self.clientChannelHandler],
  101. first: false)
  102. }
  103. }
  104. }
  105. /// Send the request head once `subchannel` becomes available.
  106. ///
  107. /// - Important: This should only ever be called once.
  108. ///
  109. /// - Parameters:
  110. /// - requestHead: The request head to send.
  111. /// - promise: A promise to fulfill once the request head has been sent.
  112. internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise<Void>?) {
  113. // The nghttp2 implementation of NIOHTTP2 has a known defect where "promises on control frame
  114. // writes do not work and will be leaked. Promises on DATA frame writes work just fine and will
  115. // be fulfilled correctly." Succeed the promise here as a temporary workaround.
  116. //! TODO: remove this and pass the promise to `writeAndFlush` when NIOHTTP2 supports it.
  117. promise?.succeed(result: ())
  118. self.subchannel.whenSuccess { channel in
  119. channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.head(requestHead), promise: nil)
  120. }
  121. }
  122. /// Send the request head once `subchannel` becomes available.
  123. ///
  124. /// - Important: This should only ever be called once.
  125. ///
  126. /// - Parameter requestHead: The request head to send.
  127. /// - Returns: A future which will be succeeded once the request head has been sent.
  128. internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
  129. let promise = client.channel.eventLoop.newPromise(of: Void.self)
  130. self.sendHead(requestHead, promise: promise)
  131. return promise.futureResult
  132. }
  133. /// Send the given message once `subchannel` becomes available.
  134. ///
  135. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  136. /// - Parameters:
  137. /// - message: The message to send.
  138. /// - promise: A promise to fulfil when the message reaches the network.
  139. internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
  140. self.subchannel.whenSuccess { channel in
  141. channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.message(message), promise: promise)
  142. }
  143. }
  144. /// Send the given message once `subchannel` becomes available.
  145. ///
  146. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  147. /// - Returns: A future which will be fullfilled when the message reaches the network.
  148. internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
  149. let promise = client.channel.eventLoop.newPromise(of: Void.self)
  150. self._sendMessage(message, promise: promise)
  151. return promise.futureResult
  152. }
  153. /// Send `end` once `subchannel` becomes available.
  154. ///
  155. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  156. /// - Important: This should only ever be called once.
  157. /// - Parameter promise: A promise to succeed once then end has been sent.
  158. internal func _sendEnd(promise: EventLoopPromise<Void>?) {
  159. // The nghttp2 implementation of NIOHTTP2 has a known defect where "promises on control frame
  160. // writes do not work and will be leaked. Promises on DATA frame writes work just fine and will
  161. // be fulfilled correctly." Succeed the promise here as a temporary workaround.
  162. //! TODO: remove this and pass the promise to `writeAndFlush` when NIOHTTP2 supports it.
  163. promise?.succeed(result: ())
  164. self.subchannel.whenSuccess { channel in
  165. channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.end, promise: nil)
  166. }
  167. }
  168. /// Send `end` once `subchannel` becomes available.
  169. ///
  170. /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name.
  171. /// - Important: This should only ever be called once.
  172. ///- Returns: A future which will be succeeded once the end has been sent.
  173. internal func _sendEnd() -> EventLoopFuture<Void> {
  174. let promise = client.channel.eventLoop.newPromise(of: Void.self)
  175. self._sendEnd(promise: promise)
  176. return promise.futureResult
  177. }
  178. /// Creates a client-side timeout for this call.
  179. ///
  180. /// - Important: This should only ever be called once.
  181. private func setTimeout(_ timeout: GRPCTimeout) {
  182. if timeout == .infinite { return }
  183. self.client.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  184. self?.clientChannelHandler.observeError(.client(.deadlineExceeded(timeout)))
  185. }
  186. }
  187. /// Makes a new `HTTPRequestHead` for a call with this signature.
  188. ///
  189. /// - Parameters:
  190. /// - path: path for this RPC method.
  191. /// - host: the address of the host we are connected to.
  192. /// - callOptions: options to use when configuring this call.
  193. /// - Returns: `HTTPRequestHead` configured for this call.
  194. internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
  195. var requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: path)
  196. callOptions.customMetadata.forEach { name, value in
  197. requestHead.headers.add(name: name, value: value)
  198. }
  199. // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
  200. requestHead.headers.add(name: "host", value: host)
  201. requestHead.headers.add(name: "content-type", value: "application/grpc")
  202. // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  203. requestHead.headers.add(name: "te", value: "trailers")
  204. //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
  205. requestHead.headers.add(name: "user-agent", value: "grpc-swift-nio")
  206. requestHead.headers.add(name: "grpc-accept-encoding", value: CompressionMechanism.acceptEncodingHeader)
  207. if callOptions.timeout != .infinite {
  208. requestHead.headers.add(name: "grpc-timeout", value: String(describing: callOptions.timeout))
  209. }
  210. return requestHead
  211. }
  212. }