GRPCClientResponseChannelHandler.swift 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. /// A base channel handler for receiving responses.
  21. ///
  22. /// This includes hold promises for the initial metadata and status of the gRPC call. This handler
  23. /// is also responsible for error handling, via an error delegate and by appropriately failing the
  24. /// aforementioned promises.
  25. internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
  26. public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
  27. internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
  28. internal let statusPromise: EventLoopPromise<GRPCStatus>
  29. internal let timeout: GRPCTimeout
  30. internal var timeoutTask: Scheduled<Void>?
  31. internal let errorDelegate: ClientErrorDelegate?
  32. internal enum InboundState {
  33. case expectingHeadersOrStatus
  34. case expectingMessageOrStatus
  35. case expectingStatus
  36. case ignore
  37. var expectingStatus: Bool {
  38. switch self {
  39. case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus:
  40. return true
  41. case .ignore:
  42. return false
  43. }
  44. }
  45. }
  46. /// The arity of a response.
  47. internal enum ResponseArity {
  48. case one
  49. case many
  50. /// The inbound state after receiving a response.
  51. var inboundStateAfterResponse: InboundState {
  52. switch self {
  53. case .one:
  54. return .expectingStatus
  55. case .many:
  56. return .expectingMessageOrStatus
  57. }
  58. }
  59. }
  60. private let responseArity: ResponseArity
  61. private var inboundState: InboundState = .expectingHeadersOrStatus
  62. /// Creates a new `GRPCClientResponseChannelHandler`.
  63. ///
  64. /// - Parameters:
  65. /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
  66. /// - statusPromise: A promise to succeed with the outcome of the call.
  67. /// - errorDelegate: An error delegate to call when errors are observed.
  68. /// - timeout: The call timeout specified by the user.
  69. /// - expectedResponses: The number of responses expected.
  70. public init(
  71. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  72. statusPromise: EventLoopPromise<GRPCStatus>,
  73. errorDelegate: ClientErrorDelegate?,
  74. timeout: GRPCTimeout,
  75. expectedResponses: ResponseArity
  76. ) {
  77. self.initialMetadataPromise = initialMetadataPromise
  78. self.statusPromise = statusPromise
  79. self.errorDelegate = errorDelegate
  80. self.timeout = timeout
  81. self.responseArity = expectedResponses
  82. }
  83. /// Observe the given status.
  84. ///
  85. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  86. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  87. /// are failed with the given status.
  88. ///
  89. /// - Parameter status: the status to observe.
  90. internal func observeStatus(_ status: GRPCStatus) {
  91. if status.code != .ok {
  92. self.initialMetadataPromise.fail(status)
  93. }
  94. self.statusPromise.succeed(status)
  95. self.timeoutTask?.cancel()
  96. }
  97. /// Observe the given error.
  98. ///
  99. /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
  100. /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
  101. /// the given error (see `observeStatus(_:)`).
  102. ///
  103. /// - Parameter error: the error to observe.
  104. internal func observeError(_ error: GRPCError) {
  105. self.errorDelegate?.didCatchError(error.wrappedError, file: error.file, line: error.line)
  106. self.observeStatus(error.asGRPCStatus())
  107. }
  108. /// Called when a response is received. Subclasses should override this method.
  109. ///
  110. /// - Parameter response: The received response.
  111. internal func onResponse(_ response: _Box<ResponseMessage>) {
  112. // no-op
  113. }
  114. /// Reads inbound data.
  115. ///
  116. /// On receipt of:
  117. /// - headers: the initial metadata promise is succeeded.
  118. /// - message: `onResponse(_:)` is called with the received message.
  119. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  120. /// and response promise (if available) are failed with the status.
  121. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  122. guard self.inboundState != .ignore else { return }
  123. switch self.unwrapInboundIn(data) {
  124. case .headers(let headers):
  125. guard self.inboundState == .expectingHeadersOrStatus else {
  126. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)")))
  127. return
  128. }
  129. self.initialMetadataPromise.succeed(headers)
  130. self.inboundState = .expectingMessageOrStatus
  131. case .message(let boxedMessage):
  132. guard self.inboundState == .expectingMessageOrStatus else {
  133. self.errorCaught(context: context, error: GRPCError.client(.responseCardinalityViolation))
  134. return
  135. }
  136. self.onResponse(boxedMessage)
  137. self.inboundState = self.responseArity.inboundStateAfterResponse
  138. case .status(let status):
  139. guard self.inboundState.expectingStatus else {
  140. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)")))
  141. return
  142. }
  143. self.observeStatus(status)
  144. // We don't expect any more requests/responses beyond this point and we don't need to close
  145. // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
  146. }
  147. }
  148. public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
  149. if let clientUserEvent = event as? GRPCClientUserEvent {
  150. switch clientUserEvent {
  151. case .cancelled:
  152. // We shouldn't observe an error since this event is triggered by the user: just observe the
  153. // status.
  154. self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus())
  155. context.close(promise: nil)
  156. }
  157. }
  158. }
  159. public func channelActive(context: ChannelHandlerContext) {
  160. if self.timeout != .infinite {
  161. let timeout = self.timeout
  162. self.timeoutTask = context.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  163. self?.errorCaught(context: context, error: GRPCError.client(.deadlineExceeded(timeout)))
  164. }
  165. }
  166. }
  167. public func channelInactive(context: ChannelHandlerContext) {
  168. self.inboundState = .ignore
  169. self.observeStatus(.init(code: .unavailable, message: nil))
  170. context.fireChannelInactive()
  171. }
  172. /// Observe an error from the pipeline and close the channel.
  173. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  174. self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  175. context.close(mode: .all, promise: nil)
  176. }
  177. }
  178. /// A channel handler for client calls which recieve a single response.
  179. final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
  180. let responsePromise: EventLoopPromise<ResponseMessage>
  181. internal init(
  182. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  183. responsePromise: EventLoopPromise<ResponseMessage>,
  184. statusPromise: EventLoopPromise<GRPCStatus>,
  185. errorDelegate: ClientErrorDelegate?,
  186. timeout: GRPCTimeout
  187. ) {
  188. self.responsePromise = responsePromise
  189. super.init(
  190. initialMetadataPromise: initialMetadataPromise,
  191. statusPromise: statusPromise,
  192. errorDelegate: errorDelegate,
  193. timeout: timeout,
  194. expectedResponses: .one
  195. )
  196. }
  197. /// Succeeds the response promise with the given response.
  198. ///
  199. /// - Parameter response: The response received from the service.
  200. override func onResponse(_ response: _Box<ResponseMessage>) {
  201. self.responsePromise.succeed(response.value)
  202. }
  203. /// Fails the response promise if the given status is not `.ok`.
  204. override func observeStatus(_ status: GRPCStatus) {
  205. super.observeStatus(status)
  206. if status.code != .ok {
  207. self.responsePromise.fail(status)
  208. }
  209. }
  210. }
  211. /// A channel handler for client calls which recieve a stream of responses.
  212. final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
  213. typealias ResponseHandler = (ResponseMessage) -> Void
  214. let responseHandler: ResponseHandler
  215. internal init(
  216. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  217. statusPromise: EventLoopPromise<GRPCStatus>,
  218. errorDelegate: ClientErrorDelegate?,
  219. timeout: GRPCTimeout,
  220. responseHandler: @escaping ResponseHandler
  221. ) {
  222. self.responseHandler = responseHandler
  223. super.init(
  224. initialMetadataPromise: initialMetadataPromise,
  225. statusPromise: statusPromise,
  226. errorDelegate: errorDelegate,
  227. timeout: timeout,
  228. expectedResponses: .many
  229. )
  230. }
  231. /// Calls a user-provided handler with the given response.
  232. ///
  233. /// - Parameter response: The response received from the service.
  234. override func onResponse(_ response: _Box<ResponseMessage>) {
  235. self.responseHandler(response.value)
  236. }
  237. }
  238. /// Client user events.
  239. public enum GRPCClientUserEvent {
  240. /// The call has been cancelled.
  241. case cancelled
  242. }