ClientResponseChannelHandler.swift 11 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  21. /// A base channel handler for receiving responses.
  22. ///
  23. /// This includes hold promises for the initial metadata and status of the gRPC call. This handler
  24. /// is also responsible for error handling, via an error delegate and by appropriately failing the
  25. /// aforementioned promises.
  26. internal class ClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
  27. public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
  28. internal let logger: Logger
  29. internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
  30. internal let statusPromise: EventLoopPromise<GRPCStatus>
  31. internal let timeout: GRPCTimeout
  32. internal var timeoutTask: Scheduled<Void>?
  33. internal let errorDelegate: ClientErrorDelegate?
  34. internal enum InboundState {
  35. case expectingHeadersOrStatus
  36. case expectingMessageOrStatus
  37. case expectingStatus
  38. case ignore
  39. var expectingStatus: Bool {
  40. switch self {
  41. case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus:
  42. return true
  43. case .ignore:
  44. return false
  45. }
  46. }
  47. }
  48. /// The arity of a response.
  49. internal enum ResponseArity {
  50. case one
  51. case many
  52. /// The inbound state after receiving a response.
  53. var inboundStateAfterResponse: InboundState {
  54. switch self {
  55. case .one:
  56. return .expectingStatus
  57. case .many:
  58. return .expectingMessageOrStatus
  59. }
  60. }
  61. }
  62. private let responseArity: ResponseArity
  63. private var inboundState: InboundState = .expectingHeadersOrStatus {
  64. didSet {
  65. self.logger.debug("inbound state changed from \(oldValue) to \(self.inboundState)")
  66. }
  67. }
  68. /// Creates a new `ClientResponseChannelHandler`.
  69. ///
  70. /// - Parameters:
  71. /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
  72. /// - statusPromise: A promise to succeed with the outcome of the call.
  73. /// - errorDelegate: An error delegate to call when errors are observed.
  74. /// - timeout: The call timeout specified by the user.
  75. /// - expectedResponses: The number of responses expected.
  76. public init(
  77. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  78. statusPromise: EventLoopPromise<GRPCStatus>,
  79. errorDelegate: ClientErrorDelegate?,
  80. timeout: GRPCTimeout,
  81. expectedResponses: ResponseArity,
  82. logger: Logger
  83. ) {
  84. self.initialMetadataPromise = initialMetadataPromise
  85. self.statusPromise = statusPromise
  86. self.errorDelegate = errorDelegate
  87. self.timeout = timeout
  88. self.responseArity = expectedResponses
  89. self.logger = logger
  90. }
  91. /// Observe the given status.
  92. ///
  93. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  94. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  95. /// are failed with the given status.
  96. ///
  97. /// - Parameter status: the status to observe.
  98. internal func observeStatus(_ status: GRPCStatus) {
  99. if status.code != .ok {
  100. self.initialMetadataPromise.fail(status)
  101. }
  102. self.statusPromise.succeed(status)
  103. self.timeoutTask?.cancel()
  104. }
  105. /// Observe the given error.
  106. ///
  107. /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
  108. /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
  109. /// the given error (see `observeStatus(_:)`).
  110. ///
  111. /// - Parameter error: the error to observe.
  112. internal func observeError(_ error: GRPCError) {
  113. self.errorDelegate?.didCatchError(error.wrappedError, file: error.file, line: error.line)
  114. self.observeStatus(error.asGRPCStatus())
  115. }
  116. /// Called when a response is received. Subclasses should override this method.
  117. ///
  118. /// - Parameter response: The received response.
  119. internal func onResponse(_ response: _Box<ResponseMessage>) {
  120. // no-op
  121. }
  122. /// Reads inbound data.
  123. ///
  124. /// On receipt of:
  125. /// - headers: the initial metadata promise is succeeded.
  126. /// - message: `onResponse(_:)` is called with the received message.
  127. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  128. /// and response promise (if available) are failed with the status.
  129. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  130. guard self.inboundState != .ignore else {
  131. self.logger.notice("ignoring read data: \(data)")
  132. return
  133. }
  134. switch self.unwrapInboundIn(data) {
  135. case .headers(let headers):
  136. guard self.inboundState == .expectingHeadersOrStatus else {
  137. self.logger.error("invalid state '\(self.inboundState)' while processing headers")
  138. self.errorCaught(
  139. context: context,
  140. error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)"))
  141. )
  142. return
  143. }
  144. self.logger.info("received response headers: \(headers)")
  145. self.initialMetadataPromise.succeed(headers)
  146. self.inboundState = .expectingMessageOrStatus
  147. case .message(let boxedMessage):
  148. guard self.inboundState == .expectingMessageOrStatus else {
  149. self.logger.error("invalid state '\(self.inboundState)' while processing message")
  150. self.errorCaught(
  151. context: context,
  152. error: GRPCError.client(.responseCardinalityViolation)
  153. )
  154. return
  155. }
  156. self.logger.info("received response message", metadata: [
  157. MetadataKey.responseType: "\(ResponseMessage.self)"
  158. ])
  159. self.onResponse(boxedMessage)
  160. self.inboundState = self.responseArity.inboundStateAfterResponse
  161. case .status(let status):
  162. guard self.inboundState.expectingStatus else {
  163. self.logger.error("invalid state '\(self.inboundState)' while processing status")
  164. self.errorCaught(
  165. context: context,
  166. error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)"))
  167. )
  168. return
  169. }
  170. self.logger.info("received response status: \(status.code)")
  171. self.observeStatus(status)
  172. // We don't expect any more requests/responses beyond this point and we don't need to close
  173. // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
  174. }
  175. }
  176. public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  177. if let clientUserEvent = event as? GRPCClientUserEvent {
  178. switch clientUserEvent {
  179. case .cancelled:
  180. // We shouldn't observe an error since this event is triggered by the user: just observe the
  181. // status.
  182. self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus())
  183. context.close(promise: nil)
  184. }
  185. }
  186. }
  187. public func channelActive(context: ChannelHandlerContext) {
  188. if self.timeout != .infinite {
  189. let timeout = self.timeout
  190. self.timeoutTask = context.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  191. self?.errorCaught(
  192. context: context,
  193. error: GRPCError.client(.deadlineExceeded(timeout))
  194. )
  195. }
  196. }
  197. }
  198. public func channelInactive(context: ChannelHandlerContext) {
  199. self.inboundState = .ignore
  200. self.observeStatus(.init(code: .unavailable, message: nil))
  201. context.fireChannelInactive()
  202. }
  203. /// Observe an error from the pipeline and close the channel.
  204. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  205. self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  206. context.close(mode: .all, promise: nil)
  207. }
  208. }
  209. /// A channel handler for client calls which recieve a single response.
  210. final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: ClientResponseChannelHandler<ResponseMessage> {
  211. let responsePromise: EventLoopPromise<ResponseMessage>
  212. internal init(
  213. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  214. responsePromise: EventLoopPromise<ResponseMessage>,
  215. statusPromise: EventLoopPromise<GRPCStatus>,
  216. errorDelegate: ClientErrorDelegate?,
  217. timeout: GRPCTimeout,
  218. logger: Logger
  219. ) {
  220. self.responsePromise = responsePromise
  221. super.init(
  222. initialMetadataPromise: initialMetadataPromise,
  223. statusPromise: statusPromise,
  224. errorDelegate: errorDelegate,
  225. timeout: timeout,
  226. expectedResponses: .one,
  227. logger: logger.addingMetadata(
  228. key: MetadataKey.channelHandler,
  229. value: "GRPCClientUnaryResponseChannelHandler"
  230. )
  231. )
  232. }
  233. /// Succeeds the response promise with the given response.
  234. ///
  235. /// - Parameter response: The response received from the service.
  236. override func onResponse(_ response: _Box<ResponseMessage>) {
  237. self.responsePromise.succeed(response.value)
  238. }
  239. /// Fails the response promise if the given status is not `.ok`.
  240. override func observeStatus(_ status: GRPCStatus) {
  241. super.observeStatus(status)
  242. if status.code != .ok {
  243. self.responsePromise.fail(status)
  244. }
  245. }
  246. }
  247. /// A channel handler for client calls which recieve a stream of responses.
  248. final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: ClientResponseChannelHandler<ResponseMessage> {
  249. typealias ResponseHandler = (ResponseMessage) -> Void
  250. let responseHandler: ResponseHandler
  251. internal init(
  252. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  253. statusPromise: EventLoopPromise<GRPCStatus>,
  254. errorDelegate: ClientErrorDelegate?,
  255. timeout: GRPCTimeout,
  256. logger: Logger,
  257. responseHandler: @escaping ResponseHandler
  258. ) {
  259. self.responseHandler = responseHandler
  260. super.init(
  261. initialMetadataPromise: initialMetadataPromise,
  262. statusPromise: statusPromise,
  263. errorDelegate: errorDelegate,
  264. timeout: timeout,
  265. expectedResponses: .many,
  266. logger: logger.addingMetadata(
  267. key: MetadataKey.channelHandler,
  268. value: "GRPCClientStreamingResponseChannelHandler"
  269. )
  270. )
  271. }
  272. /// Calls a user-provided handler with the given response.
  273. ///
  274. /// - Parameter response: The response received from the service.
  275. override func onResponse(_ response: _Box<ResponseMessage>) {
  276. self.responseHandler(response.value)
  277. }
  278. }
  279. /// Client user events.
  280. public enum GRPCClientUserEvent {
  281. /// The call has been cancelled.
  282. case cancelled
  283. }