ClientResponseChannelHandler.swift 12 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  21. /// A base channel handler for receiving responses.
  22. ///
  23. /// This includes hold promises for the initial metadata and status of the gRPC call. This handler
  24. /// is also responsible for error handling, via an error delegate and by appropriately failing the
  25. /// aforementioned promises.
  26. internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
  27. public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
  28. internal let logger: Logger
  29. internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
  30. internal let trailingMetadataPromise: EventLoopPromise<HTTPHeaders>
  31. internal let statusPromise: EventLoopPromise<GRPCStatus>
  32. internal let timeout: GRPCTimeout
  33. internal var timeoutTask: Scheduled<Void>?
  34. internal let errorDelegate: ClientErrorDelegate?
  35. internal var context: ChannelHandlerContext?
  36. internal enum InboundState {
  37. case expectingHeadersOrStatus
  38. case expectingMessageOrStatus
  39. case expectingStatus
  40. case ignore
  41. var expectingStatus: Bool {
  42. switch self {
  43. case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus:
  44. return true
  45. case .ignore:
  46. return false
  47. }
  48. }
  49. }
  50. /// The arity of a response.
  51. internal enum ResponseArity {
  52. case one
  53. case many
  54. /// The inbound state after receiving a response.
  55. var inboundStateAfterResponse: InboundState {
  56. switch self {
  57. case .one:
  58. return .expectingStatus
  59. case .many:
  60. return .expectingMessageOrStatus
  61. }
  62. }
  63. }
  64. private let responseArity: ResponseArity
  65. private var inboundState: InboundState = .expectingHeadersOrStatus {
  66. didSet {
  67. self.logger.debug("inbound state changed from \(oldValue) to \(self.inboundState)")
  68. }
  69. }
  70. /// Creates a new `ClientResponseChannelHandler`.
  71. ///
  72. /// - Parameters:
  73. /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
  74. /// - statusPromise: A promise to succeed with the outcome of the call.
  75. /// - errorDelegate: An error delegate to call when errors are observed.
  76. /// - timeout: The call timeout specified by the user.
  77. /// - expectedResponses: The number of responses expected.
  78. public init(
  79. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  80. trailingMetadataPromise: EventLoopPromise<HTTPHeaders>,
  81. statusPromise: EventLoopPromise<GRPCStatus>,
  82. errorDelegate: ClientErrorDelegate?,
  83. timeout: GRPCTimeout,
  84. expectedResponses: ResponseArity,
  85. logger: Logger
  86. ) {
  87. self.initialMetadataPromise = initialMetadataPromise
  88. self.trailingMetadataPromise = trailingMetadataPromise
  89. self.statusPromise = statusPromise
  90. self.errorDelegate = errorDelegate
  91. self.timeout = timeout
  92. self.responseArity = expectedResponses
  93. self.logger = logger
  94. }
  95. /// Observe the given status.
  96. ///
  97. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  98. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  99. /// are failed with the given status.
  100. ///
  101. /// - Parameter status: the status to observe.
  102. internal func observeStatus(_ status: GRPCStatus, trailingMetadata: HTTPHeaders?) {
  103. if status.code != .ok {
  104. self.initialMetadataPromise.fail(status)
  105. }
  106. self.trailingMetadataPromise.succeed(trailingMetadata ?? HTTPHeaders())
  107. self.statusPromise.succeed(status)
  108. self.timeoutTask?.cancel()
  109. self.context = nil
  110. }
  111. /// Observe the given error.
  112. ///
  113. /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
  114. /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
  115. /// the given error (see `observeStatus(_:)`).
  116. ///
  117. /// - Parameter error: the error to observe.
  118. internal func observeError(_ error: GRPCError) {
  119. self.errorDelegate?.didCatchError(error.wrappedError, file: error.file, line: error.line)
  120. self.observeStatus(error.asGRPCStatus(), trailingMetadata: nil)
  121. }
  122. /// Called when a response is received. Subclasses should override this method.
  123. ///
  124. /// - Parameter response: The received response.
  125. internal func onResponse(_ response: _Box<ResponseMessage>) {
  126. // no-op
  127. }
  128. public func handlerAdded(context: ChannelHandlerContext) {
  129. // We need to hold the context in case we timeout and need to close the pipeline.
  130. self.context = context
  131. }
  132. /// Reads inbound data.
  133. ///
  134. /// On receipt of:
  135. /// - headers: the initial metadata promise is succeeded.
  136. /// - message: `onResponse(_:)` is called with the received message.
  137. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  138. /// and response promise (if available) are failed with the status.
  139. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  140. guard self.inboundState != .ignore else {
  141. self.logger.notice("ignoring read data: \(data)")
  142. return
  143. }
  144. switch self.unwrapInboundIn(data) {
  145. case .headers(let headers):
  146. guard self.inboundState == .expectingHeadersOrStatus else {
  147. self.logger.error("invalid state '\(self.inboundState)' while processing headers")
  148. self.errorCaught(
  149. context: context,
  150. error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)"))
  151. )
  152. return
  153. }
  154. self.logger.debug("received response headers: \(headers)")
  155. self.initialMetadataPromise.succeed(headers)
  156. self.inboundState = .expectingMessageOrStatus
  157. case .message(let boxedMessage):
  158. guard self.inboundState == .expectingMessageOrStatus else {
  159. self.logger.error("invalid state '\(self.inboundState)' while processing message")
  160. self.errorCaught(
  161. context: context,
  162. error: GRPCError.client(.responseCardinalityViolation)
  163. )
  164. return
  165. }
  166. self.logger.debug("received response message", metadata: [
  167. MetadataKey.responseType: "\(ResponseMessage.self)"
  168. ])
  169. self.onResponse(boxedMessage)
  170. self.inboundState = self.responseArity.inboundStateAfterResponse
  171. case let .status(status, trailers):
  172. guard self.inboundState.expectingStatus else {
  173. self.logger.error("invalid state '\(self.inboundState)' while processing status")
  174. self.errorCaught(
  175. context: context,
  176. error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)"))
  177. )
  178. return
  179. }
  180. self.logger.debug("received response status: \(status.code)")
  181. self.observeStatus(status, trailingMetadata: trailers)
  182. // We don't expect any more requests/responses beyond this point and we don't need to close
  183. // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
  184. }
  185. }
  186. public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  187. if let clientUserEvent = event as? GRPCClientUserEvent {
  188. switch clientUserEvent {
  189. case .cancelled:
  190. // We shouldn't observe an error since this event is triggered by the user: just observe the
  191. // status.
  192. self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus(), trailingMetadata: nil)
  193. context.close(promise: nil)
  194. }
  195. }
  196. }
  197. public func channelInactive(context: ChannelHandlerContext) {
  198. self.inboundState = .ignore
  199. self.observeStatus(.init(code: .unavailable, message: nil), trailingMetadata: nil)
  200. context.fireChannelInactive()
  201. }
  202. /// Observe an error from the pipeline and close the channel.
  203. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  204. self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  205. context.close(mode: .all, promise: nil)
  206. }
  207. /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
  208. /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
  209. internal func scheduleTimeout(eventLoop: EventLoop) {
  210. guard self.timeout != .infinite else {
  211. return
  212. }
  213. let timeout = self.timeout
  214. self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  215. self?.performTimeout(error: .client(.deadlineExceeded(timeout)))
  216. }
  217. }
  218. /// Called when this call times out. Any promises which have not been fulfilled will be timed out
  219. /// with status `.deadlineExceeded`. If this handler has a context associated with it then the
  220. /// its channel is closed.
  221. ///
  222. /// - Parameter error: The error to fail any promises with.
  223. internal func performTimeout(error: GRPCError) {
  224. self.observeError(error)
  225. self.context?.close(mode: .all, promise: nil)
  226. self.context = nil
  227. }
  228. }
  229. /// A channel handler for client calls which receive a single response.
  230. final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: ClientResponseChannelHandler<ResponseMessage> {
  231. let responsePromise: EventLoopPromise<ResponseMessage>
  232. internal init(
  233. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  234. trailingMetadataPromise: EventLoopPromise<HTTPHeaders>,
  235. responsePromise: EventLoopPromise<ResponseMessage>,
  236. statusPromise: EventLoopPromise<GRPCStatus>,
  237. errorDelegate: ClientErrorDelegate?,
  238. timeout: GRPCTimeout,
  239. logger: Logger
  240. ) {
  241. self.responsePromise = responsePromise
  242. super.init(
  243. initialMetadataPromise: initialMetadataPromise,
  244. trailingMetadataPromise: trailingMetadataPromise,
  245. statusPromise: statusPromise,
  246. errorDelegate: errorDelegate,
  247. timeout: timeout,
  248. expectedResponses: .one,
  249. logger: logger.addingMetadata(
  250. key: MetadataKey.channelHandler,
  251. value: "GRPCClientUnaryResponseChannelHandler"
  252. )
  253. )
  254. }
  255. /// Succeeds the response promise with the given response.
  256. ///
  257. /// - Parameter response: The response received from the service.
  258. override func onResponse(_ response: _Box<ResponseMessage>) {
  259. self.responsePromise.succeed(response.value)
  260. }
  261. /// Fails the response promise if the given status is not `.ok`.
  262. override func observeStatus(_ status: GRPCStatus, trailingMetadata: HTTPHeaders?) {
  263. super.observeStatus(status, trailingMetadata: trailingMetadata)
  264. if status.code != .ok {
  265. self.responsePromise.fail(status)
  266. }
  267. }
  268. }
  269. /// A channel handler for client calls which receive a stream of responses.
  270. final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: ClientResponseChannelHandler<ResponseMessage> {
  271. typealias ResponseHandler = (ResponseMessage) -> Void
  272. let responseHandler: ResponseHandler
  273. internal init(
  274. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  275. trailingMetadataPromise: EventLoopPromise<HTTPHeaders>,
  276. statusPromise: EventLoopPromise<GRPCStatus>,
  277. errorDelegate: ClientErrorDelegate?,
  278. timeout: GRPCTimeout,
  279. logger: Logger,
  280. responseHandler: @escaping ResponseHandler
  281. ) {
  282. self.responseHandler = responseHandler
  283. super.init(
  284. initialMetadataPromise: initialMetadataPromise,
  285. trailingMetadataPromise: trailingMetadataPromise,
  286. statusPromise: statusPromise,
  287. errorDelegate: errorDelegate,
  288. timeout: timeout,
  289. expectedResponses: .many,
  290. logger: logger.addingMetadata(
  291. key: MetadataKey.channelHandler,
  292. value: "GRPCClientStreamingResponseChannelHandler"
  293. )
  294. )
  295. }
  296. /// Calls a user-provided handler with the given response.
  297. ///
  298. /// - Parameter response: The response received from the service.
  299. override func onResponse(_ response: _Box<ResponseMessage>) {
  300. self.responseHandler(response.value)
  301. }
  302. }
  303. /// Client user events.
  304. public enum GRPCClientUserEvent {
  305. /// The call has been cancelled.
  306. case cancelled
  307. }