GRPCClientStateMachine.swift 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHPACK
  20. import Logging
  21. import SwiftProtobuf
  22. enum ReceiveResponseHeadError: Error, Equatable {
  23. /// The 'content-type' header was missing or the value is not supported by this implementation.
  24. case invalidContentType
  25. /// The HTTP response status from the server was not 200 OK.
  26. case invalidHTTPStatus(HTTPResponseStatus?)
  27. /// The encoding used by the server is not supported.
  28. case unsupportedMessageEncoding(String)
  29. /// An invalid state was encountered. This is a serious implementation error.
  30. case invalidState
  31. }
  32. enum ReceiveEndOfResponseStreamError: Error {
  33. /// The 'content-type' header was missing or the value is not supported by this implementation.
  34. case invalidContentType
  35. /// The HTTP response status from the server was not 200 OK.
  36. case invalidHTTPStatus(HTTPResponseStatus?)
  37. /// The HTTP response status from the server was not 200 OK but the "grpc-status" header contained
  38. /// a valid value.
  39. case invalidHTTPStatusWithGRPCStatus(GRPCStatus)
  40. /// An invalid state was encountered. This is a serious implementation error.
  41. case invalidState
  42. }
  43. enum SendRequestHeadersError: Error {
  44. /// An invalid state was encountered. This is a serious implementation error.
  45. case invalidState
  46. }
  47. enum SendEndOfRequestStreamError: Error {
  48. /// The request stream has already been closed. This may happen if the RPC was cancelled, timed
  49. /// out, the server terminated the RPC, or the user explicitly closed the stream multiple times.
  50. case alreadyClosed
  51. /// An invalid state was encountered. This is a serious implementation error.
  52. case invalidState
  53. }
  54. /// A state machine for a single gRPC call from the perspective of a client.
  55. ///
  56. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  57. struct GRPCClientStateMachine<Request: Message, Response: Message> {
  /// The joint state of the request (client) stream and the response (server) stream of an RPC.
  ///
  /// Three combinations cannot occur and therefore have no case:
  /// - `.clientIdleServerActive` and `.clientIdleServerClosed`: the server cannot leave the idle
  ///   state before the client has initiated the call.
  /// - `.clientActiveServerClosed`: the client may not stream once the server is closed.
  ///
  /// Note: a peer (client or server) is "active" when messages _may_ be sent or received on its
  /// stream: the state machine has processed the stream's headers but not yet its end-of-stream.
  /// Any number of messages may be expected on a stream (at most one for a unary call, many for a
  /// streaming call).
  enum State {
    /// The starting state: neither stream has been initiated. Carries the pending write state of
    /// the request stream and the arity of the response stream.
    ///
    /// Valid transitions:
    /// - `clientActiveServerIdle`: the client initiates the RPC,
    /// - `clientClosedServerClosed`: the client terminates the RPC.
    case clientIdleServerIdle(pendingWriteState: PendingWriteState, readArity: MessageArity)

    /// The client has initiated the RPC but no initial metadata has arrived from the server yet.
    /// Carries the write state of the request stream and the arity of the response stream.
    ///
    /// Valid transitions:
    /// - `clientActiveServerActive`: the server acknowledges the RPC initiation,
    /// - `clientClosedServerIdle`: the client closes the request stream,
    /// - `clientClosedServerClosed`: the client terminates the RPC, or the server terminates it
    ///   with a "trailers-only" response.
    case clientActiveServerIdle(writeState: WriteState, readArity: MessageArity)

    /// The client has told the server it has no more requests to send, and the server has not
    /// yet sent response headers. Carries the arity of the response stream.
    ///
    /// Valid transitions:
    /// - `clientClosedServerActive`: the server acknowledges the RPC initiation,
    /// - `clientClosedServerClosed`: the client terminates the RPC, or the server terminates it
    ///   with a "trailers-only" response.
    case clientClosedServerIdle(readArity: MessageArity)

    /// The RPC has been initiated by the client and acknowledged by the server; messages may have
    /// flowed in either direction. Carries the write state of the request stream and the read
    /// state of the response stream.
    ///
    /// Valid transitions:
    /// - `clientClosedServerActive`: the client closes the request stream,
    /// - `clientClosedServerClosed`: the client or the server terminates the RPC.
    case clientActiveServerActive(writeState: WriteState, readState: ReadState)

    /// The client has told the server it has no more requests to send, and the server has
    /// acknowledged the RPC. Carries the read state of the response stream.
    ///
    /// Valid transitions:
    /// - `clientClosedServerClosed`: the client or the server terminates the RPC.
    case clientClosedServerActive(readState: ReadState)

    /// The RPC is over. No transition out of this state is valid.
    case clientClosedServerClosed
  }
  /// The current state of the state machine.
  ///
  /// Each change is checked against the table of permitted transitions below; anything else is a
  /// programming error and stops the process with `preconditionFailure`.
  internal private(set) var state: State {
    didSet {
      switch (oldValue, self.state) {
      // Remaining in the same state is always permitted.
      case (.clientIdleServerIdle, .clientIdleServerIdle),
           (.clientActiveServerIdle, .clientActiveServerIdle),
           (.clientClosedServerIdle, .clientClosedServerIdle),
           (.clientActiveServerActive, .clientActiveServerActive),
           (.clientClosedServerActive, .clientClosedServerActive),
           (.clientClosedServerClosed, .clientClosedServerClosed):
        return

      // Permitted moves between distinct states.
      case (.clientIdleServerIdle, .clientActiveServerIdle),
           (.clientIdleServerIdle, .clientClosedServerClosed),
           (.clientActiveServerIdle, .clientActiveServerActive),
           (.clientActiveServerIdle, .clientClosedServerIdle),
           (.clientActiveServerIdle, .clientClosedServerClosed),
           (.clientClosedServerIdle, .clientClosedServerActive),
           (.clientClosedServerIdle, .clientClosedServerClosed),
           (.clientActiveServerActive, .clientClosedServerActive),
           (.clientActiveServerActive, .clientClosedServerClosed),
           (.clientClosedServerActive, .clientClosedServerClosed):
        return

      // Every other pairing is illegal.
      default:
        preconditionFailure("invalid state transition from '\(oldValue)' to '\(self.state)'")
      }
    }
  }

  /// Logger handed to the state when response headers are received.
  private let logger: Logger
  /// Creates a state machine for a call which has not started yet.
  ///
  /// The machine begins in `.clientIdleServerIdle`. The pending write state for the request
  /// stream is created with no compression and the protobuf content type.
  ///
  /// - Parameter requestArity: How many messages are expected on the request stream.
  /// - Parameter responseArity: How many messages are expected on the response stream.
  /// - Parameter logger: Logger.
  init(
    requestArity: MessageArity,
    responseArity: MessageArity,
    logger: Logger
  ) {
    let pendingWrites = PendingWriteState(
      arity: requestArity,
      compression: .none,
      contentType: .protobuf
    )
    self.logger = logger
    self.state = .clientIdleServerIdle(pendingWriteState: pendingWrites, readArity: responseArity)
  }
  /// Creates a state machine which starts from an explicitly provided state.
  ///
  /// - Parameter state: The state the machine starts in.
  /// - Parameter logger: Logger.
  init(state: State, logger: Logger) {
    self.logger = logger
    self.state = state
  }
  /// Starts the RPC by producing the request headers.
  ///
  /// There is exactly one valid transition:
  /// - `.clientIdleServerIdle` → `.clientActiveServerIdle`
  ///
  /// Calling this from any other state yields an `.invalidState` error and leaves the state
  /// unchanged. On success the machine is in `.clientActiveServerIdle`.
  ///
  /// - Parameter requestHead: The client request head for the RPC.
  /// - Returns: The headers to send to the server, or the reason they could not be produced.
  mutating func sendRequestHeaders(
    requestHead: GRPCRequestHead
  ) -> Result<HPACKHeaders, SendRequestHeadersError> {
    let outcome = self.state.sendRequestHeaders(requestHead: requestHead)
    return outcome
  }
  /// Encodes a request message ready to be sent to the server.
  ///
  /// This only succeeds while the client is streaming, so the valid transitions are:
  /// - `.clientActiveServerIdle` → `.clientActiveServerIdle`
  /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  ///
  /// Once the request stream is closed no more requests should be sent. Attempting to send from
  /// any of these states results in a `.cardinalityViolation`:
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// Attempting to send before the RPC has been initiated (the `.clientIdleServerIdle` state)
  /// results in an `.invalidState` error.
  ///
  /// - Parameter message: The `Request` to send to the server.
  /// - Parameter allocator: A `ByteBufferAllocator` used to allocate the buffer the encoded
  ///   request is written into.
  /// - Returns: The buffer holding the encoded request, or the reason it could not be written.
  mutating func sendRequest(
    _ message: Request,
    allocator: ByteBufferAllocator
  ) -> Result<ByteBuffer, MessageWriteError> {
    let outcome = self.state.sendRequest(message, allocator: allocator)
    return outcome
  }
  /// Marks the request stream as finished.
  ///
  /// The request stream can only be closed while the client is streaming, so the valid
  /// transitions are:
  /// - `.clientActiveServerIdle` → `.clientClosedServerIdle`
  /// - `.clientActiveServerActive` → `.clientClosedServerActive`
  ///
  /// Closing a request stream which is already closed results in an `.alreadyClosed` error. That
  /// is the case in each of these states:
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// Closing the request stream before the RPC has been initiated (the `.clientIdleServerIdle`
  /// state) results in an `.invalidState` error.
  mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
    let outcome = self.state.sendEndOfRequestStream()
    return outcome
  }
  /// Processes the server's acknowledgement of the RPC. The headers **must not** be a
  /// "Trailers-Only" response.
  ///
  /// Response headers can only be received while the server is idle, so the valid transitions
  /// are:
  /// - `.clientActiveServerIdle` → `.clientActiveServerActive`
  /// - `.clientClosedServerIdle` → `.clientClosedServerActive`
  ///
  /// The response head is parsed and validated against the gRPC specification, which may produce
  /// one of these errors:
  /// - `.invalidHTTPStatus` if the status was not "200",
  /// - `.invalidContentType` if the "content-type" header does not start with "application/grpc",
  /// - `.unsupportedMessageEncoding` if the "grpc-encoding" header is not supported.
  ///
  /// Receiving response headers in any of the following states results in an `.invalidState`
  /// error:
  /// - `.clientIdleServerIdle`
  /// - `.clientActiveServerActive`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// - Parameter headers: The headers received from the server.
  mutating func receiveResponseHeaders(
    _ headers: HPACKHeaders
  ) -> Result<Void, ReceiveResponseHeadError> {
    let outcome = self.state.receiveResponseHeaders(headers, logger: self.logger)
    return outcome
  }
  /// Read a response buffer from the server and return any decoded messages.
  ///
  /// If the response stream has an expected count of `.one` then this function is guaranteed to
  /// produce *at most* one `Response` in the `Result`.
  ///
  /// To receive a response buffer the server must be streaming. Valid states are:
  /// - `.clientClosedServerActive` → `.clientClosedServerActive`
  /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  ///
  /// This function will read all of the bytes in the `buffer` and attempt to produce as many
  /// messages as possible. This may lead to a number of errors:
  /// - `.cardinalityViolation` if more than one message is received when the reader expects at
  ///   most one.
  /// - `.leftOverBytes` if bytes remain in the buffer after reading one message when at most one
  ///   message is expected.
  /// - `.deserializationFailed` if the message could not be deserialized.
  ///
  /// It is not possible to receive a response buffer unless the server is streaming, that is,
  /// from one of the following states:
  /// - `.clientIdleServerIdle`
  /// - `.clientActiveServerIdle`
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerClosed`
  /// Doing so will result in a `.invalidState` error.
  ///
  /// - Parameter buffer: A buffer of bytes received from the server.
  /// - Returns: The messages decoded from `buffer`, or the reason reading failed.
  mutating func receiveResponseBuffer(
    _ buffer: inout ByteBuffer
  ) -> Result<[Response], MessageReadError> {
    return self.state.receiveResponseBuffer(&buffer)
  }
  /// Processes the end of the response stream and turns the trailers into a `GRPCStatus`.
  ///
  /// The response stream can end while the server is streaming, or while it is still idle (the
  /// server may choose to 'fast fail' the RPC). The valid transitions are:
  /// - `.clientActiveServerIdle` → `.clientClosedServerClosed`
  /// - `.clientActiveServerActive` → `.clientClosedServerClosed`
  /// - `.clientClosedServerIdle` → `.clientClosedServerClosed`
  /// - `.clientClosedServerActive` → `.clientClosedServerClosed`
  ///
  /// An end-of-stream cannot be received before the RPC has been initiated or after it has
  /// terminated. Receiving one in either of these states results in an `.invalidState` error:
  /// - `.clientIdleServerIdle`
  /// - `.clientClosedServerClosed`
  ///
  /// - Parameter trailers: The trailers to parse.
  /// - Returns: The status of the RPC, or the reason the trailers were rejected.
  mutating func receiveEndOfResponseStream(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    let outcome = self.state.receiveEndOfResponseStream(trailers)
    return outcome
  }
  307. }
  308. extension GRPCClientStateMachine.State {
  /// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:)`.
  ///
  /// Only `.clientIdleServerIdle` may send request headers; on success the state becomes
  /// `.clientActiveServerIdle`. Every other state is left untouched and reports `.invalidState`.
  mutating func sendRequestHeaders(
    requestHead: GRPCRequestHead
  ) -> Result<HPACKHeaders, SendRequestHeadersError> {
    switch self {
    case let .clientIdleServerIdle(pendingWriteState, readArity):
      // Build the headers first, then promote the pending write state to a live one.
      let requestHeaders = self.makeRequestHeaders(
        method: requestHead.method,
        scheme: requestHead.scheme,
        host: requestHead.host,
        path: requestHead.path,
        timeout: requestHead.timeout,
        customMetadata: requestHead.customMetadata
      )
      self = .clientActiveServerIdle(
        writeState: pendingWriteState.makeWriteState(),
        readArity: readArity
      )
      return .success(requestHeaders)

    case .clientActiveServerIdle,
         .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientActiveServerActive,
         .clientClosedServerClosed:
      return .failure(.invalidState)
    }
  }
  /// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
  ///
  /// The message is written through the write state held by the current case. That write state
  /// is stored back into the same case afterwards, whether or not the write succeeded.
  mutating func sendRequest(
    _ message: Request,
    allocator: ByteBufferAllocator
  ) -> Result<ByteBuffer, MessageWriteError> {
    switch self {
    case .clientActiveServerIdle(var writer, let readArity):
      let outcome = writer.write(message, allocator: allocator)
      self = .clientActiveServerIdle(writeState: writer, readArity: readArity)
      return outcome

    case .clientActiveServerActive(var writer, let readState):
      let outcome = writer.write(message, allocator: allocator)
      self = .clientActiveServerActive(writeState: writer, readState: readState)
      return outcome

    // The request stream is closed: nothing more may be sent.
    case .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientClosedServerClosed:
      return .failure(.cardinalityViolation)

    // The RPC has not been initiated yet.
    case .clientIdleServerIdle:
      return .failure(.invalidState)
    }
  }
  /// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
  ///
  /// Closing the request stream discards the write state and keeps whatever is known about the
  /// response stream (its arity or its read state).
  mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
    switch self {
    case .clientActiveServerIdle(_, let readArity):
      self = .clientClosedServerIdle(readArity: readArity)
      return .success(())

    case .clientActiveServerActive(_, let readState):
      self = .clientClosedServerActive(readState: readState)
      return .success(())

    // The request stream was closed earlier.
    case .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientClosedServerClosed:
      return .failure(.alreadyClosed)

    // The RPC has not been initiated yet.
    case .clientIdleServerIdle:
      return .failure(.invalidState)
    }
  }
  /// See `GRPCClientStateMachine.receiveResponseHeaders(_:)`.
  ///
  /// The state only advances when the headers parse successfully; a parse failure is returned
  /// as-is and leaves the state unchanged.
  mutating func receiveResponseHeaders(
    _ headers: HPACKHeaders,
    logger: Logger
  ) -> Result<Void, ReceiveResponseHeadError> {
    switch self {
    case let .clientActiveServerIdle(writeState, readArity):
      switch self.parseResponseHeaders(headers, arity: readArity, logger: logger) {
      case .success(let readState):
        self = .clientActiveServerActive(writeState: writeState, readState: readState)
        return .success(())
      case .failure(let error):
        return .failure(error)
      }

    case let .clientClosedServerIdle(readArity):
      switch self.parseResponseHeaders(headers, arity: readArity, logger: logger) {
      case .success(let readState):
        self = .clientClosedServerActive(readState: readState)
        return .success(())
      case .failure(let error):
        return .failure(error)
      }

    // The server is not idle, or the RPC has not been initiated.
    case .clientIdleServerIdle,
         .clientClosedServerActive,
         .clientActiveServerActive,
         .clientClosedServerClosed:
      return .failure(.invalidState)
    }
  }
  /// See `GRPCClientStateMachine.receiveResponseBuffer(_:)`.
  ///
  /// Messages are read through the read state held by the current case. That read state is
  /// stored back into the same case afterwards, whether or not reading succeeded.
  mutating func receiveResponseBuffer(
    _ buffer: inout ByteBuffer
  ) -> Result<[Response], MessageReadError> {
    switch self {
    case .clientClosedServerActive(var reader):
      let outcome = reader.readMessages(&buffer)
      self = .clientClosedServerActive(readState: reader)
      return outcome

    case .clientActiveServerActive(let writeState, var reader):
      let outcome = reader.readMessages(&buffer)
      self = .clientActiveServerActive(writeState: writeState, readState: reader)
      return outcome

    // The server is not streaming in any of these states.
    case .clientIdleServerIdle,
         .clientActiveServerIdle,
         .clientClosedServerIdle,
         .clientClosedServerClosed:
      return .failure(.invalidState)
    }
  }
  /// See `GRPCClientStateMachine.receiveEndOfResponseStream(_:)`.
  ///
  /// While the server is idle the trailers are treated as a "Trailers-Only" response and are
  /// validated; the state only moves to `.clientClosedServerClosed` if that validation succeeds.
  /// While the server is active the trailers are parsed without validation and the state always
  /// moves to `.clientClosedServerClosed`.
  mutating func receiveEndOfResponseStream(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    switch self {
    case .clientActiveServerIdle,
         .clientClosedServerIdle:
      switch self.parseTrailersOnly(trailers) {
      case .success(let status):
        self = .clientClosedServerClosed
        return .success(status)
      case .failure(let error):
        return .failure(error)
      }

    case .clientActiveServerActive,
         .clientClosedServerActive:
      let status = self.parseTrailers(trailers)
      self = .clientClosedServerClosed
      return .success(status)

    // The RPC has not been initiated, or has already terminated.
    case .clientIdleServerIdle,
         .clientClosedServerClosed:
      return .failure(.invalidState)
    }
  }
  /// Makes the request headers (`Request-Headers` in the specification) used to initiate an RPC
  /// call.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  ///
  /// - Parameter method: The value for the ":method" pseudo-header.
  /// - Parameter scheme: The value for the ":scheme" pseudo-header.
  /// - Parameter host: The host serving the RPC; sent as the ":authority" pseudo-header.
  /// - Parameter path: The value for the ":path" pseudo-header.
  /// - Parameter timeout: The timeout for the call. A timeout header is only added when this is
  ///   not `.infinite`.
  /// - Parameter customMetadata: User-defined metadata, appended after all other headers.
  /// - Returns: The headers to send to the server to initiate the call.
  private func makeRequestHeaders(
    method: String,
    scheme: String,
    host: String,
    path: String,
    timeout: GRPCTimeout,
    customMetadata: HPACKHeaders
  ) -> HPACKHeaders {
    // Note: we don't currently set the 'grpc-encoding' header, if we do we will need to feed that
    // encoding into the message writer.
    var headers: HPACKHeaders = [
      ":method": method,
      ":path": path,
      ":authority": host,
      ":scheme": scheme,
      "content-type": "application/grpc+proto",
      "te": "trailers", // Used to detect incompatible proxies, part of the gRPC specification.
      "user-agent": "grpc-swift-nio", // TODO: Add a more specific user-agent.
    ]

    // Add the timeout header, if a timeout was specified.
    if timeout != .infinite {
      headers.add(name: GRPCHeaderName.timeout, value: String(describing: timeout))
    }

    // Add user-defined custom metadata: this should come after the call definition headers.
    headers.add(contentsOf: customMetadata)

    return headers
  }
  /// Parses the response headers ("Response-Headers" in the specification) from the server into
  /// a `ReadState`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// The headers are checked in this order, and the first failing check is reported:
  /// 1. ":status" must be present, numeric and 200 OK, otherwise `.invalidHTTPStatus`. The
  ///    associated status is `nil` when the header is missing or not a number, which matches
  ///    how `parseTrailersOnly(_:)` reports the same condition.
  /// 2. "content-type" must be accepted by `ContentType`, otherwise `.invalidContentType`.
  /// 3. The message encoding must be supported, otherwise `.unsupportedMessageEncoding`.
  ///
  /// - Parameter headers: The headers to parse.
  /// - Parameter arity: The expected number of messages on the response stream; stored in the
  ///   returned read state.
  /// - Parameter logger: Logger handed to the message reader.
  /// - Returns: A `.reading` state holding a reader configured for the server's compression
  ///   mechanism, or the reason the headers were rejected.
  private func parseResponseHeaders(
    _ headers: HPACKHeaders,
    arity: MessageArity,
    logger: Logger
  ) -> Result<ReadState, ReceiveResponseHeadError> {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
    //
    // "Implementations should expect broken deployments to send non-200 HTTP status codes in
    // responses as well as a variety of non-GRPC content-types and to omit Status & Status-Message.
    // Implementations must synthesize a Status & Status-Message to propagate to the application
    // layer when this occurs."
    let statusHeader = headers[":status"].first
    guard let responseStatus = statusHeader.flatMap(Int.init).map({ HTTPResponseStatus(statusCode: $0) })
    else {
      // No usable status was sent: report that rather than inventing a status code.
      return .failure(.invalidHTTPStatus(nil))
    }

    guard responseStatus == .ok else {
      return .failure(.invalidHTTPStatus(responseStatus))
    }

    guard headers["content-type"].first.flatMap(ContentType.init) != nil else {
      return .failure(.invalidContentType)
    }

    // What compression mechanism is the server using, if any?
    let compression = CompressionMechanism(value: headers[GRPCHeaderName.encoding].first)

    // From: https://github.com/grpc/grpc/blob/master/doc/compression.md
    //
    // "If a server sent data which is compressed by an algorithm that is not supported by the
    // client, an INTERNAL error status will occur on the client side."
    guard compression.supported else {
      return .failure(.unsupportedMessageEncoding(compression.rawValue))
    }

    let reader = LengthPrefixedMessageReader(
      mode: .client,
      compressionMechanism: compression,
      logger: logger
    )

    return .success(.reading(arity, reader))
  }
  /// Builds a `GRPCStatus` from the response trailers ("Trailers" in the specification).
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// The status code falls back to `.unknown` when none can be read from the trailers; the
  /// message is left as `nil` when the trailers carry none.
  ///
  /// - Parameter trailers: Trailers to parse.
  private func parseTrailers(_ trailers: HPACKHeaders) -> GRPCStatus {
    // "Status" and "Status-Message" respectively.
    return GRPCStatus(
      code: self.readStatusCode(from: trailers) ?? .unknown,
      message: self.readStatusMessage(from: trailers)
    )
  }
  /// Reads the gRPC status code from the first status-code header in `trailers`.
  ///
  /// - Parameter trailers: Trailers to read from.
  /// - Returns: The status code, or `nil` if the header is absent, is not an integer, or cannot
  ///   be converted to a `GRPCStatus.Code`.
  private func readStatusCode(from trailers: HPACKHeaders) -> GRPCStatus.Code? {
    let rawCode: String? = trailers[GRPCHeaderName.statusCode].first
    let numericCode: Int? = rawCode.flatMap(Int.init)
    return numericCode.flatMap(GRPCStatus.Code.init)
  }
  /// Reads the status message from the first status-message header in `trailers`, passing it
  /// through `GRPCStatusMessageMarshaller.unmarshall`.
  ///
  /// - Parameter trailers: Trailers to read from.
  /// - Returns: The unmarshalled message, or `nil` if the header is absent.
  private func readStatusMessage(from trailers: HPACKHeaders) -> String? {
    let encodedMessage: String? = trailers[GRPCHeaderName.statusMessage].first
    return encodedMessage.map(GRPCStatusMessageMarshaller.unmarshall)
  }
  /// Validates a "Trailers-Only" response from the server and turns it into a `GRPCStatus`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// - Parameter trailers: Trailers to parse.
  /// - Returns: The status parsed from the trailers, or the reason they were rejected.
  private func parseTrailersOnly(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    // The HTTP status is checked first. If it is not valid we still look for a gRPC status,
    // because that takes preference over one synthesised from ":status".
    //
    // See: https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
    let parsedStatus = trailers[":status"].first
      .flatMap(Int.init)
      .map { HTTPResponseStatus(statusCode: $0) }

    guard let httpStatus = parsedStatus else {
      return .failure(.invalidHTTPStatus(nil))
    }

    if httpStatus != .ok {
      guard let code = self.readStatusCode(from: trailers) else {
        return .failure(.invalidHTTPStatus(httpStatus))
      }
      let grpcStatus = GRPCStatus(code: code, message: self.readStatusMessage(from: trailers))
      return .failure(.invalidHTTPStatusWithGRPCStatus(grpcStatus))
    }

    let contentType: ContentType? = trailers["content-type"].first.flatMap(ContentType.init)
    if contentType == nil {
      return .failure(.invalidContentType)
    }

    // Both the status and the content type are acceptable: parse the trailers.
    return .success(self.parseTrailers(trailers))
  }
  576. }