GRPCClientResponseChannelHandler.swift 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHPACK
  20. import Logging
  21. /// A base channel handler for receiving responses.
  22. ///
  23. /// This includes holding promises for the initial metadata and status of the gRPC call. This handler
  24. /// is also responsible for error handling, via an error delegate and by appropriately failing the
  25. /// aforementioned promises.
  26. internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
  27. public typealias InboundIn = _GRPCClientResponsePart<ResponseMessage>
  28. internal let logger: Logger
  29. internal var stopwatch: Stopwatch?
  30. internal let initialMetadataPromise: EventLoopPromise<HPACKHeaders>
  31. internal let trailingMetadataPromise: EventLoopPromise<HPACKHeaders>
  32. internal let statusPromise: EventLoopPromise<GRPCStatus>
  33. internal let timeout: GRPCTimeout
  34. internal var timeoutTask: Scheduled<Void>?
  35. internal let errorDelegate: ClientErrorDelegate?
  36. internal var context: ChannelHandlerContext?
  37. /// Creates a new `ClientResponseChannelHandler`.
  38. ///
  39. /// - Parameters:
  40. /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
  41. /// - trailingMetadataPromise: A promise to succeed on receiving the trailing metadata from the service.
  42. /// - statusPromise: A promise to succeed with the outcome of the call.
  43. /// - errorDelegate: An error delegate to call when errors are observed.
  44. /// - timeout: The call timeout specified by the user.
  45. public init(
  46. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  47. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  48. statusPromise: EventLoopPromise<GRPCStatus>,
  49. errorDelegate: ClientErrorDelegate?,
  50. timeout: GRPCTimeout,
  51. logger: Logger
  52. ) {
  53. self.initialMetadataPromise = initialMetadataPromise
  54. self.trailingMetadataPromise = trailingMetadataPromise
  55. self.statusPromise = statusPromise
  56. self.errorDelegate = errorDelegate
  57. self.timeout = timeout
  58. self.logger = logger
  59. }
  60. /// Observe the given status.
  61. ///
  62. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  63. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  64. /// are failed with the given status.
  65. ///
  66. /// - Parameter status: the status to observe.
  67. internal func onStatus(_ status: GRPCStatus) {
  68. if status.code != .ok {
  69. self.initialMetadataPromise.fail(status)
  70. }
  71. self.trailingMetadataPromise.fail(status)
  72. self.statusPromise.succeed(status)
  73. self.timeoutTask?.cancel()
  74. self.context = nil
  75. if let stopwatch = self.stopwatch {
  76. let millis = stopwatch.elapsedMillis()
  77. self.logger.debug("rpc call finished", metadata: [
  78. "duration_ms": "\(millis)",
  79. "status_code": "\(status.code.rawValue)"
  80. ])
  81. self.stopwatch = nil
  82. }
  83. }
  84. /// Observe the given error.
  85. ///
  86. /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
  87. /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
  88. /// the given error (see `observeStatus(_:)`).
  89. ///
  90. /// - Parameter error: the error to observe.
  91. internal func onError(_ error: Error) {
  92. let grpcError = (error as? GRPCError) ?? GRPCError.unknown(error, origin: .client)
  93. self.errorDelegate?.didCatchError(grpcError.wrappedError, logger: self.logger, file: grpcError.file, line: grpcError.line)
  94. self.onStatus(grpcError.asGRPCStatus())
  95. }
  96. /// Called when a response is received. Subclasses should override this method.
  97. ///
  98. /// - Parameter response: The received response.
  99. internal func onResponse(_ response: _Box<ResponseMessage>) {
  100. // no-op
  101. }
  102. public func handlerAdded(context: ChannelHandlerContext) {
  103. // We need to hold the context in case we timeout and need to close the pipeline.
  104. self.context = context
  105. self.stopwatch = .start()
  106. }
  107. /// Reads inbound data.
  108. ///
  109. /// On receipt of:
  110. /// - headers: the initial metadata promise is succeeded.
  111. /// - message: `onResponse(_:)` is called with the received message.
  112. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  113. /// and response promise (if available) are failed with the status.
  114. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  115. switch self.unwrapInboundIn(data) {
  116. case .initialMetadata(let headers):
  117. self.initialMetadataPromise.succeed(headers)
  118. case .message(let message):
  119. self.onResponse(message)
  120. case .trailingMetadata(let trailers):
  121. self.trailingMetadataPromise.succeed(trailers)
  122. case let .status(status):
  123. self.onStatus(status)
  124. // We don't expect any more requests/responses beyond this point and we don't need to close
  125. // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
  126. }
  127. }
  128. public func channelInactive(context: ChannelHandlerContext) {
  129. self.onStatus(.init(code: .unavailable, message: nil))
  130. context.fireChannelInactive()
  131. }
  132. /// Observe an error from the pipeline and close the channel.
  133. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  134. self.onError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  135. context.close(mode: .all, promise: nil)
  136. }
  137. /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
  138. /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
  139. internal func scheduleTimeout(eventLoop: EventLoop) {
  140. guard self.timeout != .infinite else {
  141. return
  142. }
  143. let timeout = self.timeout
  144. self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  145. self?.performTimeout(error: .client(.deadlineExceeded(timeout)))
  146. }
  147. }
  148. /// Called when this call times out. Any promises which have not been fulfilled will be timed out
  149. /// with status `.deadlineExceeded`. If this handler has a context associated with it then the
  150. /// its channel is closed.
  151. ///
  152. /// - Parameter error: The error to fail any promises with.
  153. internal func performTimeout(error: GRPCError) {
  154. self.onError(error)
  155. self.context?.close(mode: .all, promise: nil)
  156. self.context = nil
  157. }
  158. }
  159. /// A channel handler for client calls which receive a single response.
  160. final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
  161. let responsePromise: EventLoopPromise<ResponseMessage>
  162. internal init(
  163. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  164. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  165. responsePromise: EventLoopPromise<ResponseMessage>,
  166. statusPromise: EventLoopPromise<GRPCStatus>,
  167. errorDelegate: ClientErrorDelegate?,
  168. timeout: GRPCTimeout,
  169. logger: Logger
  170. ) {
  171. self.responsePromise = responsePromise
  172. super.init(
  173. initialMetadataPromise: initialMetadataPromise,
  174. trailingMetadataPromise: trailingMetadataPromise,
  175. statusPromise: statusPromise,
  176. errorDelegate: errorDelegate,
  177. timeout: timeout,
  178. logger: logger
  179. )
  180. }
  181. /// Succeeds the response promise with the given response.
  182. ///
  183. /// - Parameter response: The response received from the service.
  184. override func onResponse(_ response: _Box<ResponseMessage>) {
  185. self.responsePromise.succeed(response.value)
  186. }
  187. /// Fails the response promise if the given status is not `.ok`.
  188. override func onStatus(_ status: GRPCStatus) {
  189. super.onStatus(status)
  190. if status.code != .ok {
  191. self.responsePromise.fail(status)
  192. }
  193. }
  194. // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
  195. // See: https://bugs.swift.org/browse/SR-11564
  196. //
  197. // TODO: Remove this once SR-11564 is resolved.
  198. override internal func scheduleTimeout(eventLoop: EventLoop) {
  199. super.scheduleTimeout(eventLoop: eventLoop)
  200. }
  201. }
  202. /// A channel handler for client calls which receive a stream of responses.
  203. final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
  204. typealias ResponseHandler = (ResponseMessage) -> Void
  205. let responseHandler: ResponseHandler
  206. internal init(
  207. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  208. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  209. statusPromise: EventLoopPromise<GRPCStatus>,
  210. errorDelegate: ClientErrorDelegate?,
  211. timeout: GRPCTimeout,
  212. logger: Logger,
  213. responseHandler: @escaping ResponseHandler
  214. ) {
  215. self.responseHandler = responseHandler
  216. super.init(
  217. initialMetadataPromise: initialMetadataPromise,
  218. trailingMetadataPromise: trailingMetadataPromise,
  219. statusPromise: statusPromise,
  220. errorDelegate: errorDelegate,
  221. timeout: timeout,
  222. logger: logger
  223. )
  224. }
  225. /// Calls a user-provided handler with the given response.
  226. ///
  227. /// - Parameter response: The response received from the service.
  228. override func onResponse(_ response: _Box<ResponseMessage>) {
  229. self.responseHandler(response.value)
  230. }
  231. // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
  232. // See: https://bugs.swift.org/browse/SR-11564
  233. //
  234. // TODO: Remove this once SR-11564 is resolved.
  235. override internal func scheduleTimeout(eventLoop: EventLoop) {
  236. super.scheduleTimeout(eventLoop: eventLoop)
  237. }
  238. }
  239. /// Client user events.
  240. internal enum GRPCClientUserEvent {
  241. /// The call has been cancelled.
  242. case cancelled
  243. }