2
0

BaseClientCall.swift 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import SwiftProtobuf
  /// Shared base for the four kinds of gRPC call object handed back to framework users; it carries
  /// most of the boilerplate those call types have in common.
  ///
  /// Creating an instance:
  /// - requests a new HTTP/2 stream for the call to execute on,
  /// - installs the NIO channel handlers on that stream, and
  /// - arms a call timeout when the call options provide one.
  ///
  /// Much of the user-facing functionality is supplied through this class's conformance to `ClientCall`.
  open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
    /// The `GRPCClient` that supplies the HTTP/2 channel and multiplexer.
    internal let client: GRPCClient

    /// Promise for the HTTP/2 stream channel on which this call executes.
    internal let streamPromise: EventLoopPromise<Channel>

    /// Channel handler for this call. It handles the internal state for reading/writing messages and
    /// owns the promises behind the futures surfaced to the user (such as `initialMetadata` and
    /// `status`).
    internal let clientChannelHandler: GRPCClientChannelHandler<RequestMessage, ResponseMessage>

    /// Sets up a gRPC call.
    ///
    /// The initializer:
    /// - creates the promises for the stream channel, the initial metadata and the status on the
    ///   event loop of `client.channel`,
    /// - asks for a new HTTP/2 stream to be created and configured (`createStreamChannel()`), and
    /// - passes `callOptions.timeout` to `setTimeout(_:)`.
    ///
    /// - Note: The request head is not sent from here (see `sendHead`), and `path` is not read by
    ///   this initializer.
    ///
    /// - Parameters:
    ///   - client: client containing the HTTP/2 channel and multiplexer to use for this call.
    ///   - path: path for this RPC method.
    ///   - callOptions: options to use when configuring this call.
    ///   - responseObserver: observer for received messages.
    init(
      client: GRPCClient,
      path: String,
      callOptions: CallOptions,
      responseObserver: ResponseObserver<ResponseMessage>
    ) {
      // Every promise created here lives on the event loop of the client's channel.
      let eventLoop = client.channel.eventLoop
      let handler = GRPCClientChannelHandler<RequestMessage, ResponseMessage>(
        initialMetadataPromise: eventLoop.makePromise(),
        statusPromise: eventLoop.makePromise(),
        responseObserver: responseObserver)

      self.client = client
      self.streamPromise = eventLoop.makePromise()
      self.clientChannelHandler = handler

      // All stored properties are initialised; it is now safe to call instance methods.
      self.createStreamChannel()
      self.setTimeout(callOptions.timeout)
    }
  }
  extension BaseClientCall: ClientCall {
    /// Future for the HTTP/2 stream channel of this call; it is the future of `streamPromise`.
    public var subchannel: EventLoopFuture<Channel> {
      let streamFuture = self.streamPromise.futureResult
      return streamFuture
    }

    /// Future for the initial metadata; it is the future of the channel handler's
    /// `initialMetadataPromise`.
    public var initialMetadata: EventLoopFuture<HTTPHeaders> {
      let metadataPromise = self.clientChannelHandler.initialMetadataPromise
      return metadataPromise.futureResult
    }

    /// Future for the call's status; it is the future of the channel handler's `statusPromise`.
    public var status: EventLoopFuture<GRPCStatus> {
      let statusPromise = self.clientChannelHandler.statusPromise
      return statusPromise.futureResult
    }

    // Workaround for: https://bugs.swift.org/browse/SR-10128
    // Once that is resolved this can move to a default implementation on `ClientCall`.
    /// Future for the trailing metadata, derived from `status` by reading its `trailingMetadata`.
    public var trailingMetadata: EventLoopFuture<HTTPHeaders> {
      return self.status.map { finalStatus in
        finalStatus.trailingMetadata
      }
    }

    /// Cancels the call by closing its stream channel (`mode: .all`) once `subchannel` succeeds.
    ///
    /// The success callback is registered from the client channel's event loop. No callback is
    /// registered for a failed `subchannel`, and the outcome of the close is not observed (no
    /// promise is passed).
    public func cancel() {
      let eventLoop = self.client.channel.eventLoop
      eventLoop.execute {
        self.subchannel.whenSuccess { stream in
          stream.close(mode: .all, promise: nil)
        }
      }
    }
  }
  extension BaseClientCall {
    /// Creates and configures an HTTP/2 stream channel. `subchannel` will contain the stream channel
    /// when it is created.
    ///
    /// The stream is requested from `client.multiplexer` on the client channel's event loop and
    /// completes `streamPromise`. The stream's pipeline receives, in order: the HTTP/2 to HTTP/1
    /// client codec, the HTTP/1 to raw gRPC client codec, the gRPC client codec and finally
    /// `clientChannelHandler`.
    ///
    /// - Important: This should only ever be called once.
    private func createStreamChannel() {
      self.client.channel.eventLoop.execute {
        self.client.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
          subchannel.pipeline.addHandlers(HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.client.httpProtocol),
                                          HTTP1ToRawGRPCClientCodec(),
                                          GRPCClientCodec<RequestMessage, ResponseMessage>(),
                                          self.clientChannelHandler)
        }
      }
    }

    /// Writes and flushes `part` on the stream channel once `subchannel` resolves.
    ///
    /// If `subchannel` fails (the stream could not be created) there is no channel to write to, so
    /// `promise` is failed with the same error instead. Without this the promise would never be
    /// completed and any future derived from it would never resolve.
    ///
    /// - Parameters:
    ///   - part: The request part to write.
    ///   - promise: A promise to complete with the outcome of the write, or with the stream error.
    private func writeAndFlushOnStream(_ part: GRPCClientRequestPart<RequestMessage>, promise: EventLoopPromise<Void>?) {
      self.subchannel.whenComplete { result in
        switch result {
        case .success(let channel):
          channel.writeAndFlush(part, promise: promise)
        case .failure(let error):
          promise?.fail(error)
        }
      }
    }

    /// Send the request head once `subchannel` becomes available.
    ///
    /// - Important: This should only ever be called once.
    ///
    /// - Parameters:
    ///   - requestHead: The request head to send.
    ///   - promise: A promise to fulfill once the request head has been sent. It is failed if
    ///     `subchannel` fails.
    internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise<Void>?) {
      self.writeAndFlushOnStream(.head(requestHead), promise: promise)
    }

    /// Send the request head once `subchannel` becomes available.
    ///
    /// - Important: This should only ever be called once.
    ///
    /// - Parameter requestHead: The request head to send.
    /// - Returns: A future which will be succeeded once the request head has been sent, or failed if
    ///   `subchannel` fails.
    internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture<Void> {
      let promise = self.client.channel.eventLoop.makePromise(of: Void.self)
      self.sendHead(requestHead, promise: promise)
      return promise.futureResult
    }

    /// Send the given message once `subchannel` becomes available.
    ///
    /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use
    ///   the non-underbarred name.
    /// - Parameters:
    ///   - message: The message to send.
    ///   - promise: A promise to fulfil when the message reaches the network. It is failed if
    ///     `subchannel` fails.
    internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?) {
      self.writeAndFlushOnStream(.message(message), promise: promise)
    }

    /// Send the given message once `subchannel` becomes available.
    ///
    /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use
    ///   the non-underbarred name.
    /// - Parameter message: The message to send.
    /// - Returns: A future which will be fulfilled when the message reaches the network, or failed if
    ///   `subchannel` fails.
    internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture<Void> {
      let promise = self.client.channel.eventLoop.makePromise(of: Void.self)
      self._sendMessage(message, promise: promise)
      return promise.futureResult
    }

    /// Send `end` once `subchannel` becomes available.
    ///
    /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use
    ///   the non-underbarred name.
    /// - Important: This should only ever be called once.
    /// - Parameter promise: A promise to succeed once the end has been sent. It is failed if
    ///   `subchannel` fails.
    internal func _sendEnd(promise: EventLoopPromise<Void>?) {
      self.writeAndFlushOnStream(.end, promise: promise)
    }

    /// Send `end` once `subchannel` becomes available.
    ///
    /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use
    ///   the non-underbarred name.
    /// - Important: This should only ever be called once.
    /// - Returns: A future which will be succeeded once the end has been sent, or failed if
    ///   `subchannel` fails.
    internal func _sendEnd() -> EventLoopFuture<Void> {
      let promise = self.client.channel.eventLoop.makePromise(of: Void.self)
      self._sendEnd(promise: promise)
      return promise.futureResult
    }

    /// Creates a client-side timeout for this call.
    ///
    /// Nothing is scheduled for an infinite timeout. Otherwise a task is scheduled on the client
    /// channel's event loop which reports a deadline-exceeded error to `clientChannelHandler`. The
    /// task is cancelled as soon as `status` completes so that it neither lingers on the event loop
    /// nor reports an error for a call which has already finished.
    ///
    /// - Important: This should only ever be called once.
    private func setTimeout(_ timeout: GRPCTimeout) {
      guard timeout != .infinite else { return }

      let scheduled = self.client.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
        self?.clientChannelHandler.observeError(.client(.deadlineExceeded(timeout)))
      }

      // The call is over once its status is known; the deadline is no longer of interest.
      self.status.whenComplete { _ in
        scheduled.cancel()
      }
    }

    /// Makes a new `HTTPRequestHead` for a call with this signature.
    ///
    /// The head is a `POST` to `path` with HTTP version 2.0. The custom metadata from `callOptions`
    /// is added first, followed by the `host`, `content-type`, `te`, `user-agent` and
    /// `grpc-accept-encoding` headers. A `grpc-timeout` header is added only when the timeout is not
    /// infinite.
    ///
    /// - Parameters:
    ///   - path: path for this RPC method.
    ///   - host: the address of the host we are connected to.
    ///   - callOptions: options to use when configuring this call.
    /// - Returns: `HTTPRequestHead` configured for this call.
    internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead {
      var requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: path)

      callOptions.customMetadata.forEach { name, value in
        requestHead.headers.add(name: name, value: value)
      }

      // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority".
      requestHead.headers.add(name: "host", value: host)

      requestHead.headers.add(name: "content-type", value: "application/grpc")

      // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
      requestHead.headers.add(name: "te", value: "trailers")

      //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents
      requestHead.headers.add(name: "user-agent", value: "grpc-swift-nio")

      requestHead.headers.add(name: "grpc-accept-encoding", value: CompressionMechanism.acceptEncodingHeader)

      // The header value is the timeout's textual description (`String(describing:)`).
      if callOptions.timeout != .infinite {
        requestHead.headers.add(name: "grpc-timeout", value: String(describing: callOptions.timeout))
      }

      return requestHead
    }
  }