ClientResponseChannelHandler.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  /// A base channel handler for receiving responses.
  ///
  /// This handler holds the promises for the initial metadata and the status of the gRPC call. It
  /// is also responsible for error handling, via an error delegate and by appropriately failing
  /// the aforementioned promises.
  internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
    public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>

    internal let logger: Logger

    /// Succeeded with the response headers, or failed with the first non-`.ok` status observed.
    internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>

    /// Succeeded with the status of the call (including non-`.ok` statuses).
    internal let statusPromise: EventLoopPromise<GRPCStatus>

    /// The call timeout specified by the user.
    internal let timeout: GRPCTimeout

    /// The scheduled timeout task, if `scheduleTimeout(eventLoop:)` scheduled one.
    internal var timeoutTask: Scheduled<Void>?

    internal let errorDelegate: ClientErrorDelegate?

    /// The context this handler was added with. It is held so that the channel can be closed if
    /// the call times out, and it is released once a status has been observed.
    internal var context: ChannelHandlerContext?

    /// The parts of a response this handler is prepared to receive next.
    internal enum InboundState {
      case expectingHeadersOrStatus
      case expectingMessageOrStatus
      case expectingStatus
      case ignore

      /// Whether receiving a status is valid in this state; `false` only for `.ignore`.
      var expectingStatus: Bool {
        switch self {
        case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus:
          return true
        case .ignore:
          return false
        }
      }
    }

    /// The arity of a response.
    internal enum ResponseArity {
      case one
      case many

      /// The inbound state after receiving a response.
      var inboundStateAfterResponse: InboundState {
        switch self {
        case .one:
          return .expectingStatus
        case .many:
          return .expectingMessageOrStatus
        }
      }
    }

    private let responseArity: ResponseArity

    private var inboundState: InboundState = .expectingHeadersOrStatus {
      didSet {
        self.logger.debug("inbound state changed from \(oldValue) to \(self.inboundState)")
      }
    }

    /// Creates a new `ClientResponseChannelHandler`.
    ///
    /// - Parameters:
    ///   - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
    ///   - statusPromise: A promise to succeed with the outcome of the call.
    ///   - errorDelegate: An error delegate to call when errors are observed.
    ///   - timeout: The call timeout specified by the user.
    ///   - expectedResponses: The number of responses expected.
    ///   - logger: The logger used by this handler.
    public init(
      initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
      statusPromise: EventLoopPromise<GRPCStatus>,
      errorDelegate: ClientErrorDelegate?,
      timeout: GRPCTimeout,
      expectedResponses: ResponseArity,
      logger: Logger
    ) {
      self.initialMetadataPromise = initialMetadataPromise
      self.statusPromise = statusPromise
      self.errorDelegate = errorDelegate
      self.timeout = timeout
      self.responseArity = expectedResponses
      self.logger = logger
    }

    /// Observe the given status.
    ///
    /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
    /// `Error`. If `status.code != .ok` then the initial metadata promise is failed with the given
    /// status. Any scheduled timeout is cancelled and the held `context` is released.
    ///
    /// - Important: This clears `self.context`; callers which still need the context afterwards
    ///   must take a local reference to it before calling this method.
    /// - Parameter status: the status to observe.
    internal func observeStatus(_ status: GRPCStatus) {
      if status.code != .ok {
        self.initialMetadataPromise.fail(status)
      }

      self.statusPromise.succeed(status)
      self.timeoutTask?.cancel()
      self.context = nil
    }

    /// Observe the given error.
    ///
    /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
    /// called with the wrapped error and its source. The error is then converted to a status and
    /// observed (see `observeStatus(_:)`).
    ///
    /// - Parameter error: the error to observe.
    internal func observeError(_ error: GRPCError) {
      self.errorDelegate?.didCatchError(error.wrappedError, file: error.file, line: error.line)
      self.observeStatus(error.asGRPCStatus())
    }

    /// Called when a response is received. Subclasses should override this method.
    ///
    /// - Parameter response: The received response.
    internal func onResponse(_ response: _Box<ResponseMessage>) {
      // no-op
    }

    public func handlerAdded(context: ChannelHandlerContext) {
      // We need to hold the context in case we timeout and need to close the pipeline.
      self.context = context
    }

    /// Reads inbound data.
    ///
    /// On receipt of:
    /// - headers: the initial metadata promise is succeeded.
    /// - message: `onResponse(_:)` is called with the received message.
    /// - status: the status is observed via `observeStatus(_:)`.
    ///
    /// A part which is not valid for the current inbound state is reported via
    /// `errorCaught(context:error:)`. All reads are dropped once the state is `.ignore`.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      guard self.inboundState != .ignore else {
        self.logger.notice("ignoring read data: \(data)")
        return
      }

      switch self.unwrapInboundIn(data) {
      case .headers(let headers):
        guard self.inboundState == .expectingHeadersOrStatus else {
          self.logger.error("invalid state '\(self.inboundState)' while processing headers")
          self.errorCaught(
            context: context,
            error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)"))
          )
          return
        }

        self.logger.info("received response headers: \(headers)")
        self.initialMetadataPromise.succeed(headers)
        self.inboundState = .expectingMessageOrStatus

      case .message(let boxedMessage):
        guard self.inboundState == .expectingMessageOrStatus else {
          self.logger.error("invalid state '\(self.inboundState)' while processing message")
          self.errorCaught(
            context: context,
            error: GRPCError.client(.responseCardinalityViolation)
          )
          return
        }

        self.logger.info("received response message", metadata: [
          MetadataKey.responseType: "\(ResponseMessage.self)"
        ])
        self.onResponse(boxedMessage)
        // For a single-response call only the status may follow; otherwise more messages may.
        self.inboundState = self.responseArity.inboundStateAfterResponse

      case .status(let status):
        guard self.inboundState.expectingStatus else {
          self.logger.error("invalid state '\(self.inboundState)' while processing status")
          self.errorCaught(
            context: context,
            error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)"))
          )
          return
        }

        self.logger.info("received response status: \(status.code)")
        self.observeStatus(status)
        // We don't expect any more requests/responses beyond this point and we don't need to close
        // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
      }
    }

    /// Handles `GRPCClientUserEvent`s: on `.cancelled` the "cancelled by client" status is observed
    /// and the channel is closed. Events of any other type are left untouched by this handler.
    public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      if let clientUserEvent = event as? GRPCClientUserEvent {
        switch clientUserEvent {
        case .cancelled:
          // We shouldn't observe an error since this event is triggered by the user: just observe the
          // status.
          self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus())
          context.close(promise: nil)
        }
      }
    }

    /// Stops processing reads, observes an `.unavailable` status and forwards the event.
    public func channelInactive(context: ChannelHandlerContext) {
      self.inboundState = .ignore
      self.observeStatus(.init(code: .unavailable, message: nil))
      context.fireChannelInactive()
    }

    /// Observe an error from the pipeline and close the channel.
    ///
    /// Errors which are not already a `GRPCError` are wrapped as an unknown client error first.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
      context.close(mode: .all, promise: nil)
    }

    /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
    ///
    /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
    internal func scheduleTimeout(eventLoop: EventLoop) {
      guard self.timeout != .infinite else {
        return
      }

      let timeout = self.timeout
      // The handler is captured weakly so a pending timeout does not keep it alive.
      self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
        self?.performTimeout(error: .client(.deadlineExceeded(timeout)))
      }
    }

    /// Called when this call times out. The given error is observed (see `observeError(_:)`) and,
    /// if this handler has a context associated with it, its channel is closed.
    ///
    /// - Parameter error: The error to fail any promises with.
    internal func performTimeout(error: GRPCError) {
      // `observeError(_:)` ends in `observeStatus(_:)`, which clears `self.context`. Take a local
      // reference first, otherwise the close below would be a no-op and the channel would be
      // left open after the deadline.
      let context = self.context
      self.observeError(error)
      context?.close(mode: .all, promise: nil)
      self.context = nil
    }
  }
  /// A channel handler for client calls which receive a single response.
  final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: ClientResponseChannelHandler<ResponseMessage> {
    /// The promise completed with the single response, or failed with a non-`.ok` status.
    let responsePromise: EventLoopPromise<ResponseMessage>

    /// Creates a handler expecting exactly one response.
    ///
    /// - Parameters:
    ///   - initialMetadataPromise: A promise for the initial metadata, passed to the superclass.
    ///   - responsePromise: A promise to succeed with the response message.
    ///   - statusPromise: A promise for the status of the call, passed to the superclass.
    ///   - errorDelegate: An error delegate, passed to the superclass.
    ///   - timeout: The call timeout, passed to the superclass.
    ///   - logger: A logger; the name of this handler is attached to it as metadata.
    internal init(
      initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
      responsePromise: EventLoopPromise<ResponseMessage>,
      statusPromise: EventLoopPromise<GRPCStatus>,
      errorDelegate: ClientErrorDelegate?,
      timeout: GRPCTimeout,
      logger: Logger
    ) {
      let handlerLogger = logger.addingMetadata(
        key: MetadataKey.channelHandler,
        value: "GRPCClientUnaryResponseChannelHandler"
      )

      self.responsePromise = responsePromise
      super.init(
        initialMetadataPromise: initialMetadataPromise,
        statusPromise: statusPromise,
        errorDelegate: errorDelegate,
        timeout: timeout,
        expectedResponses: .one,
        logger: handlerLogger
      )
    }

    /// Completes the response promise with the message carried by `response`.
    ///
    /// - Parameter response: The response received from the service.
    override func onResponse(_ response: _Box<ResponseMessage>) {
      let message = response.value
      self.responsePromise.succeed(message)
    }

    /// Observes the status via the superclass, then fails the response promise with it unless
    /// the status code is `.ok`.
    override func observeStatus(_ status: GRPCStatus) {
      super.observeStatus(status)

      guard status.code != .ok else {
        return
      }
      self.responsePromise.fail(status)
    }
  }
  /// A channel handler for client calls which receive a stream of responses.
  final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: ClientResponseChannelHandler<ResponseMessage> {
    typealias ResponseHandler = (ResponseMessage) -> Void

    /// The user-provided callback invoked once per received response message.
    let responseHandler: ResponseHandler

    /// Creates a handler expecting any number of responses.
    ///
    /// - Parameters:
    ///   - initialMetadataPromise: A promise for the initial metadata, passed to the superclass.
    ///   - statusPromise: A promise for the status of the call, passed to the superclass.
    ///   - errorDelegate: An error delegate, passed to the superclass.
    ///   - timeout: The call timeout, passed to the superclass.
    ///   - logger: A logger; the name of this handler is attached to it as metadata.
    ///   - responseHandler: A callback to invoke with each response message.
    internal init(
      initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
      statusPromise: EventLoopPromise<GRPCStatus>,
      errorDelegate: ClientErrorDelegate?,
      timeout: GRPCTimeout,
      logger: Logger,
      responseHandler: @escaping ResponseHandler
    ) {
      let handlerLogger = logger.addingMetadata(
        key: MetadataKey.channelHandler,
        value: "GRPCClientStreamingResponseChannelHandler"
      )

      self.responseHandler = responseHandler
      super.init(
        initialMetadataPromise: initialMetadataPromise,
        statusPromise: statusPromise,
        errorDelegate: errorDelegate,
        timeout: timeout,
        expectedResponses: .many,
        logger: handlerLogger
      )
    }

    /// Passes the message carried by `response` to the user-provided handler.
    ///
    /// - Parameter response: The response received from the service.
    override func onResponse(_ response: _Box<ResponseMessage>) {
      let message = response.value
      self.responseHandler(message)
    }
  }
  /// User events which may be fired through a client call's channel pipeline.
  public enum GRPCClientUserEvent {
    /// The user cancelled the call.
    case cancelled
  }