2
0

GRPCClientChannelHandler.swift 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import SwiftProtobuf
  20. /// The final client-side channel handler.
  21. ///
  22. /// This handler holds promises for the initial metadata and the status, as well as an observer
  23. /// for responses. For unary and client-streaming calls the observer will succeed a response
  24. /// promise. For server-streaming and bidirectional-streaming the observer will call the supplied
  25. /// callback with each response received.
  26. ///
  27. /// Errors are also handled by the channel handler. Promises for the initial metadata and
  28. /// response (if applicable) are failed with first error received. The status promise is __succeeded__
  29. /// with the error as the result of `GRPCStatusTransformable.asGRPCStatus()`, if available.
  30. /// The stream is also closed and any inbound or outbound messages are ignored.
  31. internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage: Message> {
  32. internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
  33. internal let statusPromise: EventLoopPromise<GRPCStatus>
  34. internal let responseObserver: ResponseObserver<ResponseMessage>
  35. /// A promise for a unary response.
  36. internal var responsePromise: EventLoopPromise<ResponseMessage>? {
  37. guard case .succeedPromise(let promise) = responseObserver else { return nil }
  38. return promise
  39. }
  40. private enum InboundState {
  41. case expectingHeadersOrStatus
  42. case expectingMessageOrStatus
  43. case expectingStatus
  44. case ignore
  45. var expectingStatus: Bool {
  46. switch self {
  47. case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus:
  48. return true
  49. case .ignore:
  50. return false
  51. }
  52. }
  53. }
  54. private enum OutboundState {
  55. case expectingHead
  56. case expectingMessageOrEnd
  57. case ignore
  58. }
  59. private var inboundState: InboundState = .expectingHeadersOrStatus
  60. private var outboundState: OutboundState = .expectingHead
  61. /// Creates a new `GRPCClientChannelHandler`.
  62. ///
  63. /// - Parameters:
  64. /// - initialMetadataPromise: a promise to succeed on receiving the initial metadata from the service.
  65. /// - statusPromise: a promise to succeed with the outcome of the call.
  66. /// - responseObserver: an observer for response messages from the server; for unary responses this should
  67. /// be the `succeedPromise` case.
  68. public init(
  69. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  70. statusPromise: EventLoopPromise<GRPCStatus>,
  71. responseObserver: ResponseObserver<ResponseMessage>
  72. ) {
  73. self.initialMetadataPromise = initialMetadataPromise
  74. self.statusPromise = statusPromise
  75. self.responseObserver = responseObserver
  76. }
  77. /// Observe the given status.
  78. ///
  79. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  80. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  81. /// are failed with the given status.
  82. ///
  83. /// - Parameter status: the status to observe.
  84. internal func observeStatus(_ status: GRPCStatus) {
  85. if status.code != .ok {
  86. self.initialMetadataPromise.fail(status)
  87. self.responsePromise?.fail(status)
  88. }
  89. self.statusPromise.succeed(status)
  90. }
  91. /// Observe the given error.
  92. ///
  93. /// If the error conforms to `GRPCStatusTransformable` then `observeStatus(status:)` is called
  94. /// with the transformed error, otherwise `GRPCStatus.processingError` is used.
  95. ///
  96. /// - Parameter error: the error to observe.
  97. internal func observeError(_ error: Error) {
  98. if let transformable = error as? GRPCStatusTransformable {
  99. self.observeStatus(transformable.asGRPCStatus())
  100. } else {
  101. self.observeStatus(.processingError)
  102. }
  103. }
  104. }
  105. extension GRPCClientChannelHandler: ChannelInboundHandler {
  106. public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
  107. /// Reads inbound data.
  108. ///
  109. /// On receipt of:
  110. /// - headers: the initial metadata promise is succeeded.
  111. /// - message: the message observer is called with the message; for unary responses a response
  112. /// promise is succeeded, otherwise a callback is called.
  113. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  114. /// and response promise (if available) are failed with the status. The channel is then closed.
  115. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  116. guard self.inboundState != .ignore else { return }
  117. switch unwrapInboundIn(data) {
  118. case .headers(let headers):
  119. guard self.inboundState == .expectingHeadersOrStatus else {
  120. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)")))
  121. return
  122. }
  123. self.initialMetadataPromise.succeed(headers)
  124. self.inboundState = .expectingMessageOrStatus
  125. case .message(let message):
  126. guard self.inboundState == .expectingMessageOrStatus else {
  127. self.errorCaught(context: context, error: GRPCError.client(.responseCardinalityViolation))
  128. return
  129. }
  130. self.responseObserver.observe(message)
  131. self.inboundState = self.responseObserver.expectsMultipleResponses ? .expectingMessageOrStatus : .expectingStatus
  132. case .status(let status):
  133. guard self.inboundState.expectingStatus else {
  134. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)")))
  135. return
  136. }
  137. self.observeStatus(status)
  138. // We don't expect any more requests/responses beyond this point.
  139. self.close(context: context, mode: .all, promise: nil)
  140. }
  141. }
  142. }
  143. extension GRPCClientChannelHandler: ChannelOutboundHandler {
  144. public typealias OutboundIn = GRPCClientRequestPart<RequestMessage>
  145. public typealias OutboundOut = GRPCClientRequestPart<RequestMessage>
  146. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  147. guard self.outboundState != .ignore else { return }
  148. switch self.unwrapOutboundIn(data) {
  149. case .head:
  150. guard self.outboundState == .expectingHead else {
  151. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.outboundState)")))
  152. return
  153. }
  154. context.write(data, promise: promise)
  155. self.outboundState = .expectingMessageOrEnd
  156. default:
  157. guard self.outboundState == .expectingMessageOrEnd else {
  158. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received message or end while in state \(self.outboundState)")))
  159. return
  160. }
  161. context.write(data, promise: promise)
  162. }
  163. }
  164. }
  165. extension GRPCClientChannelHandler {
  166. /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore.
  167. public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
  168. self.observeError(GRPCError.client(.cancelledByClient))
  169. context.close(mode: mode, promise: promise)
  170. self.inboundState = .ignore
  171. self.outboundState = .ignore
  172. }
  173. /// Observe an error from the pipeline and close the channel.
  174. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  175. //! TODO: Add an error handling delegate, similar to in the server.
  176. self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  177. context.close(mode: .all, promise: nil)
  178. }
  179. }