GRPCClientChannelHandler.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP1
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  22. // TODO: rename GRPCClientRequestPart2 to GRPCClientRequestPart once this is in-place.
  /// One part of the request stream a gRPC client writes for an RPC.
  ///
  /// `Request` is the protobuf message type carried by the `request` case.
  public enum GRPCClientRequestPart2<Request: Message> {
    /// The request 'head': the information used to initiate the RPC.
    case head(GRPCRequestHead)

    /// A deserialized request message which should be sent to the server.
    case request(Request)

    /// Signals that the client has no further messages to send.
    case end
  }
  /// The 'head' of a gRPC request: the information used to initiate an RPC.
  ///
  /// This is the payload of `GRPCClientRequestPart2.head`. `GRPCClientChannelHandler` hands it to
  /// its state machine, which turns it into the request headers written to the HTTP/2 stream.
  public struct GRPCRequestHead {
    /// The request method.
    /// NOTE(review): presumably the value of the HTTP/2 `:method` pseudo-header -- confirm.
    public var method: String

    /// The request scheme.
    /// NOTE(review): presumably the value of the HTTP/2 `:scheme` pseudo-header -- confirm.
    public var scheme: String

    /// The request path.
    /// NOTE(review): presumably `/<service>/<method>` sent as `:path` -- confirm.
    public var path: String

    /// The host the request is addressed to.
    /// NOTE(review): presumably sent as the `:authority` pseudo-header -- confirm.
    public var host: String

    /// The timeout for the RPC.
    public var timeout: GRPCTimeout

    /// Additional user-provided metadata for the request.
    public var customMetadata: HPACKHeaders

    /// Creates a request head.
    ///
    /// The implicit memberwise initializer of a `public` struct is only `internal`, so without
    /// this explicit initializer the type cannot be constructed outside of its defining module.
    /// The labels and their order match the implicit initializer so existing call sites continue
    /// to compile unchanged.
    ///
    /// - Parameters:
    ///   - method: The request method.
    ///   - scheme: The request scheme.
    ///   - path: The request path.
    ///   - host: The host the request is addressed to.
    ///   - timeout: The timeout for the RPC.
    ///   - customMetadata: Additional user-provided metadata for the request.
    public init(
      method: String,
      scheme: String,
      path: String,
      host: String,
      timeout: GRPCTimeout,
      customMetadata: HPACKHeaders
    ) {
      self.method = method
      self.scheme = scheme
      self.path = path
      self.host = host
      self.timeout = timeout
      self.customMetadata = customMetadata
    }
  }
  40. // TODO: rename GRPCClientResponsePart2 to GRPCClientResponsePart once this is in-place.
  /// One part of the response stream a gRPC client reads for an RPC.
  ///
  /// `Response` is the protobuf message type carried by the `response` case.
  public enum GRPCClientResponsePart2<Response: Message> {
    /// The metadata received when the server acknowledges the RPC.
    case initialMetadata(HPACKHeaders)

    /// A deserialized response message which was received from the server.
    case response(Response)

    /// The metadata received when the RPC ends.
    case trailingMetadata(HPACKHeaders)

    /// The final status of the RPC.
    case status(GRPCStatus)
  }
  /// The kind of gRPC call, that is, how many messages flow in each direction.
  public enum GRPCCallType {
    /// Unary: one request and one response.
    case unary

    /// Client streaming: many requests and one response.
    case clientStreaming

    /// Server streaming: one request and many responses.
    case serverStreaming

    /// Bidirectional streaming: many requests and many responses.
    case bidirectionalStreaming
  }
  63. // MARK: - GRPCClientChannelHandler
  /// A channel handler for gRPC clients which translates between HTTP/2 frames and gRPC messages.
  ///
  /// Inbound, HTTP/2 frames are parsed into `GRPCClientResponsePart2<Response>` parts; this handler
  /// is therefore typically followed by another handler which reads those parts and surfaces them
  /// to the caller in some fashion. For unary and client streaming RPCs at most one response
  /// message will be emitted.
  ///
  /// The request and response streams share a single HTTP/2 stream for transport; their state is
  /// managed by a `GRPCClientStateMachine`, which this handler relies on heavily.
  ///
  /// This handler is typically used with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2:
  ///
  /// ```
  /// let multiplexer: HTTP2StreamMultiplexer = // ...
  /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
  ///   let clientChannelHandler = GRPCClientChannelHandler<Request, Response>(
  ///     streamID: streamID,
  ///     callType: callType,
  ///     logger: logger
  ///   )
  ///   return channel.pipeline.addHandler(clientChannelHandler)
  /// }
  /// ```
  public final class GRPCClientChannelHandler<Request: Message, Response: Message> {
    private let logger: Logger
    private let streamID: HTTP2StreamID
    private var stateMachine: GRPCClientStateMachine<Request, Response>

    /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
    ///
    /// - Parameters:
    ///   - streamID: The ID of the HTTP/2 stream on which this handler reads and writes frames.
    ///   - callType: The type of RPC being made; this determines the request and response arity
    ///     the state machine is created with.
    ///   - logger: The logger stored by this handler and passed on to the state machine.
    public init(streamID: HTTP2StreamID, callType: GRPCCallType, logger: Logger) {
      self.streamID = streamID
      self.logger = logger
      self.stateMachine = GRPCClientChannelHandler.makeStateMachine(for: callType, logger: logger)
    }

    /// Builds the state machine whose request and response arities match the given call type.
    private static func makeStateMachine(
      for callType: GRPCCallType,
      logger: Logger
    ) -> GRPCClientStateMachine<Request, Response> {
      switch callType {
      case .unary:
        return .init(requestArity: .one, responseArity: .one, logger: logger)
      case .clientStreaming:
        return .init(requestArity: .many, responseArity: .one, logger: logger)
      case .serverStreaming:
        return .init(requestArity: .one, responseArity: .many, logger: logger)
      case .bidirectionalStreaming:
        return .init(requestArity: .many, responseArity: .many, logger: logger)
      }
    }
  }
  113. // MARK: - GRPCClientChannelHandler: Inbound
  extension GRPCClientChannelHandler: ChannelInboundHandler {
    public typealias InboundIn = HTTP2Frame
    public typealias InboundOut = GRPCClientResponsePart2<Response>

    /// Dispatches an inbound HTTP/2 frame according to its payload.
    ///
    /// HEADERS and DATA frames are processed; every other frame type is dropped.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      switch self.unwrapInboundIn(data).payload {
      case .headers(let headersPayload):
        self.readHeaders(content: headersPayload, context: context)

      case .data(let dataPayload):
        self.readData(content: dataPayload, context: context)

      default:
        // No other frame types need handling: they are simply dropped.
        //
        // TODO: synthesise a more precise `GRPCStatus` from RST_STREAM frames in accordance
        // with: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
        break
      }
    }

    /// Read the content from an HTTP/2 HEADERS frame received from the server.
    ///
    /// Headers are received either when the RPC is acknowledged or when it is terminated. Both
    /// may happen at once: the specification calls this a "Trailers-Only" response.
    ///
    /// - Parameter content: Content of the headers frame.
    /// - Parameter context: Channel handler context.
    private func readHeaders(content: HTTP2Frame.FramePayload.Headers, context: ChannelHandlerContext) {
      // A "Trailers-Only" response is not guaranteed to have end-of-stream set on its headers
      // frame (it may arrive on a later, empty data frame instead), so headers which carry a gRPC
      // status code are also treated as the end of the response stream.
      let endsResponseStream =
        content.endStream || content.headers.contains(name: GRPCHeaderName.statusCode)

      if endsResponseStream {
        self.handleEndOfResponseStream(content.headers, context: context)
      } else {
        self.handleResponseHeaders(content.headers, context: context)
      }
    }

    /// Forwards the trailing metadata and then, depending on whether the state machine accepts
    /// the trailers, either the final status or an error.
    private func handleEndOfResponseStream(_ trailers: HPACKHeaders, context: ChannelHandlerContext) {
      // The trailers are passed to the next handler before they are validated.
      context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(trailers)))

      switch self.stateMachine.receiveEndOfResponseStream(trailers) {
      case .success(let status):
        context.fireChannelRead(self.wrapInboundOut(.status(status)))

      case .failure(let reason):
        // The trailers were rejected: pick a reasonable error to forward.
        let error: GRPCError
        switch reason {
        case .invalidContentType:
          error = .client(.invalidContentType)
        case .invalidHTTPStatus(let status):
          error = .client(.invalidHTTPStatus(status))
        case .invalidHTTPStatusWithGRPCStatus(let status):
          error = .client(.invalidHTTPStatusWithGRPCStatus(status))
        case .invalidState:
          error = .client(.invalidState("invalid state parsing end-of-stream trailers"))
        }
        context.fireErrorCaught(error)
      }
    }

    /// Validates "normal" response headers and forwards them as the initial metadata, or forwards
    /// an error if the state machine rejects them.
    private func handleResponseHeaders(_ headers: HPACKHeaders, context: ChannelHandlerContext) {
      switch self.stateMachine.receiveResponseHeaders(headers) {
      case .success:
        context.fireChannelRead(self.wrapInboundOut(.initialMetadata(headers)))

      case .failure(let reason):
        // The headers were rejected: pick a reasonable error to forward.
        let error: GRPCError
        switch reason {
        case .invalidContentType:
          error = .client(.invalidContentType)
        case .invalidHTTPStatus(let status):
          error = .client(.invalidHTTPStatus(status))
        case .unsupportedMessageEncoding(let encoding):
          error = .client(.unsupportedCompressionMechanism(encoding))
        case .invalidState:
          error = .client(.invalidState("invalid state parsing headers"))
        }
        context.fireErrorCaught(error)
      }
    }

    /// Reads the content from an HTTP/2 DATA frame received from the server and feeds the bytes to
    /// the state machine, forwarding any messages it produces.
    ///
    /// - Parameter content: Content of the data frame.
    /// - Parameter context: Channel handler context.
    private func readData(content: HTTP2Frame.FramePayload.Data, context: ChannelHandlerContext) {
      // Note: this is replicated from NIO's HTTP2ToHTTP1ClientCodec.
      guard case .byteBuffer(var bytes) = content.data else {
        preconditionFailure("Received DATA frame with non-ByteBuffer IOData")
      }

      // Nothing can be done without bytes. An empty frame may be seen when the trailing headers
      // frame (i.e. the one containing the gRPC status code) did not have end-of-stream set and
      // an additional empty data frame was sent with the flag set instead.
      guard bytes.readableBytes > 0 else {
        return
      }

      switch self.stateMachine.receiveResponseBuffer(&bytes) {
      case .success(let messages):
        // The state machine guarantees at most a single message for unary and client-streaming
        // RPCs.
        messages.forEach { message in
          context.fireChannelRead(self.wrapInboundOut(.response(message)))
        }

      case .failure(let reason):
        let error: GRPCError
        switch reason {
        case .cardinalityViolation:
          error = .client(.responseCardinalityViolation)
        case .deserializationFailed, .leftOverBytes:
          error = .client(.responseProtoDeserializationFailure)
        case .invalidState:
          error = .client(.invalidState("invalid state when parsing data as a response message"))
        }
        context.fireErrorCaught(error)
      }
    }
  }
  235. // MARK: - GRPCClientChannelHandler: Outbound
  extension GRPCClientChannelHandler: ChannelOutboundHandler {
    public typealias OutboundIn = GRPCClientRequestPart2<Request>
    public typealias OutboundOut = HTTP2Frame

    /// Translates an outbound request part into an HTTP/2 frame and writes it.
    ///
    /// Each part is first fed to the state machine. If the state machine rejects it, `promise` is
    /// failed with the state machine's error and, unless the failure can be ignored, a
    /// `GRPCError` is fired through the pipeline.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
      switch self.unwrapOutboundIn(data) {
      case .head(let requestHead):
        self.writeHead(requestHead, context: context, promise: promise)
      case .request(let request):
        self.writeMessage(request, context: context, promise: promise)
      case .end:
        self.writeEnd(context: context, promise: promise)
      }
    }

    /// Writes a HEADERS frame for the request head, if the state machine allows it.
    private func writeHead(
      _ requestHead: GRPCRequestHead,
      context: ChannelHandlerContext,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
      case .success(let headers):
        // Clear to send: wrap the headers in a frame and write it.
        let payload: HTTP2Frame.FramePayload = .headers(.init(headers: headers))
        let frame = HTTP2Frame(streamID: self.streamID, payload: payload)
        context.write(self.wrapOutboundOut(frame), promise: promise)

      case .failure(let reason):
        promise?.fail(reason)
        switch reason {
        case .invalidState:
          // This is bad: an error is triggered so that the RPC can be torn down.
          context.fireErrorCaught(GRPCError.client(.invalidState("unable to initiate RPC")))
        }
      }
    }

    /// Serializes a request message and writes it in a DATA frame, if the state machine allows it.
    private func writeMessage(
      _ message: Request,
      context: ChannelHandlerContext,
      promise: EventLoopPromise<Void>?
    ) {
      switch self.stateMachine.sendRequest(message, allocator: context.channel.allocator) {
      case .success(let serialized):
        // Clear to send: wrap the serialized message in a frame and write it.
        let payload: HTTP2Frame.FramePayload = .data(.init(data: .byteBuffer(serialized)))
        let frame = HTTP2Frame(streamID: self.streamID, payload: payload)
        context.write(self.wrapOutboundOut(frame), promise: promise)

      case .failure(let reason):
        promise?.fail(reason)
        switch reason {
        case .cardinalityViolation:
          // This is fine: the message is ignored and the RPC continues as if nothing went wrong.
          break
        case .serializationFailed:
          // This is bad: an error is triggered so that the RPC can be torn down.
          context.fireErrorCaught(GRPCError.client(.requestProtoSerializationFailure))
        case .invalidState:
          context.fireErrorCaught(GRPCError.client(.invalidState("unable to write message")))
        }
      }
    }

    /// Closes the request stream by writing an empty DATA frame with end-stream set, if the state
    /// machine allows it.
    private func writeEnd(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
      switch self.stateMachine.sendEndOfRequestStream() {
      case .success:
        let emptyBuffer = context.channel.allocator.buffer(capacity: 0)
        let payload: HTTP2Frame.FramePayload = .data(
          .init(data: .byteBuffer(emptyBuffer), endStream: true)
        )
        let frame = HTTP2Frame(streamID: self.streamID, payload: payload)
        context.write(self.wrapOutboundOut(frame), promise: promise)

      case .failure(let reason):
        promise?.fail(reason)
        switch reason {
        case .alreadyClosed:
          // This is fine: it is ignored and the RPC continues as if nothing went wrong.
          break
        case .invalidState:
          // This is bad: an error is triggered so that the RPC can be torn down.
          context.fireErrorCaught(GRPCError.client(.invalidState("unable to close request stream")))
        }
      }
    }

    /// Handles `GRPCClientUserEvent`s; any other event is forwarded along the pipeline untouched.
    ///
    /// On `.cancelled` a `GRPCClientError.cancelledByClient` error is fired and the channel is
    /// closed, with `promise` completed by the close.
    public func triggerUserOutboundEvent(
      context: ChannelHandlerContext,
      event: Any,
      promise: EventLoopPromise<Void>?
    ) {
      guard let userEvent = event as? GRPCClientUserEvent else {
        // Not an event for this handler: pass it on.
        context.triggerUserOutboundEvent(event, promise: promise)
        return
      }

      switch userEvent {
      case .cancelled:
        context.fireErrorCaught(GRPCClientError.cancelledByClient)
        context.close(mode: .all, promise: promise)
      }
    }
  }