2
0

GRPCClientChannelHandler.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP1
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  22. /// A gRPC client request message part.
  23. public enum GRPCClientRequestPart<Request: Message> {
  24. /// The 'head' of the request, that is, information about the initiation of the RPC.
  25. case head(GRPCRequestHead)
  26. /// A deserialized request message to send to the server.
  27. case message(_Box<Request>)
  28. /// Indicates that the client does not intend to send any further messages.
  29. case end
  30. }
  31. public struct GRPCRequestHead {
  32. private final class _Storage {
  33. public var method: String
  34. public var scheme: String
  35. public var path: String
  36. public var host: String
  37. public var timeout: GRPCTimeout
  38. init(
  39. method: String,
  40. scheme: String,
  41. path: String,
  42. host: String,
  43. timeout: GRPCTimeout
  44. ) {
  45. self.method = method
  46. self.scheme = scheme
  47. self.path = path
  48. self.host = host
  49. self.timeout = timeout
  50. }
  51. func copy() -> _Storage {
  52. return .init(
  53. method: self.method,
  54. scheme: self.scheme,
  55. path: self.path,
  56. host: self.host,
  57. timeout: self.timeout
  58. )
  59. }
  60. }
  61. private var _storage: _Storage
  62. // Don't put this in storage: it would CoW for every mutation.
  63. public var customMetadata: HPACKHeaders
  64. public var method: String {
  65. get {
  66. return self._storage.method
  67. }
  68. set {
  69. if !isKnownUniquelyReferenced(&self._storage) {
  70. self._storage = self._storage.copy()
  71. }
  72. self._storage.method = newValue
  73. }
  74. }
  75. public var scheme: String {
  76. get {
  77. return self._storage.scheme
  78. }
  79. set {
  80. if !isKnownUniquelyReferenced(&self._storage) {
  81. self._storage = self._storage.copy()
  82. }
  83. self._storage.scheme = newValue
  84. }
  85. }
  86. public var path: String {
  87. get {
  88. return self._storage.path
  89. }
  90. set {
  91. if !isKnownUniquelyReferenced(&self._storage) {
  92. self._storage = self._storage.copy()
  93. }
  94. self._storage.path = newValue
  95. }
  96. }
  97. public var host: String {
  98. get {
  99. return self._storage.host
  100. }
  101. set {
  102. if !isKnownUniquelyReferenced(&self._storage) {
  103. self._storage = self._storage.copy()
  104. }
  105. self._storage.host = newValue
  106. }
  107. }
  108. public var timeout: GRPCTimeout {
  109. get {
  110. return self._storage.timeout
  111. }
  112. set {
  113. if !isKnownUniquelyReferenced(&self._storage) {
  114. self._storage = self._storage.copy()
  115. }
  116. self._storage.timeout = newValue
  117. }
  118. }
  119. public init(
  120. method: String,
  121. scheme: String,
  122. path: String,
  123. host: String,
  124. timeout: GRPCTimeout,
  125. customMetadata: HPACKHeaders
  126. ) {
  127. self._storage = .init(
  128. method: method,
  129. scheme: scheme,
  130. path: path,
  131. host: host,
  132. timeout: timeout
  133. )
  134. self.customMetadata = customMetadata
  135. }
  136. }
  137. /// A gRPC client response message part.
  138. public enum GRPCClientResponsePart<Response: Message> {
  139. /// Metadata received as the server acknowledges the RPC.
  140. case initialMetadata(HPACKHeaders)
  141. /// A deserialized response message received from the server.
  142. case message(_Box<Response>)
  143. /// The metadata received at the end of the RPC.
  144. case trailingMetadata(HPACKHeaders)
  145. /// The final status of the RPC.
  146. case status(GRPCStatus)
  147. }
  148. /// The type of gRPC call.
  149. public enum GRPCCallType {
  150. /// Unary: a single request and a single response.
  151. case unary
  152. /// Client streaming: many requests and a single response.
  153. case clientStreaming
  154. /// Server streaming: a single request and many responses.
  155. case serverStreaming
  156. /// Bidirectional streaming: many request and many responses.
  157. case bidirectionalStreaming
  158. }
  159. // MARK: - GRPCClientChannelHandler
  160. /// A channel handler for gRPC clients which translates HTTP/2 frames into gRPC messages.
  161. ///
  162. /// This channel handler should typically be used in conjunction with another handler which
  163. /// reads the parsed `GRPCClientResponsePart<Response>` messages and surfaces them to the caller
  164. /// in some fashion. Note that for unary and client streaming RPCs this handler will only emit at
  165. /// most one response message.
  166. ///
  167. /// This handler relies heavily on the `GRPCClientStateMachine` to manage the state of the request
  168. /// and response streams, which share a single HTTP/2 stream for transport.
  169. ///
  170. /// Typical usage of this handler is with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2:
  171. ///
  172. /// ```
  173. /// let multiplexer: HTTP2StreamMultiplexer = // ...
  174. /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
  175. /// let clientChannelHandler = GRPCClientChannelHandler<Request, Response>(
  176. /// streamID: streamID,
  177. /// callType: callType,
  178. /// logger: logger
  179. /// )
  180. /// return channel.pipeline.addHandler(clientChannelHandler)
  181. /// }
  182. /// ```
  183. public final class GRPCClientChannelHandler<Request: Message, Response: Message> {
  184. private let logger: Logger
  185. private let streamID: HTTP2StreamID
  186. private var stateMachine: GRPCClientStateMachine<Request, Response>
  187. /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
  188. ///
  189. /// - Parameters:
  190. /// - streamID: The ID of the HTTP/2 stream that this handler will read and write HTTP/2
  191. /// frames on.
  192. /// - callType: Type of RPC call being made.
  193. /// - logger: Logger.
  194. public init(streamID: HTTP2StreamID, callType: GRPCCallType, logger: Logger) {
  195. self.streamID = streamID
  196. self.logger = logger
  197. switch callType {
  198. case .unary:
  199. self.stateMachine = .init(requestArity: .one, responseArity: .one)
  200. case .clientStreaming:
  201. self.stateMachine = .init(requestArity: .many, responseArity: .one)
  202. case .serverStreaming:
  203. self.stateMachine = .init(requestArity: .one, responseArity: .many)
  204. case .bidirectionalStreaming:
  205. self.stateMachine = .init(requestArity: .many, responseArity: .many)
  206. }
  207. }
  208. }
  209. // MARK: - GRPCClientChannelHandler: Inbound
  210. extension GRPCClientChannelHandler: ChannelInboundHandler {
  211. public typealias InboundIn = HTTP2Frame
  212. public typealias InboundOut = GRPCClientResponsePart<Response>
  213. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  214. let frame = self.unwrapInboundIn(data)
  215. switch frame.payload {
  216. case .headers(let content):
  217. self.readHeaders(content: content, context: context)
  218. case .data(let content):
  219. self.readData(content: content, context: context)
  220. // We don't need to handle other frame type, just drop them instead.
  221. default:
  222. // TODO: synthesise a more precise `GRPCStatus` from RST_STREAM frames in accordance
  223. // with: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
  224. break
  225. }
  226. }
  227. /// Read the content from an HTTP/2 HEADERS frame received from the server.
  228. ///
  229. /// We can receive headers in two cases:
  230. /// - when the RPC is being acknowledged, and
  231. /// - when the RPC is being terminated.
  232. ///
  233. /// It is also possible for the RPC to be acknowledged and terminated at the same time, the
  234. /// specification refers to this as a "Trailers-Only" response.
  235. ///
  236. /// - Parameter content: Content of the headers frame.
  237. /// - Parameter context: Channel handler context.
  238. private func readHeaders(content: HTTP2Frame.FramePayload.Headers, context: ChannelHandlerContext) {
  239. // In the case of a "Trailers-Only" response there's no guarantee that end-of-stream will be set
  240. // on the headers frame: end stream may be sent on an empty data frame as well. If the headers
  241. // contain a gRPC status code then they must be for a "Trailers-Only" response.
  242. if content.endStream || content.headers.contains(name: GRPCHeaderName.statusCode) {
  243. // We have the headers, pass them to the next handler:
  244. context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(content.headers)))
  245. // Are they valid headers?
  246. let result = self.stateMachine.receiveEndOfResponseStream(content.headers).mapError { error -> GRPCError in
  247. // The headers aren't valid so let's figure out a reasonable error to forward:
  248. switch error {
  249. case .invalidContentType:
  250. return .client(.invalidContentType)
  251. case .invalidHTTPStatus(let status):
  252. return .client(.invalidHTTPStatus(status))
  253. case .invalidHTTPStatusWithGRPCStatus(let status):
  254. return .client(.invalidHTTPStatusWithGRPCStatus(status))
  255. case .invalidState:
  256. return .client(.invalidState("invalid state parsing end-of-stream trailers"))
  257. }
  258. }
  259. // Okay, what should we tell the next handler?
  260. switch result {
  261. case .success(let status):
  262. context.fireChannelRead(self.wrapInboundOut(.status(status)))
  263. case .failure(let error):
  264. context.fireErrorCaught(error)
  265. }
  266. } else {
  267. // "Normal" response headers, but are they valid?
  268. let result = self.stateMachine.receiveResponseHeaders(content.headers).mapError { error -> GRPCError in
  269. // The headers aren't valid so let's figure out a reasonable error to forward:
  270. switch error {
  271. case .invalidContentType:
  272. return .client(.invalidContentType)
  273. case .invalidHTTPStatus(let status):
  274. return .client(.invalidHTTPStatus(status))
  275. case .unsupportedMessageEncoding(let encoding):
  276. return .client(.unsupportedCompressionMechanism(encoding))
  277. case .invalidState:
  278. return .client(.invalidState("invalid state parsing headers"))
  279. }
  280. }
  281. // Okay, what should we tell the next handler?
  282. switch result {
  283. case .success:
  284. context.fireChannelRead(self.wrapInboundOut(.initialMetadata(content.headers)))
  285. case .failure(let error):
  286. context.fireErrorCaught(error)
  287. }
  288. }
  289. }
  290. /// Reads the content from an HTTP/2 DATA frame received from the server and buffers the bytes
  291. /// necessary to deserialize a message (or messages).
  292. ///
  293. /// - Parameter content: Content of the data frame.
  294. /// - Parameter context: Channel handler context.
  295. private func readData(content: HTTP2Frame.FramePayload.Data, context: ChannelHandlerContext) {
  296. // Note: this is replicated from NIO's HTTP2ToHTTP1ClientCodec.
  297. guard case .byteBuffer(var buffer) = content.data else {
  298. preconditionFailure("Received DATA frame with non-ByteBuffer IOData")
  299. }
  300. // Do we have bytes to read? If there are no bytes to read then we can't do anything. This may
  301. // happen if the end-of-stream flag is not set on the trailing headers frame (i.e. the one
  302. // containing the gRPC status code) and an additional empty data frame is sent with the
  303. // end-of-stream flag set.
  304. guard buffer.readableBytes > 0 else {
  305. return
  306. }
  307. // Feed the buffer into the state machine.
  308. let result = self.stateMachine.receiveResponseBuffer(&buffer).mapError { error -> GRPCError in
  309. switch error {
  310. case .cardinalityViolation:
  311. return .client(.responseCardinalityViolation)
  312. case .deserializationFailed, .leftOverBytes:
  313. return .client(.responseProtoDeserializationFailure)
  314. case .invalidState:
  315. return .client(.invalidState("invalid state when parsing data as a response message"))
  316. }
  317. }
  318. // Did we get any messages?
  319. switch result {
  320. case .success(let messages):
  321. // Awesome: we got some messages. The state machine guarantees we only get at most a single
  322. // message for unary and client-streaming RPCs.
  323. for message in messages {
  324. context.fireChannelRead(self.wrapInboundOut(.message(.init(message))))
  325. }
  326. case .failure(let error):
  327. context.fireErrorCaught(error)
  328. }
  329. }
  330. }
  331. // MARK: - GRPCClientChannelHandler: Outbound
  332. extension GRPCClientChannelHandler: ChannelOutboundHandler {
  333. public typealias OutboundIn = GRPCClientRequestPart<Request>
  334. public typealias OutboundOut = HTTP2Frame
  335. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  336. switch self.unwrapOutboundIn(data) {
  337. case .head(let requestHead):
  338. // Feed the request into the state machine:
  339. switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
  340. case .success(let headers):
  341. // We're clear to write some headers. Create an appropriate frame and write it.
  342. let frame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers)))
  343. context.write(self.wrapOutboundOut(frame), promise: promise)
  344. case .failure(let sendRequestHeadersError):
  345. switch sendRequestHeadersError {
  346. case .invalidState:
  347. // This is bad: we need to trigger an error and close the channel.
  348. promise?.fail(sendRequestHeadersError)
  349. context.fireErrorCaught(GRPCError.client(.invalidState("unable to initiate RPC")))
  350. }
  351. }
  352. case .message(let request):
  353. // Feed the request message into the state machine:
  354. let result = self.stateMachine.sendRequest(request.value, allocator: context.channel.allocator)
  355. switch result {
  356. case .success(let buffer):
  357. // We're clear to send a message; wrap it up in an HTTP/2 frame.
  358. let frame = HTTP2Frame(
  359. streamID: self.streamID,
  360. payload: .data(.init(data: .byteBuffer(buffer)))
  361. )
  362. context.write(self.wrapOutboundOut(frame), promise: promise)
  363. case .failure(let writeError):
  364. switch writeError {
  365. case .cardinalityViolation:
  366. // This is fine: we can ignore the request. The RPC can continue as if nothing went wrong.
  367. promise?.fail(writeError)
  368. case .serializationFailed:
  369. // This is bad: we need to trigger an error and close the channel.
  370. promise?.fail(writeError)
  371. context.fireErrorCaught(GRPCError.client(.requestProtoSerializationFailure))
  372. case .invalidState:
  373. promise?.fail(writeError)
  374. context.fireErrorCaught(GRPCError.client(.invalidState("unable to write message")))
  375. }
  376. }
  377. case .end:
  378. // Okay: can we close the request stream?
  379. switch self.stateMachine.sendEndOfRequestStream() {
  380. case .success:
  381. // We can. Send an empty DATA frame with end-stream set.
  382. let empty = context.channel.allocator.buffer(capacity: 0)
  383. let frame = HTTP2Frame(
  384. streamID: self.streamID,
  385. payload: .data(.init(data: .byteBuffer(empty), endStream: true))
  386. )
  387. context.write(self.wrapOutboundOut(frame), promise: promise)
  388. case .failure(let error):
  389. // Why can't we close the request stream?
  390. switch error {
  391. case .alreadyClosed:
  392. // This is fine: we can just ignore it. The RPC can continue as if nothing went wrong.
  393. promise?.fail(error)
  394. case .invalidState:
  395. // This is bad: we need to trigger an error and close the channel.
  396. promise?.fail(error)
  397. context.fireErrorCaught(GRPCError.client(.invalidState("unable to close request stream")))
  398. }
  399. }
  400. }
  401. }
  402. public func triggerUserOutboundEvent(
  403. context: ChannelHandlerContext,
  404. event: Any,
  405. promise: EventLoopPromise<Void>?
  406. ) {
  407. if let userEvent = event as? GRPCClientUserEvent {
  408. switch userEvent {
  409. case .cancelled:
  410. context.fireErrorCaught(GRPCClientError.cancelledByClient)
  411. context.close(mode: .all, promise: promise)
  412. }
  413. } else {
  414. context.triggerUserOutboundEvent(event, promise: promise)
  415. }
  416. }
  417. }