GRPCClientResponseChannelHandler.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHPACK
  20. import Logging
  21. /// A base channel handler for receiving responses.
  22. ///
  23. /// This includes holding promises for the initial metadata and status of the gRPC call. This handler
  24. /// is also responsible for error handling, via an error delegate and by appropriately failing the
  25. /// aforementioned promises.
  26. internal class GRPCClientResponseChannelHandler<ResponsePayload: GRPCPayload>: ChannelInboundHandler {
  27. public typealias InboundIn = _GRPCClientResponsePart<ResponsePayload>
  28. internal let logger: Logger
  29. internal var stopwatch: Stopwatch?
  30. internal let initialMetadataPromise: EventLoopPromise<HPACKHeaders>
  31. internal let trailingMetadataPromise: EventLoopPromise<HPACKHeaders>
  32. internal let statusPromise: EventLoopPromise<GRPCStatus>
  33. internal let timeout: GRPCTimeout
  34. internal var timeoutTask: Scheduled<Void>?
  35. internal let errorDelegate: ClientErrorDelegate?
  36. internal var context: ChannelHandlerContext?
  37. /// Creates a new `ClientResponseChannelHandler`.
  38. ///
  39. /// - Parameters:
  40. /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
  41. /// - trailingMetadataPromise: A promise to succeed on receiving the trailing metadata from the service.
  42. /// - statusPromise: A promise to succeed with the outcome of the call.
  43. /// - errorDelegate: An error delegate to call when errors are observed.
  44. /// - timeout: The call timeout specified by the user.
  45. public init(
  46. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  47. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  48. statusPromise: EventLoopPromise<GRPCStatus>,
  49. errorDelegate: ClientErrorDelegate?,
  50. timeout: GRPCTimeout,
  51. logger: Logger
  52. ) {
  53. self.initialMetadataPromise = initialMetadataPromise
  54. self.trailingMetadataPromise = trailingMetadataPromise
  55. self.statusPromise = statusPromise
  56. self.errorDelegate = errorDelegate
  57. self.timeout = timeout
  58. self.logger = logger
  59. }
  60. /// Observe the given status.
  61. ///
  62. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  63. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  64. /// are failed with the given status.
  65. ///
  66. /// - Parameter status: the status to observe.
  67. internal func onStatus(_ status: GRPCStatus) {
  68. if status.code != .ok {
  69. self.initialMetadataPromise.fail(status)
  70. }
  71. self.trailingMetadataPromise.fail(status)
  72. self.statusPromise.succeed(status)
  73. self.timeoutTask?.cancel()
  74. self.context = nil
  75. if let stopwatch = self.stopwatch {
  76. let millis = stopwatch.elapsedMillis()
  77. self.logger.debug("rpc call finished", metadata: [
  78. "duration_ms": "\(millis)",
  79. "status_code": "\(status.code.rawValue)"
  80. ])
  81. self.stopwatch = nil
  82. }
  83. }
  84. /// Observe the given error.
  85. ///
  86. /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:logger:file:line:)` method is
  87. /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
  88. /// the given error (see `observeStatus(_:)`).
  89. ///
  90. /// - Parameter error: the error to observe.
  91. internal func onError(_ error: Error) {
  92. if let errorWithContext = error as? GRPCError.WithContext {
  93. self.errorDelegate?.didCatchError(
  94. errorWithContext.error,
  95. logger: self.logger,
  96. file: errorWithContext.file,
  97. line: errorWithContext.line
  98. )
  99. self.onStatus(errorWithContext.error.makeGRPCStatus())
  100. } else {
  101. self.errorDelegate?.didCatchErrorWithoutContext(error, logger: self.logger)
  102. self.onStatus((error as? GRPCStatusTransformable)?.makeGRPCStatus() ?? .processingError)
  103. }
  104. }
  105. /// Called when a response is received. Subclasses should override this method.
  106. ///
  107. /// - Parameter response: The received response.
  108. internal func onResponse(_ response: _MessageContext<ResponsePayload>) {
  109. // no-op
  110. }
  111. public func handlerAdded(context: ChannelHandlerContext) {
  112. // We need to hold the context in case we timeout and need to close the pipeline.
  113. self.context = context
  114. self.stopwatch = .start()
  115. }
  116. /// Reads inbound data.
  117. ///
  118. /// On receipt of:
  119. /// - headers: the initial metadata promise is succeeded.
  120. /// - message: `onResponse(_:)` is called with the received message.
  121. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  122. /// and response promise (if available) are failed with the status.
  123. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  124. switch self.unwrapInboundIn(data) {
  125. case .initialMetadata(let headers):
  126. self.initialMetadataPromise.succeed(headers)
  127. case .message(let message):
  128. self.onResponse(message)
  129. case .trailingMetadata(let trailers):
  130. self.trailingMetadataPromise.succeed(trailers)
  131. case let .status(status):
  132. self.onStatus(status)
  133. // We don't expect any more requests/responses beyond this point and we don't need to close
  134. // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
  135. }
  136. }
  137. public func channelInactive(context: ChannelHandlerContext) {
  138. self.onStatus(.init(code: .unavailable, message: nil))
  139. context.fireChannelInactive()
  140. }
  141. /// Observe an error from the pipeline and close the channel.
  142. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  143. self.onError(error)
  144. context.close(mode: .all, promise: nil)
  145. }
  146. /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
  147. /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
  148. internal func scheduleTimeout(eventLoop: EventLoop) {
  149. guard self.timeout != .infinite else {
  150. return
  151. }
  152. let timeout = self.timeout
  153. self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  154. self?.performTimeout(error: GRPCError.RPCTimedOut(timeout).captureContext())
  155. }
  156. }
  157. /// Called when this call times out. Any promises which have not been fulfilled will be timed out
  158. /// with status `.deadlineExceeded`. If this handler has a context associated with it then the
  159. /// its channel is closed.
  160. ///
  161. /// - Parameter error: The error to fail any promises with.
  162. internal func performTimeout(error: GRPCError.WithContext) {
  163. self.onError(error)
  164. self.context?.close(mode: .all, promise: nil)
  165. self.context = nil
  166. }
  167. }
  168. /// A channel handler for client calls which receive a single response.
  169. final class GRPCClientUnaryResponseChannelHandler<ResponsePayload: GRPCPayload>: GRPCClientResponseChannelHandler<ResponsePayload> {
  170. let responsePromise: EventLoopPromise<ResponsePayload>
  171. internal init(
  172. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  173. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  174. responsePromise: EventLoopPromise<ResponsePayload>,
  175. statusPromise: EventLoopPromise<GRPCStatus>,
  176. errorDelegate: ClientErrorDelegate?,
  177. timeout: GRPCTimeout,
  178. logger: Logger
  179. ) {
  180. self.responsePromise = responsePromise
  181. super.init(
  182. initialMetadataPromise: initialMetadataPromise,
  183. trailingMetadataPromise: trailingMetadataPromise,
  184. statusPromise: statusPromise,
  185. errorDelegate: errorDelegate,
  186. timeout: timeout,
  187. logger: logger
  188. )
  189. }
  190. /// Succeeds the response promise with the given response.
  191. ///
  192. /// - Parameter response: The response received from the service.
  193. override func onResponse(_ response: _MessageContext<ResponsePayload>) {
  194. self.responsePromise.succeed(response.message)
  195. }
  196. /// Fails the response promise if the given status is not `.ok`.
  197. override func onStatus(_ status: GRPCStatus) {
  198. super.onStatus(status)
  199. if status.code != .ok {
  200. self.responsePromise.fail(status)
  201. }
  202. }
  203. // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
  204. // See: https://bugs.swift.org/browse/SR-11564
  205. //
  206. // TODO: Remove this once SR-11564 is resolved.
  207. override internal func scheduleTimeout(eventLoop: EventLoop) {
  208. super.scheduleTimeout(eventLoop: eventLoop)
  209. }
  210. }
  211. /// A channel handler for client calls which receive a stream of responses.
  212. final class GRPCClientStreamingResponseChannelHandler<ResponsePayload: GRPCPayload>: GRPCClientResponseChannelHandler<ResponsePayload> {
  213. typealias ResponseHandler = (ResponsePayload) -> Void
  214. let responseHandler: ResponseHandler
  215. internal init(
  216. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  217. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  218. statusPromise: EventLoopPromise<GRPCStatus>,
  219. errorDelegate: ClientErrorDelegate?,
  220. timeout: GRPCTimeout,
  221. logger: Logger,
  222. responseHandler: @escaping ResponseHandler
  223. ) {
  224. self.responseHandler = responseHandler
  225. super.init(
  226. initialMetadataPromise: initialMetadataPromise,
  227. trailingMetadataPromise: trailingMetadataPromise,
  228. statusPromise: statusPromise,
  229. errorDelegate: errorDelegate,
  230. timeout: timeout,
  231. logger: logger
  232. )
  233. }
  234. /// Calls a user-provided handler with the given response.
  235. ///
  236. /// - Parameter response: The response received from the service.
  237. override func onResponse(_ response: _MessageContext<ResponsePayload>) {
  238. self.responseHandler(response.message)
  239. }
  240. // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
  241. // See: https://bugs.swift.org/browse/SR-11564
  242. //
  243. // TODO: Remove this once SR-11564 is resolved.
  244. override internal func scheduleTimeout(eventLoop: EventLoop) {
  245. super.scheduleTimeout(eventLoop: eventLoop)
  246. }
  247. }
  248. /// Client user events.
  249. internal enum GRPCClientUserEvent {
  250. /// The call has been cancelled.
  251. case cancelled
  252. }