GRPCClientChannelHandler.swift 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import SwiftProtobuf
  /// The last handler in the client-side channel pipeline.
  ///
  /// The handler owns a promise for the initial metadata, a promise for the final status and an
  /// observer for response messages. For unary and client-streaming calls the observer succeeds a
  /// response promise; for server-streaming and bidirectional-streaming calls it invokes the
  /// supplied callback once per response.
  ///
  /// Errors are funnelled through `observeError(_:)`: the optional error delegate is told about the
  /// error, and the error is converted to a `GRPCStatus` via `asGRPCStatus()`. The status promise is
  /// always __succeeded__ with that status (never failed); the initial metadata and response
  /// promises are failed with it when its code is not `.ok`.
  ///
  /// Once the handler's `close(context:mode:promise:)` has run, both directions are in the `ignore`
  /// state and further inbound and outbound parts are dropped.
  internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage: Message> {
    /// Progress of the inbound (response) half of the call.
    private enum InboundState {
      case expectingHeadersOrStatus
      case expectingMessageOrStatus
      case expectingStatus
      case ignore

      /// `true` in every state except `.ignore`, i.e. whenever a status may still be received.
      var expectingStatus: Bool {
        if case .ignore = self {
          return false
        }
        return true
      }
    }

    /// Progress of the outbound (request) half of the call.
    private enum OutboundState {
      case expectingHead
      case expectingMessageOrEnd
      case ignore
    }

    internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
    internal let statusPromise: EventLoopPromise<GRPCStatus>
    internal let responseObserver: ResponseObserver<ResponseMessage>
    internal let errorDelegate: ClientErrorDelegate?

    private var inboundState: InboundState = .expectingHeadersOrStatus
    private var outboundState: OutboundState = .expectingHead

    /// The promise for a unary response, or `nil` when the observer is not the `succeedPromise`
    /// case.
    internal var responsePromise: EventLoopPromise<ResponseMessage>? {
      if case .succeedPromise(let promise) = self.responseObserver {
        return promise
      }
      return nil
    }

    /// Creates a new `GRPCClientChannelHandler`.
    ///
    /// - Parameters:
    ///   - initialMetadataPromise: a promise to succeed on receiving the initial metadata from the
    ///     service.
    ///   - statusPromise: a promise to succeed with the outcome of the call.
    ///   - responseObserver: an observer for response messages from the server; for unary
    ///     responses this should be the `succeedPromise` case.
    ///   - errorDelegate: an optional delegate which is informed of errors passed to
    ///     `observeError(_:)`.
    public init(
      initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
      statusPromise: EventLoopPromise<GRPCStatus>,
      responseObserver: ResponseObserver<ResponseMessage>,
      errorDelegate: ClientErrorDelegate?
    ) {
      self.errorDelegate = errorDelegate
      self.responseObserver = responseObserver
      self.statusPromise = statusPromise
      self.initialMetadataPromise = initialMetadataPromise
    }

    /// Observe the given status.
    ///
    /// The status promise is __succeeded__ with `status` even though `GRPCStatus` conforms to
    /// `Error`. When `status.code` is not `.ok` the initial metadata promise and the response
    /// promise (if there is one) are failed with `status` before the status promise is succeeded.
    ///
    /// - Parameter status: the status to observe.
    internal func observeStatus(_ status: GRPCStatus) {
      let callFailed = status.code != .ok

      if callFailed {
        self.initialMetadataPromise.fail(status)
        if let unaryResponsePromise = self.responsePromise {
          unaryResponsePromise.fail(status)
        }
      }

      self.statusPromise.succeed(status)
    }

    /// Observe the given error.
    ///
    /// If an `errorDelegate` was provided, its `didCatchError(_:file:line:)` method is called with
    /// the wrapped error and the file and line recorded on `error`. The error is then converted to
    /// a status and handed to `observeStatus(_:)`, which resolves the promises.
    ///
    /// - Parameter error: the error to observe.
    internal func observeError(_ error: GRPCError) {
      if let delegate = self.errorDelegate {
        delegate.didCatchError(error.wrappedError, file: error.file, line: error.line)
      }

      let status = error.asGRPCStatus()
      self.observeStatus(status)
    }
  }
  extension GRPCClientChannelHandler: ChannelInboundHandler {
    public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>

    /// Handles a response part read from the channel.
    ///
    /// Parts are dropped without inspection once the inbound state is `.ignore`. Otherwise, on
    /// receipt of:
    /// - headers: the initial metadata promise is succeeded. Headers are only valid as the first
    ///   part of the response.
    /// - message: the response observer is given the message; for unary responses a response
    ///   promise is succeeded, otherwise a callback is called. If the observer does not expect
    ///   multiple responses only a status is accepted afterwards.
    /// - status: the status promise is succeeded; if the status is not `ok` then the initial
    ///   metadata and response promise (if available) are failed with the status. The status is
    ///   terminal, so the inbound state becomes `.ignore`. The channel is not closed here.
    ///
    /// A part received in a state which does not allow it is reported via
    /// `errorCaught(context:error:)`.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      guard self.inboundState != .ignore else { return }

      switch self.unwrapInboundIn(data) {
      case .headers(let headers):
        guard self.inboundState == .expectingHeadersOrStatus else {
          self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)")))
          return
        }

        self.initialMetadataPromise.succeed(headers)
        self.inboundState = .expectingMessageOrStatus

      case .message(let message):
        guard self.inboundState == .expectingMessageOrStatus else {
          self.errorCaught(context: context, error: GRPCError.client(.responseCardinalityViolation))
          return
        }

        self.responseObserver.observe(message)
        self.inboundState = self.responseObserver.expectsMultipleResponses ? .expectingMessageOrStatus : .expectingStatus

      case .status(let status):
        guard self.inboundState.expectingStatus else {
          self.errorCaught(context: context, error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)")))
          return
        }

        self.observeStatus(status)

        // The status ends the call: drop anything which arrives after it so that the response
        // observer is never invoked, and no error is raised, for an already completed call.
        // We don't need to close the channel since NIO's HTTP/2 channel handlers will deal with
        // this for us.
        self.inboundState = .ignore
      }
    }

    /// Handles user events; only `GRPCClientUserEvent`s are acted upon, anything else is ignored.
    ///
    /// - `cancelled`: the status for `cancelledByClient` is observed directly rather than as an
    ///   error, so the error delegate is not notified. The handler is then closed.
    /// - `timedOut`: a `deadlineExceeded` error is observed (notifying the error delegate, if any)
    ///   and the handler is then closed.
    public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
      guard let clientUserEvent = event as? GRPCClientUserEvent else { return }

      switch clientUserEvent {
      case .cancelled:
        // We shouldn't observe an error since this event is triggered by the user: just observe
        // the status.
        self.observeStatus(GRPCError.client(.cancelledByClient).asGRPCStatus())
        self.close(context: context, mode: .all, promise: nil)

      case .timedOut(let timeout):
        self.observeError(GRPCError.client(.deadlineExceeded(timeout)))
        self.close(context: context, mode: .all, promise: nil)
      }
    }
  }
  extension GRPCClientChannelHandler: ChannelOutboundHandler {
    public typealias OutboundIn = GRPCClientRequestPart<RequestMessage>
    public typealias OutboundOut = GRPCClientRequestPart<RequestMessage>

    /// Forwards a request part down the pipeline if it is valid in the current outbound state.
    ///
    /// The head must be written first and only once; any other part is only accepted after the
    /// head. A part written in the wrong state is reported via `errorCaught(context:error:)`.
    ///
    /// The write promise is always completed: it is passed on with the forwarded part, failed with
    /// the invalid-state error when the part is rejected, or failed with
    /// `ChannelError.ioOnClosedChannel` when the handler has been closed and writes are ignored.
    ///
    /// - Parameters:
    ///   - context: the context of this handler in the pipeline.
    ///   - data: the wrapped `GRPCClientRequestPart` to write.
    ///   - promise: a promise to complete with the result of the write, if the caller wants one.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      guard self.outboundState != .ignore else {
        // The state only becomes `.ignore` once the handler has been closed. The part is dropped
        // but the promise must still be completed so the caller is not left waiting.
        promise?.fail(ChannelError.ioOnClosedChannel)
        return
      }

      switch self.unwrapOutboundIn(data) {
      case .head:
        guard self.outboundState == .expectingHead else {
          self.rejectWrite(context: context, reason: "received headers while in state \(self.outboundState)", promise: promise)
          return
        }

        context.write(data, promise: promise)
        self.outboundState = .expectingMessageOrEnd

      default:
        // Every part other than the head is treated as a message or end.
        guard self.outboundState == .expectingMessageOrEnd else {
          self.rejectWrite(context: context, reason: "received message or end while in state \(self.outboundState)", promise: promise)
          return
        }

        context.write(data, promise: promise)
      }
    }

    /// Reports an invalid-state write through `errorCaught(context:error:)` and fails the write
    /// promise with the same error.
    private func rejectWrite(context: ChannelHandlerContext, reason: String, promise: EventLoopPromise<Void>?) {
      let error = GRPCError.client(.invalidState(reason))
      self.errorCaught(context: context, error: error)
      promise?.fail(error)
    }
  }
  extension GRPCClientChannelHandler {
    /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore.
    ///
    /// Both states are set to `.ignore` whatever the value of `mode`, so after this call
    /// `channelRead` and `write` no longer process parts.
    ///
    /// - Parameters:
    ///   - context: the context of this handler in the pipeline.
    ///   - mode: the close mode forwarded to `context.close(mode:promise:)`.
    ///   - promise: a promise forwarded with the close, if the caller wants one.
    public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
      context.close(mode: mode, promise: promise)

      self.inboundState = .ignore
      self.outboundState = .ignore
    }

    /// Observe an error from the pipeline and close the channel.
    ///
    /// Errors which are not already a `GRPCError` are wrapped as an unknown client-side error
    /// before being observed. The close goes through this handler's own
    /// `close(context:mode:promise:)` so that, as for cancellation and timeouts, the inbound and
    /// outbound states become `.ignore` and parts arriving after the error are dropped rather than
    /// processed.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
      self.close(context: context, mode: .all, promise: nil)
    }
  }
  /// User events acted upon by `GRPCClientChannelHandler` when they are triggered on the channel
  /// pipeline.
  public enum GRPCClientUserEvent {
    /// The call has been cancelled. The handler observes the `cancelledByClient` status without
    /// notifying the error delegate, then closes.
    case cancelled

    /// The call did not complete before the deadline was exceeded. The associated timeout is
    /// passed on to the `deadlineExceeded` error which the handler observes before closing.
    case timedOut(GRPCTimeout)
  }