2
0

GRPCClientChannelHandler.swift 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import SwiftProtobuf
  /// The terminal client-side channel handler for a gRPC call.
  ///
  /// The handler owns a promise for the initial metadata, a promise for the final status, and a
  /// `ResponseObserver` for response messages. For unary and client-streaming calls the observer
  /// succeeds a response promise; for server-streaming and bidirectional-streaming calls it invokes
  /// the supplied callback once per response received.
  ///
  /// Errors are routed through this handler as well: the initial metadata promise and the response
  /// promise (when there is one) are failed with the first error received, whereas the status
  /// promise is __succeeded__ with that error expressed as a `GRPCStatus`. The stream is also
  /// closed; once the handler has been closed, inbound and outbound messages are ignored.
  internal class GRPCClientChannelHandler<RequestMessage: Message, ResponseMessage: Message> {
    internal let initialMetadataPromise: EventLoopPromise<HTTPHeaders>
    internal let statusPromise: EventLoopPromise<GRPCStatus>
    internal let responseObserver: ResponseObserver<ResponseMessage>

    /// The promise for a unary response, or `nil` when the observer is not the `succeedPromise` case.
    internal var responsePromise: EventLoopPromise<ResponseMessage>? {
      if case .succeedPromise(let unaryPromise) = responseObserver {
        return unaryPromise
      }
      return nil
    }

    /// Promise that the `HTTPRequestHead` has been sent to the network.
    ///
    /// Closing the stream before this promise has been fulfilled makes the program fatal error
    /// because of an issue with nghttp2/swift-nio-http2.
    ///
    /// Because the promise has to succeed before the channel can be closed, `BaseClientCall` sends
    /// the request head in `init`, which in turn initializes this promise in
    /// `write(ctx:data:promise:)`. In practice this means the promise should never be nil.
    ///
    /// See: https://github.com/apple/swift-nio-http2/issues/39.
    private var requestHeadSentPromise: EventLoopPromise<Void>!

    /// What the handler is prepared to receive next from the server.
    private enum InboundState {
      case expectingHeadersOrStatus
      case expectingMessageOrStatus
      case expectingStatus
      case ignore

      /// Whether a status is acceptable in this state; true for every state except `ignore`.
      var expectingStatus: Bool {
        if case .ignore = self {
          return false
        }
        return true
      }
    }

    /// What the handler is prepared to send next to the server.
    private enum OutboundState {
      case expectingHead
      case expectingMessageOrEnd
      case ignore
    }

    private var inboundState: InboundState = .expectingHeadersOrStatus
    private var outboundState: OutboundState = .expectingHead

    /// Creates a new `GRPCClientChannelHandler`.
    ///
    /// - Parameters:
    ///   - initialMetadataPromise: a promise to succeed when the initial metadata arrives from
    ///     the service.
    ///   - statusPromise: a promise to succeed with the outcome of the call.
    ///   - responseObserver: an observer for response messages from the server; for unary
    ///     responses this should be the `succeedPromise` case.
    public init(
      initialMetadataPromise: EventLoopPromise<HTTPHeaders>,
      statusPromise: EventLoopPromise<GRPCStatus>,
      responseObserver: ResponseObserver<ResponseMessage>
    ) {
      self.initialMetadataPromise = initialMetadataPromise
      self.statusPromise = statusPromise
      self.responseObserver = responseObserver
    }

    /// Observe the given status.
    ///
    /// The status promise is __succeeded__ with `status` even though `GRPCStatus` is an `Error`.
    /// When `status.code != .ok`, the initial metadata promise and the response promise (if there
    /// is one) are failed with `status` before the status promise is succeeded.
    ///
    /// - Parameter status: the status to observe.
    internal func observeStatus(_ status: GRPCStatus) {
      let callFailed = status.code != .ok

      if callFailed {
        self.initialMetadataPromise.fail(error: status)
        self.responsePromise?.fail(error: status)
      }

      self.statusPromise.succeed(result: status)
    }

    /// Observe the given error.
    ///
    /// Converts the error with `asGRPCStatus()` and forwards the result to `observeStatus(_:)`.
    ///
    /// - Parameter error: the error to observe.
    internal func observeError(_ error: GRPCError) {
      let status = error.asGRPCStatus()
      self.observeStatus(status)
    }
  }
  extension GRPCClientChannelHandler: ChannelInboundHandler {
    public typealias InboundIn = GRPCClientResponsePart<ResponseMessage>

    /// Handles one inbound response part.
    ///
    /// Nothing happens while the inbound state is `ignore`. Otherwise, on receipt of:
    /// - headers: the initial metadata promise is succeeded and messages become acceptable.
    /// - message: the response observer is handed the message; whether further messages are then
    ///   acceptable depends on the observer's `expectsMultipleResponses`.
    /// - status: the status is observed via `observeStatus(_:)` and the handler is closed.
    ///
    /// A part arriving in a state that does not allow it is reported through
    /// `errorCaught(ctx:error:)` and not processed any further.
    public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
      if self.inboundState == .ignore {
        return
      }

      let part = unwrapInboundIn(data)

      switch part {
      case .headers(let metadata):
        if self.inboundState != .expectingHeadersOrStatus {
          let violation = GRPCError.client(.invalidState("received headers while in state \(self.inboundState)"))
          self.errorCaught(ctx: ctx, error: violation)
          return
        }

        self.initialMetadataPromise.succeed(result: metadata)
        self.inboundState = .expectingMessageOrStatus

      case .message(let response):
        if self.inboundState != .expectingMessageOrStatus {
          self.errorCaught(ctx: ctx, error: GRPCError.client(.responseCardinalityViolation))
          return
        }

        self.responseObserver.observe(response)

        // A single-response observer accepts nothing but the status after its one message.
        if self.responseObserver.expectsMultipleResponses {
          self.inboundState = .expectingMessageOrStatus
        } else {
          self.inboundState = .expectingStatus
        }

      case .status(let finalStatus):
        if !self.inboundState.expectingStatus {
          let violation = GRPCError.client(.invalidState("received status while in state \(self.inboundState)"))
          self.errorCaught(ctx: ctx, error: violation)
          return
        }

        self.observeStatus(finalStatus)

        // No further requests or responses are expected once the status has arrived.
        self.close(ctx: ctx, mode: .all, promise: nil)
      }
    }
  }
  extension GRPCClientChannelHandler: ChannelOutboundHandler {
    public typealias OutboundIn = GRPCClientRequestPart<RequestMessage>
    public typealias OutboundOut = GRPCClientRequestPart<RequestMessage>

    /// Writes one outbound request part.
    ///
    /// - The request head is only accepted while the outbound state is `expectingHead`. Its write
    ///   promise is retained as `requestHeadSentPromise` so that `close(ctx:mode:promise:)` can
    ///   wait for the head to be sent.
    /// - Any other part is only accepted while the outbound state is `expectingMessageOrEnd`.
    ///
    /// A rejected write always fails `promise`, so callers waiting on it are never left with a
    /// promise that is silently dropped:
    /// - while the outbound state is `ignore` the part is discarded and `promise` is failed;
    /// - a part arriving in a state that does not allow it is reported through
    ///   `errorCaught(ctx:error:)` and `promise` is failed with the same error.
    ///
    /// - Parameters:
    ///   - ctx: the context of this handler in the channel pipeline.
    ///   - data: the wrapped `GRPCClientRequestPart` to write.
    ///   - promise: a promise to complete with the outcome of the write, if any.
    public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      guard self.outboundState != .ignore else {
        // The handler has been closed: discard the part, but complete the promise so it is not leaked.
        promise?.fail(error: GRPCError.client(.invalidState("received write while in state \(self.outboundState)")))
        return
      }

      switch self.unwrapOutboundIn(data) {
      case .head:
        guard self.outboundState == .expectingHead else {
          self.rejectWrite(
            ctx: ctx,
            promise: promise,
            reason: "received headers while in state \(self.outboundState)"
          )
          return
        }

        // See the documentation for `requestHeadSentPromise` for an explanation of this.
        self.requestHeadSentPromise = promise ?? ctx.eventLoop.newPromise()
        ctx.write(data, promise: self.requestHeadSentPromise)
        self.outboundState = .expectingMessageOrEnd

      default:
        guard self.outboundState == .expectingMessageOrEnd else {
          self.rejectWrite(
            ctx: ctx,
            promise: promise,
            reason: "received message or end while in state \(self.outboundState)"
          )
          return
        }

        ctx.write(data, promise: promise)
      }
    }

    /// Reports an invalid-state write through `errorCaught(ctx:error:)` and fails its promise
    /// with the same error.
    private func rejectWrite(ctx: ChannelHandlerContext, promise: EventLoopPromise<Void>?, reason: String) {
      let error = GRPCError.client(.invalidState(reason))
      self.errorCaught(ctx: ctx, error: error)
      promise?.fail(error: error)
    }
  }
  extension GRPCClientChannelHandler {
    /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore.
    ///
    /// A `cancelledByClient` error is observed first, so promises that are still pending are
    /// completed before the stream goes away. The close itself is deferred until the request
    /// head write has completed; see the documentation for `requestHeadSentPromise`.
    ///
    /// - Parameters:
    ///   - ctx: the context of this handler in the channel pipeline.
    ///   - mode: the close mode to forward.
    ///   - promise: a promise to complete with the outcome of the close, if any.
    public func close(ctx: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
      self.observeError(GRPCError.client(.cancelledByClient))

      // Stop processing before the close is initiated: the completion callback below may run
      // straight away, and anything the close triggers re-entrantly must already be ignored.
      self.inboundState = .ignore
      self.outboundState = .ignore

      guard let headSent = self.requestHeadSentPromise else {
        // No request head was ever written, so there is no write to wait for. Forward the close
        // instead of trapping on the nil implicitly-unwrapped promise.
        // NOTE(review): confirm that closing a stream which never sent its head is safe with
        // swift-nio-http2 (see https://github.com/apple/swift-nio-http2/issues/39).
        ctx.close(mode: mode, promise: promise)
        return
      }

      headSent.futureResult.whenComplete {
        ctx.close(mode: mode, promise: promise)
      }
    }

    /// Observe an error from the pipeline and close the channel.
    ///
    /// Errors that are not already a `GRPCError` are wrapped as `GRPCError.unknown` with a client
    /// origin before being observed.
    ///
    /// - Parameters:
    ///   - ctx: the context of this handler in the channel pipeline.
    ///   - error: the error that was caught.
    public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
      //! TODO: Add an error handling delegate, similar to in the server.
      let grpcError = (error as? GRPCError) ?? GRPCError.unknown(error, origin: .client)
      self.observeError(grpcError)
      ctx.close(mode: .all, promise: nil)
    }
  }