GRPCClientResponseChannelHandler.swift 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHPACK
  20. import Logging
  21. /// A base channel handler for receiving responses.
  22. ///
  23. /// This includes holding promises for the initial metadata and status of the gRPC call. This handler
  24. /// is also responsible for error handling, via an error delegate and by appropriately failing the
  25. /// aforementioned promises.
  26. internal class GRPCClientResponseChannelHandler<ResponseMessage: Message>: ChannelInboundHandler {
  27. public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
  28. internal let logger: Logger
  29. internal let initialMetadataPromise: EventLoopPromise<HPACKHeaders>
  30. internal let trailingMetadataPromise: EventLoopPromise<HPACKHeaders>
  31. internal let statusPromise: EventLoopPromise<GRPCStatus>
  32. internal let timeout: GRPCTimeout
  33. internal var timeoutTask: Scheduled<Void>?
  34. internal let errorDelegate: ClientErrorDelegate?
  35. internal var context: ChannelHandlerContext?
  36. /// Creates a new `ClientResponseChannelHandler`.
  37. ///
  38. /// - Parameters:
  39. /// - initialMetadataPromise: A promise to succeed on receiving the initial metadata from the service.
  40. /// - trailingMetadataPromise: A promise to succeed on receiving the trailing metadata from the service.
  41. /// - statusPromise: A promise to succeed with the outcome of the call.
  42. /// - errorDelegate: An error delegate to call when errors are observed.
  43. /// - timeout: The call timeout specified by the user.
  44. public init(
  45. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  46. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  47. statusPromise: EventLoopPromise<GRPCStatus>,
  48. errorDelegate: ClientErrorDelegate?,
  49. timeout: GRPCTimeout,
  50. logger: Logger
  51. ) {
  52. self.initialMetadataPromise = initialMetadataPromise
  53. self.trailingMetadataPromise = trailingMetadataPromise
  54. self.statusPromise = statusPromise
  55. self.errorDelegate = errorDelegate
  56. self.timeout = timeout
  57. self.logger = logger
  58. }
  59. /// Observe the given status.
  60. ///
  61. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  62. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  63. /// are failed with the given status.
  64. ///
  65. /// - Parameter status: the status to observe.
  66. internal func onStatus(_ status: GRPCStatus) {
  67. if status.code != .ok {
  68. self.initialMetadataPromise.fail(status)
  69. }
  70. self.trailingMetadataPromise.fail(status)
  71. self.statusPromise.succeed(status)
  72. self.timeoutTask?.cancel()
  73. self.context = nil
  74. }
  75. /// Observe the given error.
  76. ///
  77. /// If an `errorDelegate` has been set, the delegate's `didCatchError(error:file:line:)` method is
  78. /// called with the wrapped error and its source. Any unfulfilled promises are also resolved with
  79. /// the given error (see `observeStatus(_:)`).
  80. ///
  81. /// - Parameter error: the error to observe.
  82. internal func onError(_ error: Error) {
  83. let grpcError = (error as? GRPCError) ?? GRPCError.unknown(error, origin: .client)
  84. self.errorDelegate?.didCatchError(grpcError.wrappedError, file: grpcError.file, line: grpcError.line)
  85. self.onStatus(grpcError.asGRPCStatus())
  86. }
  87. /// Called when a response is received. Subclasses should override this method.
  88. ///
  89. /// - Parameter response: The received response.
  90. internal func onResponse(_ response: _Box<ResponseMessage>) {
  91. // no-op
  92. }
  93. public func handlerAdded(context: ChannelHandlerContext) {
  94. // We need to hold the context in case we timeout and need to close the pipeline.
  95. self.context = context
  96. }
  97. /// Reads inbound data.
  98. ///
  99. /// On receipt of:
  100. /// - headers: the initial metadata promise is succeeded.
  101. /// - message: `onResponse(_:)` is called with the received message.
  102. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  103. /// and response promise (if available) are failed with the status.
  104. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  105. switch self.unwrapInboundIn(data) {
  106. case .initialMetadata(let headers):
  107. self.initialMetadataPromise.succeed(headers)
  108. case .message(let message):
  109. self.onResponse(message)
  110. case .trailingMetadata(let trailers):
  111. self.trailingMetadataPromise.succeed(trailers)
  112. case let .status(status):
  113. self.onStatus(status)
  114. // We don't expect any more requests/responses beyond this point and we don't need to close
  115. // the channel since NIO's HTTP/2 channel handlers will deal with this for us.
  116. }
  117. }
  118. public func channelInactive(context: ChannelHandlerContext) {
  119. self.onStatus(.init(code: .unavailable, message: nil))
  120. context.fireChannelInactive()
  121. }
  122. /// Observe an error from the pipeline and close the channel.
  123. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  124. self.onError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  125. context.close(mode: .all, promise: nil)
  126. }
  127. /// Schedules a timeout on the given event loop if the timeout is not `.infinite`.
  128. /// - Parameter eventLoop: The `eventLoop` to schedule the timeout on.
  129. internal func scheduleTimeout(eventLoop: EventLoop) {
  130. guard self.timeout != .infinite else {
  131. return
  132. }
  133. let timeout = self.timeout
  134. self.timeoutTask = eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in
  135. self?.performTimeout(error: .client(.deadlineExceeded(timeout)))
  136. }
  137. }
  138. /// Called when this call times out. Any promises which have not been fulfilled will be timed out
  139. /// with status `.deadlineExceeded`. If this handler has a context associated with it then the
  140. /// its channel is closed.
  141. ///
  142. /// - Parameter error: The error to fail any promises with.
  143. internal func performTimeout(error: GRPCError) {
  144. self.onError(error)
  145. self.context?.close(mode: .all, promise: nil)
  146. self.context = nil
  147. }
  148. }
  149. /// A channel handler for client calls which receive a single response.
  150. final class GRPCClientUnaryResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
  151. let responsePromise: EventLoopPromise<ResponseMessage>
  152. internal init(
  153. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  154. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  155. responsePromise: EventLoopPromise<ResponseMessage>,
  156. statusPromise: EventLoopPromise<GRPCStatus>,
  157. errorDelegate: ClientErrorDelegate?,
  158. timeout: GRPCTimeout,
  159. logger: Logger
  160. ) {
  161. self.responsePromise = responsePromise
  162. super.init(
  163. initialMetadataPromise: initialMetadataPromise,
  164. trailingMetadataPromise: trailingMetadataPromise,
  165. statusPromise: statusPromise,
  166. errorDelegate: errorDelegate,
  167. timeout: timeout,
  168. logger: logger.addingMetadata(
  169. key: MetadataKey.channelHandler,
  170. value: "GRPCClientUnaryResponseChannelHandler"
  171. )
  172. )
  173. }
  174. /// Succeeds the response promise with the given response.
  175. ///
  176. /// - Parameter response: The response received from the service.
  177. override func onResponse(_ response: _Box<ResponseMessage>) {
  178. self.responsePromise.succeed(response.value)
  179. }
  180. /// Fails the response promise if the given status is not `.ok`.
  181. override func onStatus(_ status: GRPCStatus) {
  182. super.onStatus(status)
  183. if status.code != .ok {
  184. self.responsePromise.fail(status)
  185. }
  186. }
  187. // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
  188. // See: https://bugs.swift.org/browse/SR-11564
  189. //
  190. // TODO: Remove this once SR-11564 is resolved.
  191. override internal func scheduleTimeout(eventLoop: EventLoop) {
  192. super.scheduleTimeout(eventLoop: eventLoop)
  193. }
  194. }
  195. /// A channel handler for client calls which receive a stream of responses.
  196. final class GRPCClientStreamingResponseChannelHandler<ResponseMessage: Message>: GRPCClientResponseChannelHandler<ResponseMessage> {
  197. typealias ResponseHandler = (ResponseMessage) -> Void
  198. let responseHandler: ResponseHandler
  199. internal init(
  200. initialMetadataPromise: EventLoopPromise<HPACKHeaders>,
  201. trailingMetadataPromise: EventLoopPromise<HPACKHeaders>,
  202. statusPromise: EventLoopPromise<GRPCStatus>,
  203. errorDelegate: ClientErrorDelegate?,
  204. timeout: GRPCTimeout,
  205. logger: Logger,
  206. responseHandler: @escaping ResponseHandler
  207. ) {
  208. self.responseHandler = responseHandler
  209. super.init(
  210. initialMetadataPromise: initialMetadataPromise,
  211. trailingMetadataPromise: trailingMetadataPromise,
  212. statusPromise: statusPromise,
  213. errorDelegate: errorDelegate,
  214. timeout: timeout,
  215. logger: logger.addingMetadata(
  216. key: MetadataKey.channelHandler,
  217. value: "GRPCClientStreamingResponseChannelHandler"
  218. )
  219. )
  220. }
  221. /// Calls a user-provided handler with the given response.
  222. ///
  223. /// - Parameter response: The response received from the service.
  224. override func onResponse(_ response: _Box<ResponseMessage>) {
  225. self.responseHandler(response.value)
  226. }
  227. // Workaround for SR-11564 (observed in Xcode 11.2 Beta).
  228. // See: https://bugs.swift.org/browse/SR-11564
  229. //
  230. // TODO: Remove this once SR-11564 is resolved.
  231. override internal func scheduleTimeout(eventLoop: EventLoop) {
  232. super.scheduleTimeout(eventLoop: eventLoop)
  233. }
  234. }
  235. /// Client user events.
  236. public enum GRPCClientUserEvent {
  237. /// The call has been cancelled.
  238. case cancelled
  239. }