_GRPCClientChannelHandler.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP1
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  22. /// A gRPC client request message part.
  23. ///
  24. /// - Important: This is **NOT** part of the public API. It is declared as
  25. /// `public` because it is used within performance tests.
  26. public enum _GRPCClientRequestPart<Request: GRPCPayload> {
  27. /// The 'head' of the request, that is, information about the initiation of the RPC.
  28. case head(_GRPCRequestHead)
  29. /// A deserialized request message to send to the server.
  30. case message(_MessageContext<Request>)
  31. /// Indicates that the client does not intend to send any further messages.
  32. case end
  33. }
  34. /// - Important: This is **NOT** part of the public API. It is declared as
  35. /// `public` because it is used within performance tests.
  36. public struct _GRPCRequestHead {
  37. private final class _Storage {
  38. var method: String
  39. var scheme: String
  40. var path: String
  41. var host: String
  42. var timeout: GRPCTimeout
  43. var encoding: ClientMessageEncoding
  44. init(
  45. method: String,
  46. scheme: String,
  47. path: String,
  48. host: String,
  49. timeout: GRPCTimeout,
  50. encoding: ClientMessageEncoding
  51. ) {
  52. self.method = method
  53. self.scheme = scheme
  54. self.path = path
  55. self.host = host
  56. self.timeout = timeout
  57. self.encoding = encoding
  58. }
  59. func copy() -> _Storage {
  60. return .init(
  61. method: self.method,
  62. scheme: self.scheme,
  63. path: self.path,
  64. host: self.host,
  65. timeout: self.timeout,
  66. encoding: self.encoding
  67. )
  68. }
  69. }
  70. private var _storage: _Storage
  71. // Don't put this in storage: it would CoW for every mutation.
  72. internal var customMetadata: HPACKHeaders
  73. internal var method: String {
  74. get {
  75. return self._storage.method
  76. }
  77. set {
  78. if !isKnownUniquelyReferenced(&self._storage) {
  79. self._storage = self._storage.copy()
  80. }
  81. self._storage.method = newValue
  82. }
  83. }
  84. internal var scheme: String {
  85. get {
  86. return self._storage.scheme
  87. }
  88. set {
  89. if !isKnownUniquelyReferenced(&self._storage) {
  90. self._storage = self._storage.copy()
  91. }
  92. self._storage.scheme = newValue
  93. }
  94. }
  95. internal var path: String {
  96. get {
  97. return self._storage.path
  98. }
  99. set {
  100. if !isKnownUniquelyReferenced(&self._storage) {
  101. self._storage = self._storage.copy()
  102. }
  103. self._storage.path = newValue
  104. }
  105. }
  106. internal var host: String {
  107. get {
  108. return self._storage.host
  109. }
  110. set {
  111. if !isKnownUniquelyReferenced(&self._storage) {
  112. self._storage = self._storage.copy()
  113. }
  114. self._storage.host = newValue
  115. }
  116. }
  117. internal var timeout: GRPCTimeout {
  118. get {
  119. return self._storage.timeout
  120. }
  121. set {
  122. if !isKnownUniquelyReferenced(&self._storage) {
  123. self._storage = self._storage.copy()
  124. }
  125. self._storage.timeout = newValue
  126. }
  127. }
  128. internal var encoding: ClientMessageEncoding {
  129. get {
  130. return self._storage.encoding
  131. }
  132. set {
  133. if !isKnownUniquelyReferenced(&self._storage) {
  134. self._storage = self._storage.copy()
  135. }
  136. self._storage.encoding = newValue
  137. }
  138. }
  139. public init(
  140. method: String,
  141. scheme: String,
  142. path: String,
  143. host: String,
  144. timeout: GRPCTimeout,
  145. customMetadata: HPACKHeaders,
  146. encoding: ClientMessageEncoding
  147. ) {
  148. self._storage = .init(
  149. method: method,
  150. scheme: scheme,
  151. path: path,
  152. host: host,
  153. timeout: timeout,
  154. encoding: encoding
  155. )
  156. self.customMetadata = customMetadata
  157. }
  158. }
  159. /// A gRPC client response message part.
  160. ///
  161. /// - Important: This is **NOT** part of the public API.
  162. public enum _GRPCClientResponsePart<Response: GRPCPayload> {
  163. /// Metadata received as the server acknowledges the RPC.
  164. case initialMetadata(HPACKHeaders)
  165. /// A deserialized response message received from the server.
  166. case message(_MessageContext<Response>)
  167. /// The metadata received at the end of the RPC.
  168. case trailingMetadata(HPACKHeaders)
  169. /// The final status of the RPC.
  170. case status(GRPCStatus)
  171. }
  172. /// The type of gRPC call.
  173. public enum GRPCCallType {
  174. /// Unary: a single request and a single response.
  175. case unary
  176. /// Client streaming: many requests and a single response.
  177. case clientStreaming
  178. /// Server streaming: a single request and many responses.
  179. case serverStreaming
  180. /// Bidirectional streaming: many request and many responses.
  181. case bidirectionalStreaming
  182. }
  183. // MARK: - GRPCClientChannelHandler
  184. /// A channel handler for gRPC clients which translates HTTP/2 frames into gRPC messages.
  185. ///
  186. /// This channel handler should typically be used in conjunction with another handler which
  187. /// reads the parsed `GRPCClientResponsePart<Response>` messages and surfaces them to the caller
  188. /// in some fashion. Note that for unary and client streaming RPCs this handler will only emit at
  189. /// most one response message.
  190. ///
  191. /// This handler relies heavily on the `GRPCClientStateMachine` to manage the state of the request
  192. /// and response streams, which share a single HTTP/2 stream for transport.
  193. ///
  194. /// Typical usage of this handler is with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2:
  195. ///
  196. /// ```
  197. /// let multiplexer: HTTP2StreamMultiplexer = // ...
  198. /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
  199. /// let clientChannelHandler = GRPCClientChannelHandler<Request, Response>(
  200. /// streamID: streamID,
  201. /// callType: callType,
  202. /// logger: logger
  203. /// )
  204. /// return channel.pipeline.addHandler(clientChannelHandler)
  205. /// }
  206. /// ```
  207. ///
  208. /// - Important: This is **NOT** part of the public API. It is declared as
  209. /// `public` because it is used within performance tests.
  210. public final class _GRPCClientChannelHandler<Request: GRPCPayload, Response: GRPCPayload> {
  211. private let logger: Logger
  212. private let streamID: HTTP2StreamID
  213. private var stateMachine: GRPCClientStateMachine<Request, Response>
  214. /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
  215. ///
  216. /// - Parameters:
  217. /// - streamID: The ID of the HTTP/2 stream that this handler will read and write HTTP/2
  218. /// frames on.
  219. /// - callType: Type of RPC call being made.
  220. /// - logger: Logger.
  221. public init(streamID: HTTP2StreamID, callType: GRPCCallType, logger: Logger) {
  222. self.streamID = streamID
  223. self.logger = logger
  224. switch callType {
  225. case .unary:
  226. self.stateMachine = .init(requestArity: .one, responseArity: .one)
  227. case .clientStreaming:
  228. self.stateMachine = .init(requestArity: .many, responseArity: .one)
  229. case .serverStreaming:
  230. self.stateMachine = .init(requestArity: .one, responseArity: .many)
  231. case .bidirectionalStreaming:
  232. self.stateMachine = .init(requestArity: .many, responseArity: .many)
  233. }
  234. }
  235. }
  236. // MARK: - GRPCClientChannelHandler: Inbound
  237. extension _GRPCClientChannelHandler: ChannelInboundHandler {
  238. public typealias InboundIn = HTTP2Frame
  239. public typealias InboundOut = _GRPCClientResponsePart<Response>
  240. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  241. let frame = self.unwrapInboundIn(data)
  242. switch frame.payload {
  243. case .headers(let content):
  244. self.readHeaders(content: content, context: context)
  245. case .data(let content):
  246. self.readData(content: content, context: context)
  247. // We don't need to handle other frame type, just drop them instead.
  248. default:
  249. // TODO: synthesise a more precise `GRPCStatus` from RST_STREAM frames in accordance
  250. // with: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
  251. break
  252. }
  253. }
  254. /// Read the content from an HTTP/2 HEADERS frame received from the server.
  255. ///
  256. /// We can receive headers in two cases:
  257. /// - when the RPC is being acknowledged, and
  258. /// - when the RPC is being terminated.
  259. ///
  260. /// It is also possible for the RPC to be acknowledged and terminated at the same time, the
  261. /// specification refers to this as a "Trailers-Only" response.
  262. ///
  263. /// - Parameter content: Content of the headers frame.
  264. /// - Parameter context: Channel handler context.
  265. private func readHeaders(content: HTTP2Frame.FramePayload.Headers, context: ChannelHandlerContext) {
  266. // In the case of a "Trailers-Only" response there's no guarantee that end-of-stream will be set
  267. // on the headers frame: end stream may be sent on an empty data frame as well. If the headers
  268. // contain a gRPC status code then they must be for a "Trailers-Only" response.
  269. if content.endStream || content.headers.contains(name: GRPCHeaderName.statusCode) {
  270. // We have the headers, pass them to the next handler:
  271. context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(content.headers)))
  272. // Are they valid headers?
  273. let result = self.stateMachine.receiveEndOfResponseStream(content.headers).mapError { error -> GRPCError.WithContext in
  274. // The headers aren't valid so let's figure out a reasonable error to forward:
  275. switch error {
  276. case .invalidContentType(let contentType):
  277. return GRPCError.InvalidContentType(contentType).captureContext()
  278. case .invalidHTTPStatus(let status):
  279. return GRPCError.InvalidHTTPStatus(status).captureContext()
  280. case .invalidHTTPStatusWithGRPCStatus(let status):
  281. return GRPCError.InvalidHTTPStatusWithGRPCStatus(status).captureContext()
  282. case .invalidState:
  283. return GRPCError.InvalidState("parsing end-of-stream trailers").captureContext()
  284. }
  285. }
  286. // Okay, what should we tell the next handler?
  287. switch result {
  288. case .success(let status):
  289. context.fireChannelRead(self.wrapInboundOut(.status(status)))
  290. case .failure(let error):
  291. context.fireErrorCaught(error)
  292. }
  293. } else {
  294. // "Normal" response headers, but are they valid?
  295. let result = self.stateMachine.receiveResponseHeaders(content.headers).mapError { error -> GRPCError.WithContext in
  296. // The headers aren't valid so let's figure out a reasonable error to forward:
  297. switch error {
  298. case .invalidContentType(let contentType):
  299. return GRPCError.InvalidContentType(contentType).captureContext()
  300. case .invalidHTTPStatus(let status):
  301. return GRPCError.InvalidHTTPStatus(status).captureContext()
  302. case .unsupportedMessageEncoding:
  303. return GRPCError.CompressionUnsupported().captureContext()
  304. case .invalidState:
  305. return GRPCError.InvalidState("parsing headers").captureContext()
  306. }
  307. }
  308. // Okay, what should we tell the next handler?
  309. switch result {
  310. case .success:
  311. context.fireChannelRead(self.wrapInboundOut(.initialMetadata(content.headers)))
  312. case .failure(let error):
  313. context.fireErrorCaught(error)
  314. }
  315. }
  316. }
  317. /// Reads the content from an HTTP/2 DATA frame received from the server and buffers the bytes
  318. /// necessary to deserialize a message (or messages).
  319. ///
  320. /// - Parameter content: Content of the data frame.
  321. /// - Parameter context: Channel handler context.
  322. private func readData(content: HTTP2Frame.FramePayload.Data, context: ChannelHandlerContext) {
  323. // Note: this is replicated from NIO's HTTP2ToHTTP1ClientCodec.
  324. guard case .byteBuffer(var buffer) = content.data else {
  325. preconditionFailure("Received DATA frame with non-ByteBuffer IOData")
  326. }
  327. // Do we have bytes to read? If there are no bytes to read then we can't do anything. This may
  328. // happen if the end-of-stream flag is not set on the trailing headers frame (i.e. the one
  329. // containing the gRPC status code) and an additional empty data frame is sent with the
  330. // end-of-stream flag set.
  331. guard buffer.readableBytes > 0 else {
  332. return
  333. }
  334. // Feed the buffer into the state machine.
  335. let result = self.stateMachine.receiveResponseBuffer(&buffer).mapError { error -> GRPCError.WithContext in
  336. switch error {
  337. case .cardinalityViolation:
  338. return GRPCError.StreamCardinalityViolation(stream: .response).captureContext()
  339. case .deserializationFailed, .leftOverBytes:
  340. return GRPCError.DeserializationFailure().captureContext()
  341. case .decompressionLimitExceeded(let compressedSize):
  342. return GRPCError.DecompressionLimitExceeded(compressedSize: compressedSize).captureContext()
  343. case .invalidState:
  344. return GRPCError.InvalidState("parsing data as a response message").captureContext()
  345. }
  346. }
  347. // Did we get any messages?
  348. switch result {
  349. case .success(let messages):
  350. // Awesome: we got some messages. The state machine guarantees we only get at most a single
  351. // message for unary and client-streaming RPCs.
  352. for message in messages {
  353. // Note: `compressed: false` is currently just a placeholder. This is fine since the message
  354. // context is not currently exposed to the user. If we implement interceptors for the client
  355. // and decide to surface this information then we'll need to extract that information from
  356. // the message reader.
  357. context.fireChannelRead(self.wrapInboundOut(.message(.init(message, compressed: false))))
  358. }
  359. case .failure(let error):
  360. context.fireErrorCaught(error)
  361. }
  362. }
  363. }
  364. // MARK: - GRPCClientChannelHandler: Outbound
  365. extension _GRPCClientChannelHandler: ChannelOutboundHandler {
  366. public typealias OutboundIn = _GRPCClientRequestPart<Request>
  367. public typealias OutboundOut = HTTP2Frame
  368. public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  369. switch self.unwrapOutboundIn(data) {
  370. case .head(let requestHead):
  371. // Feed the request into the state machine:
  372. switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
  373. case .success(let headers):
  374. // We're clear to write some headers. Create an appropriate frame and write it.
  375. let frame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers)))
  376. context.write(self.wrapOutboundOut(frame), promise: promise)
  377. case .failure(let sendRequestHeadersError):
  378. switch sendRequestHeadersError {
  379. case .invalidState:
  380. // This is bad: we need to trigger an error and close the channel.
  381. promise?.fail(sendRequestHeadersError)
  382. context.fireErrorCaught(GRPCError.InvalidState("unable to initiate RPC").captureContext())
  383. }
  384. }
  385. case .message(let request):
  386. // Feed the request message into the state machine:
  387. let result = self.stateMachine.sendRequest(request.message, compressed: request.compressed, allocator: context.channel.allocator)
  388. switch result {
  389. case .success(let buffer):
  390. // We're clear to send a message; wrap it up in an HTTP/2 frame.
  391. let frame = HTTP2Frame(
  392. streamID: self.streamID,
  393. payload: .data(.init(data: .byteBuffer(buffer)))
  394. )
  395. context.write(self.wrapOutboundOut(frame), promise: promise)
  396. case .failure(let writeError):
  397. switch writeError {
  398. case .cardinalityViolation:
  399. // This is fine: we can ignore the request. The RPC can continue as if nothing went wrong.
  400. promise?.fail(writeError)
  401. case .serializationFailed:
  402. // This is bad: we need to trigger an error and close the channel.
  403. promise?.fail(writeError)
  404. context.fireErrorCaught(GRPCError.SerializationFailure().captureContext())
  405. case .invalidState:
  406. promise?.fail(writeError)
  407. context.fireErrorCaught(GRPCError.InvalidState("unable to write message").captureContext())
  408. }
  409. }
  410. case .end:
  411. // Okay: can we close the request stream?
  412. switch self.stateMachine.sendEndOfRequestStream() {
  413. case .success:
  414. // We can. Send an empty DATA frame with end-stream set.
  415. let empty = context.channel.allocator.buffer(capacity: 0)
  416. let frame = HTTP2Frame(
  417. streamID: self.streamID,
  418. payload: .data(.init(data: .byteBuffer(empty), endStream: true))
  419. )
  420. context.write(self.wrapOutboundOut(frame), promise: promise)
  421. case .failure(let error):
  422. // Why can't we close the request stream?
  423. switch error {
  424. case .alreadyClosed:
  425. // This is fine: we can just ignore it. The RPC can continue as if nothing went wrong.
  426. promise?.fail(error)
  427. case .invalidState:
  428. // This is bad: we need to trigger an error and close the channel.
  429. promise?.fail(error)
  430. context.fireErrorCaught(GRPCError.InvalidState("unable to close request stream").captureContext())
  431. }
  432. }
  433. }
  434. }
  435. public func triggerUserOutboundEvent(
  436. context: ChannelHandlerContext,
  437. event: Any,
  438. promise: EventLoopPromise<Void>?
  439. ) {
  440. if let userEvent = event as? GRPCClientUserEvent {
  441. switch userEvent {
  442. case .cancelled:
  443. context.fireErrorCaught(GRPCError.RPCCancelledByClient().captureContext())
  444. context.close(mode: .all, promise: promise)
  445. }
  446. } else {
  447. context.triggerUserOutboundEvent(event, promise: promise)
  448. }
  449. }
  450. }