2
0

GRPCClientChannelHandler.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import SwiftProtobuf
  20. /// The final client-side channel handler.
  21. ///
  22. /// This handler holds promises for the initial metadata and the status, as well as an observer
  23. /// for responses. For unary and client-streaming calls the observer will succeed a response
  24. /// promise. For server-streaming and bidirectional-streaming the observer will call the supplied
  25. /// callback with each response received.
  26. ///
  27. /// Errors are also handled by the channel handler. Promises for the initial metadata and
  28. /// response (if applicable) are failed with first error received. The status promise is __succeeded__
  29. /// with the error as the result of `GRPCStatusTransformable.asGRPCStatus()`, if available.
  30. /// The stream is also closed and any inbound or outbound messages are ignored.
  31. internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage: Message> {
  32. internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
  33. internal let statusPromise: EventLoopPromise<GRPCStatus>
  34. internal let responseObserver: ResponseObserver<ResponseMessage>
  35. /// A promise for a unary response.
  36. internal var responsePromise: EventLoopPromise<ResponseMessage>? {
  37. guard case .succeedPromise(let promise) = responseObserver else { return nil }
  38. return promise
  39. }
  40. private enum InboundState {
  41. case expectingHeadersOrStatus
  42. case expectingMessageOrStatus
  43. case expectingStatus
  44. case ignore
  45. var expectingStatus: Bool {
  46. switch self {
  47. case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus:
  48. return true
  49. case .ignore:
  50. return false
  51. }
  52. }
  53. }
  54. private enum OutboundState {
  55. case expectingHead
  56. case expectingMessageOrEnd
  57. case ignore
  58. }
  59. private var inboundState: InboundState = .expectingHeadersOrStatus
  60. private var outboundState: OutboundState = .expectingHead
  61. /// Creates a new `GRPCClientChannelHandler`.
  62. ///
  63. /// - Parameters:
  64. /// - initialMetadataPromise: a promise to succeed on receiving the initial metadata from the service.
  65. /// - statusPromise: a promise to succeed with the outcome of the call.
  66. /// - responseObserver: an observer for response messages from the server; for unary responses this should
  67. /// be the `succeedPromise` case.
  68. public init(
  69. initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
  70. statusPromise: EventLoopPromise<GRPCStatus>,
  71. responseObserver: ResponseObserver<ResponseMessage>
  72. ) {
  73. self.initialMetadataPromise = initialMetadataPromise
  74. self.statusPromise = statusPromise
  75. self.responseObserver = responseObserver
  76. }
  77. /// Observe the given status.
  78. ///
  79. /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` conforming to
  80. /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable)
  81. /// are failed with the given status.
  82. ///
  83. /// - Parameter status: the status to observe.
  84. internal func observeStatus(_ status: GRPCStatus) {
  85. if status.code != .ok {
  86. self.initialMetadataPromise.fail(status)
  87. self.responsePromise?.fail(status)
  88. }
  89. self.statusPromise.succeed(status)
  90. }
  91. /// Observe the given error.
  92. ///
  93. /// Calls `observeStatus(status:)`. with `error.asGRPCStatus()`.
  94. ///
  95. /// - Parameter error: the error to observe.
  96. internal func observeError(_ error: GRPCError) {
  97. self.observeStatus(error.asGRPCStatus())
  98. }
  99. }
  100. extension GRPCClientChannelHandler: ChannelInboundHandler {
  101. public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>
  102. /// Reads inbound data.
  103. ///
  104. /// On receipt of:
  105. /// - headers: the initial metadata promise is succeeded.
  106. /// - message: the message observer is called with the message; for unary responses a response
  107. /// promise is succeeded, otherwise a callback is called.
  108. /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata
  109. /// and response promise (if available) are failed with the status. The channel is then closed.
  110. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  111. guard self.inboundState != .ignore else { return }
  112. switch unwrapInboundIn(data) {
  113. case .headers(let headers):
  114. guard self.inboundState == .expectingHeadersOrStatus else {
  115. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)")))
  116. return
  117. }
  118. self.initialMetadataPromise.succeed(headers)
  119. self.inboundState = .expectingMessageOrStatus
  120. case .message(let message):
  121. guard self.inboundState == .expectingMessageOrStatus else {
  122. self.errorCaught(context: context, error: GRPCError.client(.responseCardinalityViolation))
  123. return
  124. }
  125. self.responseObserver.observe(message)
  126. self.inboundState = self.responseObserver.expectsMultipleResponses ? .expectingMessageOrStatus : .expectingStatus
  127. case .status(let status):
  128. guard self.inboundState.expectingStatus else {
  129. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)")))
  130. return
  131. }
  132. self.observeStatus(status)
  133. // We don't expect any more requests/responses beyond this point.
  134. self.close(context: context, mode: .all, promise: nil)
  135. }
  136. }
  137. }
  138. extension GRPCClientChannelHandler: ChannelOutboundHandler {
  139. public typealias OutboundIn = GRPCClientRequestPart<RequestMessage>
  140. public typealias OutboundOut = GRPCClientRequestPart<RequestMessage>
  141. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  142. guard self.outboundState != .ignore else { return }
  143. switch self.unwrapOutboundIn(data) {
  144. case .head:
  145. guard self.outboundState == .expectingHead else {
  146. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received headers while in state \(self.outboundState)")))
  147. return
  148. }
  149. context.write(data, promise: promise)
  150. self.outboundState = .expectingMessageOrEnd
  151. default:
  152. guard self.outboundState == .expectingMessageOrEnd else {
  153. self.errorCaught(context: context, error: GRPCError.client(.invalidState("received message or end while in state \(self.outboundState)")))
  154. return
  155. }
  156. context.write(data, promise: promise)
  157. }
  158. }
  159. }
  160. extension GRPCClientChannelHandler {
  161. /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore.
  162. public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
  163. self.observeError(GRPCError.client(.cancelledByClient))
  164. context.close(mode: mode, promise: promise)
  165. self.inboundState = .ignore
  166. self.outboundState = .ignore
  167. }
  168. /// Observe an error from the pipeline and close the channel.
  169. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  170. //! TODO: Add an error handling delegate, similar to in the server.
  171. self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client))
  172. context.close(mode: .all, promise: nil)
  173. }
  174. }