_GRPCClientChannelHandler.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. import SwiftProtobuf
  22. /// A gRPC client request message part.
  23. ///
  24. /// - Important: This is **NOT** part of the public API. It is declared as
  25. /// `public` because it is used within performance tests.
  26. public enum _GRPCClientRequestPart<Request> {
  27. /// The 'head' of the request, that is, information about the initiation of the RPC.
  28. case head(_GRPCRequestHead)
  29. /// A deserialized request message to send to the server.
  30. case message(_MessageContext<Request>)
  31. /// Indicates that the client does not intend to send any further messages.
  32. case end
  33. }
  34. /// As `_GRPCClientRequestPart` but messages are serialized.
  35. public typealias _RawGRPCClientRequestPart = _GRPCClientRequestPart<ByteBuffer>
  36. /// A gRPC client response message part.
  37. ///
  38. /// - Important: This is **NOT** part of the public API.
  39. public enum _GRPCClientResponsePart<Response> {
  40. /// Metadata received as the server acknowledges the RPC.
  41. case initialMetadata(HPACKHeaders)
  42. /// A deserialized response message received from the server.
  43. case message(_MessageContext<Response>)
  44. /// The metadata received at the end of the RPC.
  45. case trailingMetadata(HPACKHeaders)
  46. /// The final status of the RPC.
  47. case status(GRPCStatus)
  48. }
  49. /// As `_GRPCClientResponsePart` but messages are serialized.
  50. public typealias _RawGRPCClientResponsePart = _GRPCClientResponsePart<ByteBuffer>
  51. /// - Important: This is **NOT** part of the public API. It is declared as
  52. /// `public` because it is used within performance tests.
  53. public struct _GRPCRequestHead {
  54. private final class _Storage {
  55. var method: String
  56. var scheme: String
  57. var path: String
  58. var host: String
  59. var deadline: NIODeadline
  60. var encoding: ClientMessageEncoding
  61. init(
  62. method: String,
  63. scheme: String,
  64. path: String,
  65. host: String,
  66. deadline: NIODeadline,
  67. encoding: ClientMessageEncoding
  68. ) {
  69. self.method = method
  70. self.scheme = scheme
  71. self.path = path
  72. self.host = host
  73. self.deadline = deadline
  74. self.encoding = encoding
  75. }
  76. func copy() -> _Storage {
  77. return .init(
  78. method: self.method,
  79. scheme: self.scheme,
  80. path: self.path,
  81. host: self.host,
  82. deadline: self.deadline,
  83. encoding: self.encoding
  84. )
  85. }
  86. }
  87. private var _storage: _Storage
  88. // Don't put this in storage: it would CoW for every mutation.
  89. internal var customMetadata: HPACKHeaders
  90. internal var method: String {
  91. get {
  92. return self._storage.method
  93. }
  94. set {
  95. if !isKnownUniquelyReferenced(&self._storage) {
  96. self._storage = self._storage.copy()
  97. }
  98. self._storage.method = newValue
  99. }
  100. }
  101. internal var scheme: String {
  102. get {
  103. return self._storage.scheme
  104. }
  105. set {
  106. if !isKnownUniquelyReferenced(&self._storage) {
  107. self._storage = self._storage.copy()
  108. }
  109. self._storage.scheme = newValue
  110. }
  111. }
  112. internal var path: String {
  113. get {
  114. return self._storage.path
  115. }
  116. set {
  117. if !isKnownUniquelyReferenced(&self._storage) {
  118. self._storage = self._storage.copy()
  119. }
  120. self._storage.path = newValue
  121. }
  122. }
  123. internal var host: String {
  124. get {
  125. return self._storage.host
  126. }
  127. set {
  128. if !isKnownUniquelyReferenced(&self._storage) {
  129. self._storage = self._storage.copy()
  130. }
  131. self._storage.host = newValue
  132. }
  133. }
  134. internal var deadline: NIODeadline {
  135. get {
  136. return self._storage.deadline
  137. }
  138. set {
  139. if !isKnownUniquelyReferenced(&self._storage) {
  140. self._storage = self._storage.copy()
  141. }
  142. self._storage.deadline = newValue
  143. }
  144. }
  145. internal var encoding: ClientMessageEncoding {
  146. get {
  147. return self._storage.encoding
  148. }
  149. set {
  150. if !isKnownUniquelyReferenced(&self._storage) {
  151. self._storage = self._storage.copy()
  152. }
  153. self._storage.encoding = newValue
  154. }
  155. }
  156. public init(
  157. method: String,
  158. scheme: String,
  159. path: String,
  160. host: String,
  161. deadline: NIODeadline,
  162. customMetadata: HPACKHeaders,
  163. encoding: ClientMessageEncoding
  164. ) {
  165. self._storage = .init(
  166. method: method,
  167. scheme: scheme,
  168. path: path,
  169. host: host,
  170. deadline: deadline,
  171. encoding: encoding
  172. )
  173. self.customMetadata = customMetadata
  174. }
  175. }
  176. extension _GRPCRequestHead {
  177. internal init(
  178. scheme: String,
  179. path: String,
  180. host: String,
  181. options: CallOptions,
  182. requestID: String?
  183. ) {
  184. let metadata: HPACKHeaders
  185. if let requestID = requestID, let requestIDHeader = options.requestIDHeader {
  186. var customMetadata = options.customMetadata
  187. customMetadata.add(name: requestIDHeader, value: requestID)
  188. metadata = customMetadata
  189. } else {
  190. metadata = options.customMetadata
  191. }
  192. self = _GRPCRequestHead(
  193. method: options.cacheable ? "GET" : "POST",
  194. scheme: scheme,
  195. path: path,
  196. host: host,
  197. deadline: options.timeLimit.makeDeadline(),
  198. customMetadata: metadata,
  199. encoding: options.messageEncoding
  200. )
  201. }
  202. }
  203. /// The type of gRPC call.
  204. public enum GRPCCallType {
  205. /// Unary: a single request and a single response.
  206. case unary
  207. /// Client streaming: many requests and a single response.
  208. case clientStreaming
  209. /// Server streaming: a single request and many responses.
  210. case serverStreaming
  211. /// Bidirectional streaming: many request and many responses.
  212. case bidirectionalStreaming
  213. }
  214. // MARK: - GRPCClientChannelHandler
  215. /// A channel handler for gRPC clients which translates HTTP/2 frames into gRPC messages.
  216. ///
  217. /// This channel handler should typically be used in conjunction with another handler which
  218. /// reads the parsed `GRPCClientResponsePart<Response>` messages and surfaces them to the caller
  219. /// in some fashion. Note that for unary and client streaming RPCs this handler will only emit at
  220. /// most one response message.
  221. ///
  222. /// This handler relies heavily on the `GRPCClientStateMachine` to manage the state of the request
  223. /// and response streams, which share a single HTTP/2 stream for transport.
  224. ///
  225. /// Typical usage of this handler is with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2:
  226. ///
  227. /// ```
  228. /// let multiplexer: HTTP2StreamMultiplexer = // ...
  229. /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
  230. /// let clientChannelHandler = GRPCClientChannelHandler<Request, Response>(
  231. /// streamID: streamID,
  232. /// callType: callType,
  233. /// logger: logger
  234. /// )
  235. /// return channel.pipeline.addHandler(clientChannelHandler)
  236. /// }
  237. /// ```
  238. ///
  239. /// - Important: This is **NOT** part of the public API. It is declared as
  240. /// `public` because it is used within performance tests.
  241. public final class _GRPCClientChannelHandler {
  242. private let logger: Logger
  243. private var stateMachine: GRPCClientStateMachine
  244. /// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
  245. ///
  246. /// - Parameters:
  247. /// - callType: Type of RPC call being made.
  248. /// - logger: Logger.
  249. public init(callType: GRPCCallType, logger: Logger) {
  250. self.logger = logger
  251. switch callType {
  252. case .unary:
  253. self.stateMachine = .init(requestArity: .one, responseArity: .one)
  254. case .clientStreaming:
  255. self.stateMachine = .init(requestArity: .many, responseArity: .one)
  256. case .serverStreaming:
  257. self.stateMachine = .init(requestArity: .one, responseArity: .many)
  258. case .bidirectionalStreaming:
  259. self.stateMachine = .init(requestArity: .many, responseArity: .many)
  260. }
  261. }
  262. }
  263. // MARK: - GRPCClientChannelHandler: Inbound
  264. extension _GRPCClientChannelHandler: ChannelInboundHandler {
  265. public typealias InboundIn = HTTP2Frame.FramePayload
  266. public typealias InboundOut = _RawGRPCClientResponsePart
  267. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  268. let payload = self.unwrapInboundIn(data)
  269. switch payload {
  270. case let .headers(content):
  271. self.readHeaders(content: content, context: context)
  272. case let .data(content):
  273. self.readData(content: content, context: context)
  274. // We don't need to handle other frame type, just drop them instead.
  275. default:
  276. // TODO: synthesise a more precise `GRPCStatus` from RST_STREAM frames in accordance
  277. // with: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
  278. break
  279. }
  280. }
  281. /// Read the content from an HTTP/2 HEADERS frame received from the server.
  282. ///
  283. /// We can receive headers in two cases:
  284. /// - when the RPC is being acknowledged, and
  285. /// - when the RPC is being terminated.
  286. ///
  287. /// It is also possible for the RPC to be acknowledged and terminated at the same time, the
  288. /// specification refers to this as a "Trailers-Only" response.
  289. ///
  290. /// - Parameter content: Content of the headers frame.
  291. /// - Parameter context: Channel handler context.
  292. private func readHeaders(
  293. content: HTTP2Frame.FramePayload.Headers,
  294. context: ChannelHandlerContext
  295. ) {
  296. self.logger.trace("received HTTP2 frame", metadata: [
  297. MetadataKey.h2Payload: "HEADERS",
  298. MetadataKey.h2Headers: "\(content.headers)",
  299. MetadataKey.h2EndStream: "\(content.endStream)",
  300. ])
  301. // In the case of a "Trailers-Only" response there's no guarantee that end-of-stream will be set
  302. // on the headers frame: end stream may be sent on an empty data frame as well. If the headers
  303. // contain a gRPC status code then they must be for a "Trailers-Only" response.
  304. if content.endStream || content.headers.contains(name: GRPCHeaderName.statusCode) {
  305. // We have the headers, pass them to the next handler:
  306. context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(content.headers)))
  307. // Are they valid headers?
  308. let result = self.stateMachine.receiveEndOfResponseStream(content.headers)
  309. .mapError { error -> GRPCError.WithContext in
  310. // The headers aren't valid so let's figure out a reasonable error to forward:
  311. switch error {
  312. case let .invalidContentType(contentType):
  313. return GRPCError.InvalidContentType(contentType).captureContext()
  314. case let .invalidHTTPStatus(status):
  315. return GRPCError.InvalidHTTPStatus(status).captureContext()
  316. case let .invalidHTTPStatusWithGRPCStatus(status):
  317. return GRPCError.InvalidHTTPStatusWithGRPCStatus(status).captureContext()
  318. case .invalidState:
  319. return GRPCError.InvalidState("parsing end-of-stream trailers").captureContext()
  320. }
  321. }
  322. // Okay, what should we tell the next handler?
  323. switch result {
  324. case let .success(status):
  325. context.fireChannelRead(self.wrapInboundOut(.status(status)))
  326. case let .failure(error):
  327. context.fireErrorCaught(error)
  328. }
  329. } else {
  330. // "Normal" response headers, but are they valid?
  331. let result = self.stateMachine.receiveResponseHeaders(content.headers)
  332. .mapError { error -> GRPCError.WithContext in
  333. // The headers aren't valid so let's figure out a reasonable error to forward:
  334. switch error {
  335. case let .invalidContentType(contentType):
  336. return GRPCError.InvalidContentType(contentType).captureContext()
  337. case let .invalidHTTPStatus(status):
  338. return GRPCError.InvalidHTTPStatus(status).captureContext()
  339. case .unsupportedMessageEncoding:
  340. return GRPCError.CompressionUnsupported().captureContext()
  341. case .invalidState:
  342. return GRPCError.InvalidState("parsing headers").captureContext()
  343. }
  344. }
  345. // Okay, what should we tell the next handler?
  346. switch result {
  347. case .success:
  348. context.fireChannelRead(self.wrapInboundOut(.initialMetadata(content.headers)))
  349. case let .failure(error):
  350. context.fireErrorCaught(error)
  351. }
  352. }
  353. }
  354. /// Reads the content from an HTTP/2 DATA frame received from the server and buffers the bytes
  355. /// necessary to deserialize a message (or messages).
  356. ///
  357. /// - Parameter content: Content of the data frame.
  358. /// - Parameter context: Channel handler context.
  359. private func readData(content: HTTP2Frame.FramePayload.Data, context: ChannelHandlerContext) {
  360. // Note: this is replicated from NIO's HTTP2ToHTTP1ClientCodec.
  361. guard case var .byteBuffer(buffer) = content.data else {
  362. preconditionFailure("Received DATA frame with non-ByteBuffer IOData")
  363. }
  364. self.logger.trace("received HTTP2 frame", metadata: [
  365. MetadataKey.h2Payload: "DATA",
  366. MetadataKey.h2DataBytes: "\(content.data.readableBytes)",
  367. MetadataKey.h2EndStream: "\(content.endStream)",
  368. ])
  369. // Do we have bytes to read? If there are no bytes to read then we can't do anything. This may
  370. // happen if the end-of-stream flag is not set on the trailing headers frame (i.e. the one
  371. // containing the gRPC status code) and an additional empty data frame is sent with the
  372. // end-of-stream flag set.
  373. guard buffer.readableBytes > 0 else {
  374. return
  375. }
  376. // Feed the buffer into the state machine.
  377. let result = self.stateMachine.receiveResponseBuffer(&buffer)
  378. .mapError { error -> GRPCError.WithContext in
  379. switch error {
  380. case .cardinalityViolation:
  381. return GRPCError.StreamCardinalityViolation.response.captureContext()
  382. case .deserializationFailed, .leftOverBytes:
  383. return GRPCError.DeserializationFailure().captureContext()
  384. case let .decompressionLimitExceeded(compressedSize):
  385. return GRPCError.DecompressionLimitExceeded(compressedSize: compressedSize)
  386. .captureContext()
  387. case .invalidState:
  388. return GRPCError.InvalidState("parsing data as a response message").captureContext()
  389. }
  390. }
  391. // Did we get any messages?
  392. switch result {
  393. case let .success(messages):
  394. // Awesome: we got some messages. The state machine guarantees we only get at most a single
  395. // message for unary and client-streaming RPCs.
  396. for message in messages {
  397. // Note: `compressed: false` is currently just a placeholder. This is fine since the message
  398. // context is not currently exposed to the user. If we implement interceptors for the client
  399. // and decide to surface this information then we'll need to extract that information from
  400. // the message reader.
  401. context.fireChannelRead(self.wrapInboundOut(.message(.init(message, compressed: false))))
  402. }
  403. case let .failure(error):
  404. context.fireErrorCaught(error)
  405. }
  406. }
  407. }
  408. // MARK: - GRPCClientChannelHandler: Outbound
  409. extension _GRPCClientChannelHandler: ChannelOutboundHandler {
  410. public typealias OutboundIn = _RawGRPCClientRequestPart
  411. public typealias OutboundOut = HTTP2Frame.FramePayload
  412. public func write(context: ChannelHandlerContext, data: NIOAny,
  413. promise: EventLoopPromise<Void>?) {
  414. switch self.unwrapOutboundIn(data) {
  415. case let .head(requestHead):
  416. // Feed the request into the state machine:
  417. switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
  418. case let .success(headers):
  419. // We're clear to write some headers. Create an appropriate frame and write it.
  420. let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
  421. self.logger.trace("writing HTTP2 frame", metadata: [
  422. MetadataKey.h2Payload: "HEADERS",
  423. MetadataKey.h2Headers: "\(headers)",
  424. MetadataKey.h2EndStream: "false",
  425. ])
  426. context.write(self.wrapOutboundOut(framePayload), promise: promise)
  427. case let .failure(sendRequestHeadersError):
  428. switch sendRequestHeadersError {
  429. case .invalidState:
  430. // This is bad: we need to trigger an error and close the channel.
  431. promise?.fail(sendRequestHeadersError)
  432. context.fireErrorCaught(GRPCError.InvalidState("unable to initiate RPC").captureContext())
  433. }
  434. }
  435. case let .message(request):
  436. // Feed the request message into the state machine:
  437. let result = self.stateMachine.sendRequest(
  438. request.message,
  439. compressed: request.compressed,
  440. allocator: context.channel.allocator
  441. )
  442. switch result {
  443. case let .success(buffer):
  444. // We're clear to send a message; wrap it up in an HTTP/2 frame.
  445. let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
  446. self.logger.trace("writing HTTP2 frame", metadata: [
  447. MetadataKey.h2Payload: "DATA",
  448. MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
  449. MetadataKey.h2EndStream: "false",
  450. ])
  451. context.write(self.wrapOutboundOut(framePayload), promise: promise)
  452. case let .failure(writeError):
  453. switch writeError {
  454. case .cardinalityViolation:
  455. // This is fine: we can ignore the request. The RPC can continue as if nothing went wrong.
  456. promise?.fail(writeError)
  457. case .serializationFailed:
  458. // This is bad: we need to trigger an error and close the channel.
  459. promise?.fail(writeError)
  460. context.fireErrorCaught(GRPCError.SerializationFailure().captureContext())
  461. case .invalidState:
  462. promise?.fail(writeError)
  463. context
  464. .fireErrorCaught(GRPCError.InvalidState("unable to write message").captureContext())
  465. }
  466. }
  467. case .end:
  468. // Okay: can we close the request stream?
  469. switch self.stateMachine.sendEndOfRequestStream() {
  470. case .success:
  471. // We can. Send an empty DATA frame with end-stream set.
  472. let empty = context.channel.allocator.buffer(capacity: 0)
  473. let framePayload = HTTP2Frame.FramePayload
  474. .data(.init(data: .byteBuffer(empty), endStream: true))
  475. self.logger.trace("writing HTTP2 frame", metadata: [
  476. MetadataKey.h2Payload: "DATA",
  477. MetadataKey.h2DataBytes: "0",
  478. MetadataKey.h2EndStream: "true",
  479. ])
  480. context.write(self.wrapOutboundOut(framePayload), promise: promise)
  481. case let .failure(error):
  482. // Why can't we close the request stream?
  483. switch error {
  484. case .alreadyClosed:
  485. // This is fine: we can just ignore it. The RPC can continue as if nothing went wrong.
  486. promise?.fail(error)
  487. case .invalidState:
  488. // This is bad: we need to trigger an error and close the channel.
  489. promise?.fail(error)
  490. context
  491. .fireErrorCaught(
  492. GRPCError.InvalidState("unable to close request stream")
  493. .captureContext()
  494. )
  495. }
  496. }
  497. }
  498. }
  499. }