_GRPCClientChannelHandler.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP1
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  22. /// A gRPC client request message part.
  23. ///
  24. /// - Important: This is **NOT** part of the public API. It is declared as
  25. /// `public` because it is used within performance tests.
  26. public enum _GRPCClientRequestPart<Request: Message> {
  27. /// The 'head' of the request, that is, information about the initiation of the RPC.
  28. case head(_GRPCRequestHead)
  29. /// A deserialized request message to send to the server.
  30. case message(_Box<Request>)
  31. /// Indicates that the client does not intend to send any further messages.
  32. case end
  33. }
  34. /// - Important: This is **NOT** part of the public API. It is declared as
  35. /// `public` because it is used within performance tests.
  36. public struct _GRPCRequestHead {
  37. private final class _Storage {
  38. var method: String
  39. var scheme: String
  40. var path: String
  41. var host: String
  42. var timeout: GRPCTimeout
  43. init(
  44. method: String,
  45. scheme: String,
  46. path: String,
  47. host: String,
  48. timeout: GRPCTimeout
  49. ) {
  50. self.method = method
  51. self.scheme = scheme
  52. self.path = path
  53. self.host = host
  54. self.timeout = timeout
  55. }
  56. func copy() -> _Storage {
  57. return .init(
  58. method: self.method,
  59. scheme: self.scheme,
  60. path: self.path,
  61. host: self.host,
  62. timeout: self.timeout
  63. )
  64. }
  65. }
  66. private var _storage: _Storage
  67. // Don't put this in storage: it would CoW for every mutation.
  68. internal var customMetadata: HPACKHeaders
  69. internal var method: String {
  70. get {
  71. return self._storage.method
  72. }
  73. set {
  74. if !isKnownUniquelyReferenced(&self._storage) {
  75. self._storage = self._storage.copy()
  76. }
  77. self._storage.method = newValue
  78. }
  79. }
  80. internal var scheme: String {
  81. get {
  82. return self._storage.scheme
  83. }
  84. set {
  85. if !isKnownUniquelyReferenced(&self._storage) {
  86. self._storage = self._storage.copy()
  87. }
  88. self._storage.scheme = newValue
  89. }
  90. }
  91. internal var path: String {
  92. get {
  93. return self._storage.path
  94. }
  95. set {
  96. if !isKnownUniquelyReferenced(&self._storage) {
  97. self._storage = self._storage.copy()
  98. }
  99. self._storage.path = newValue
  100. }
  101. }
  102. internal var host: String {
  103. get {
  104. return self._storage.host
  105. }
  106. set {
  107. if !isKnownUniquelyReferenced(&self._storage) {
  108. self._storage = self._storage.copy()
  109. }
  110. self._storage.host = newValue
  111. }
  112. }
  113. internal var timeout: GRPCTimeout {
  114. get {
  115. return self._storage.timeout
  116. }
  117. set {
  118. if !isKnownUniquelyReferenced(&self._storage) {
  119. self._storage = self._storage.copy()
  120. }
  121. self._storage.timeout = newValue
  122. }
  123. }
  124. public init(
  125. method: String,
  126. scheme: String,
  127. path: String,
  128. host: String,
  129. timeout: GRPCTimeout,
  130. customMetadata: HPACKHeaders
  131. ) {
  132. self._storage = .init(
  133. method: method,
  134. scheme: scheme,
  135. path: path,
  136. host: host,
  137. timeout: timeout
  138. )
  139. self.customMetadata = customMetadata
  140. }
  141. }
  142. /// A gRPC client response message part.
  143. ///
  144. /// - Important: This is **NOT** part of the public API.
  145. public enum _GRPCClientResponsePart<Response: Message> {
  146. /// Metadata received as the server acknowledges the RPC.
  147. case initialMetadata(HPACKHeaders)
  148. /// A deserialized response message received from the server.
  149. case message(_Box<Response>)
  150. /// The metadata received at the end of the RPC.
  151. case trailingMetadata(HPACKHeaders)
  152. /// The final status of the RPC.
  153. case status(GRPCStatus)
  154. }
  155. /// The type of gRPC call.
  156. public enum GRPCCallType {
  157. /// Unary: a single request and a single response.
  158. case unary
  159. /// Client streaming: many requests and a single response.
  160. case clientStreaming
  161. /// Server streaming: a single request and many responses.
  162. case serverStreaming
  163. /// Bidirectional streaming: many request and many responses.
  164. case bidirectionalStreaming
  165. }
  166. // MARK: - GRPCClientChannelHandler
  167. /// A channel handler for gRPC clients which translates HTTP/2 frames into gRPC messages.
  168. ///
  169. /// This channel handler should typically be used in conjunction with another handler which
  170. /// reads the parsed `GRPCClientResponsePart<Response>` messages and surfaces them to the caller
  171. /// in some fashion. Note that for unary and client streaming RPCs this handler will only emit at
  172. /// most one response message.
  173. ///
  174. /// This handler relies heavily on the `GRPCClientStateMachine` to manage the state of the request
  175. /// and response streams, which share a single HTTP/2 stream for transport.
  176. ///
  177. /// Typical usage of this handler is with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2:
  178. ///
  179. /// ```
  180. /// let multiplexer: HTTP2StreamMultiplexer = // ...
  181. /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
  182. /// let clientChannelHandler = GRPCClientChannelHandler<Request, Response>(
  183. /// streamID: streamID,
  184. /// callType: callType,
  185. /// logger: logger
  186. /// )
  187. /// return channel.pipeline.addHandler(clientChannelHandler)
  188. /// }
  189. /// ```
  190. ///
  191. /// - Important: This is **NOT** part of the public API. It is declared as
  192. /// `public` because it is used within performance tests.
  193. public final class _GRPCClientChannelHandler<Request: Message, Response: Message> {
  194. private let logger: Logger
  195. private let streamID: HTTP2StreamID
  196. private var stateMachine: GRPCClientStateMachine<Request, Response>
  197. /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
  198. ///
  199. /// - Parameters:
  200. /// - streamID: The ID of the HTTP/2 stream that this handler will read and write HTTP/2
  201. /// frames on.
  202. /// - callType: Type of RPC call being made.
  203. /// - logger: Logger.
  204. public init(streamID: HTTP2StreamID, callType: GRPCCallType, logger: Logger) {
  205. self.streamID = streamID
  206. self.logger = logger
  207. switch callType {
  208. case .unary:
  209. self.stateMachine = .init(requestArity: .one, responseArity: .one)
  210. case .clientStreaming:
  211. self.stateMachine = .init(requestArity: .many, responseArity: .one)
  212. case .serverStreaming:
  213. self.stateMachine = .init(requestArity: .one, responseArity: .many)
  214. case .bidirectionalStreaming:
  215. self.stateMachine = .init(requestArity: .many, responseArity: .many)
  216. }
  217. }
  218. }
  219. // MARK: - GRPCClientChannelHandler: Inbound
  220. extension _GRPCClientChannelHandler: ChannelInboundHandler {
  221. public typealias InboundIn = HTTP2Frame
  222. public typealias InboundOut = _GRPCClientResponsePart<Response>
  223. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  224. let frame = self.unwrapInboundIn(data)
  225. switch frame.payload {
  226. case .headers(let content):
  227. self.readHeaders(content: content, context: context)
  228. case .data(let content):
  229. self.readData(content: content, context: context)
  230. // We don't need to handle other frame type, just drop them instead.
  231. default:
  232. // TODO: synthesise a more precise `GRPCStatus` from RST_STREAM frames in accordance
  233. // with: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
  234. break
  235. }
  236. }
  237. /// Read the content from an HTTP/2 HEADERS frame received from the server.
  238. ///
  239. /// We can receive headers in two cases:
  240. /// - when the RPC is being acknowledged, and
  241. /// - when the RPC is being terminated.
  242. ///
  243. /// It is also possible for the RPC to be acknowledged and terminated at the same time, the
  244. /// specification refers to this as a "Trailers-Only" response.
  245. ///
  246. /// - Parameter content: Content of the headers frame.
  247. /// - Parameter context: Channel handler context.
  248. private func readHeaders(content: HTTP2Frame.FramePayload.Headers, context: ChannelHandlerContext) {
  249. // In the case of a "Trailers-Only" response there's no guarantee that end-of-stream will be set
  250. // on the headers frame: end stream may be sent on an empty data frame as well. If the headers
  251. // contain a gRPC status code then they must be for a "Trailers-Only" response.
  252. if content.endStream || content.headers.contains(name: GRPCHeaderName.statusCode) {
  253. // We have the headers, pass them to the next handler:
  254. context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(content.headers)))
  255. // Are they valid headers?
  256. let result = self.stateMachine.receiveEndOfResponseStream(content.headers).mapError { error -> GRPCError in
  257. // The headers aren't valid so let's figure out a reasonable error to forward:
  258. switch error {
  259. case .invalidContentType:
  260. return .client(.invalidContentType)
  261. case .invalidHTTPStatus(let status):
  262. return .client(.invalidHTTPStatus(status))
  263. case .invalidHTTPStatusWithGRPCStatus(let status):
  264. return .client(.invalidHTTPStatusWithGRPCStatus(status))
  265. case .invalidState:
  266. return .client(.invalidState("invalid state parsing end-of-stream trailers"))
  267. }
  268. }
  269. // Okay, what should we tell the next handler?
  270. switch result {
  271. case .success(let status):
  272. context.fireChannelRead(self.wrapInboundOut(.status(status)))
  273. case .failure(let error):
  274. context.fireErrorCaught(error)
  275. }
  276. } else {
  277. // "Normal" response headers, but are they valid?
  278. let result = self.stateMachine.receiveResponseHeaders(content.headers).mapError { error -> GRPCError in
  279. // The headers aren't valid so let's figure out a reasonable error to forward:
  280. switch error {
  281. case .invalidContentType:
  282. return .client(.invalidContentType)
  283. case .invalidHTTPStatus(let status):
  284. return .client(.invalidHTTPStatus(status))
  285. case .unsupportedMessageEncoding(let encoding):
  286. return .client(.unsupportedCompressionMechanism(encoding))
  287. case .invalidState:
  288. return .client(.invalidState("invalid state parsing headers"))
  289. }
  290. }
  291. // Okay, what should we tell the next handler?
  292. switch result {
  293. case .success:
  294. context.fireChannelRead(self.wrapInboundOut(.initialMetadata(content.headers)))
  295. case .failure(let error):
  296. context.fireErrorCaught(error)
  297. }
  298. }
  299. }
  300. /// Reads the content from an HTTP/2 DATA frame received from the server and buffers the bytes
  301. /// necessary to deserialize a message (or messages).
  302. ///
  303. /// - Parameter content: Content of the data frame.
  304. /// - Parameter context: Channel handler context.
  305. private func readData(content: HTTP2Frame.FramePayload.Data, context: ChannelHandlerContext) {
  306. // Note: this is replicated from NIO's HTTP2ToHTTP1ClientCodec.
  307. guard case .byteBuffer(var buffer) = content.data else {
  308. preconditionFailure("Received DATA frame with non-ByteBuffer IOData")
  309. }
  310. // Do we have bytes to read? If there are no bytes to read then we can't do anything. This may
  311. // happen if the end-of-stream flag is not set on the trailing headers frame (i.e. the one
  312. // containing the gRPC status code) and an additional empty data frame is sent with the
  313. // end-of-stream flag set.
  314. guard buffer.readableBytes > 0 else {
  315. return
  316. }
  317. // Feed the buffer into the state machine.
  318. let result = self.stateMachine.receiveResponseBuffer(&buffer).mapError { error -> GRPCError in
  319. switch error {
  320. case .cardinalityViolation:
  321. return .client(.responseCardinalityViolation)
  322. case .deserializationFailed, .leftOverBytes:
  323. return .client(.responseProtoDeserializationFailure)
  324. case .invalidState:
  325. return .client(.invalidState("invalid state when parsing data as a response message"))
  326. }
  327. }
  328. // Did we get any messages?
  329. switch result {
  330. case .success(let messages):
  331. // Awesome: we got some messages. The state machine guarantees we only get at most a single
  332. // message for unary and client-streaming RPCs.
  333. for message in messages {
  334. context.fireChannelRead(self.wrapInboundOut(.message(.init(message))))
  335. }
  336. case .failure(let error):
  337. context.fireErrorCaught(error)
  338. }
  339. }
  340. }
  341. // MARK: - GRPCClientChannelHandler: Outbound
  342. extension _GRPCClientChannelHandler: ChannelOutboundHandler {
  343. public typealias OutboundIn = _GRPCClientRequestPart<Request>
  344. public typealias OutboundOut = HTTP2Frame
  345. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  346. switch self.unwrapOutboundIn(data) {
  347. case .head(let requestHead):
  348. // Feed the request into the state machine:
  349. switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
  350. case .success(let headers):
  351. // We're clear to write some headers. Create an appropriate frame and write it.
  352. let frame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers)))
  353. context.write(self.wrapOutboundOut(frame), promise: promise)
  354. case .failure(let sendRequestHeadersError):
  355. switch sendRequestHeadersError {
  356. case .invalidState:
  357. // This is bad: we need to trigger an error and close the channel.
  358. promise?.fail(sendRequestHeadersError)
  359. context.fireErrorCaught(GRPCError.client(.invalidState("unable to initiate RPC")))
  360. }
  361. }
  362. case .message(let request):
  363. // Feed the request message into the state machine:
  364. let result = self.stateMachine.sendRequest(request.value, allocator: context.channel.allocator)
  365. switch result {
  366. case .success(let buffer):
  367. // We're clear to send a message; wrap it up in an HTTP/2 frame.
  368. let frame = HTTP2Frame(
  369. streamID: self.streamID,
  370. payload: .data(.init(data: .byteBuffer(buffer)))
  371. )
  372. context.write(self.wrapOutboundOut(frame), promise: promise)
  373. case .failure(let writeError):
  374. switch writeError {
  375. case .cardinalityViolation:
  376. // This is fine: we can ignore the request. The RPC can continue as if nothing went wrong.
  377. promise?.fail(writeError)
  378. case .serializationFailed:
  379. // This is bad: we need to trigger an error and close the channel.
  380. promise?.fail(writeError)
  381. context.fireErrorCaught(GRPCError.client(.requestProtoSerializationFailure))
  382. case .invalidState:
  383. promise?.fail(writeError)
  384. context.fireErrorCaught(GRPCError.client(.invalidState("unable to write message")))
  385. }
  386. }
  387. case .end:
  388. // Okay: can we close the request stream?
  389. switch self.stateMachine.sendEndOfRequestStream() {
  390. case .success:
  391. // We can. Send an empty DATA frame with end-stream set.
  392. let empty = context.channel.allocator.buffer(capacity: 0)
  393. let frame = HTTP2Frame(
  394. streamID: self.streamID,
  395. payload: .data(.init(data: .byteBuffer(empty), endStream: true))
  396. )
  397. context.write(self.wrapOutboundOut(frame), promise: promise)
  398. case .failure(let error):
  399. // Why can't we close the request stream?
  400. switch error {
  401. case .alreadyClosed:
  402. // This is fine: we can just ignore it. The RPC can continue as if nothing went wrong.
  403. promise?.fail(error)
  404. case .invalidState:
  405. // This is bad: we need to trigger an error and close the channel.
  406. promise?.fail(error)
  407. context.fireErrorCaught(GRPCError.client(.invalidState("unable to close request stream")))
  408. }
  409. }
  410. }
  411. }
  412. public func triggerUserOutboundEvent(
  413. context: ChannelHandlerContext,
  414. event: Any,
  415. promise: EventLoopPromise<Void>?
  416. ) {
  417. if let userEvent = event as? GRPCClientUserEvent {
  418. switch userEvent {
  419. case .cancelled:
  420. context.fireErrorCaught(GRPCClientError.cancelledByClient)
  421. context.close(mode: .all, promise: promise)
  422. }
  423. } else {
  424. context.triggerUserOutboundEvent(event, promise: promise)
  425. }
  426. }
  427. }