GRPCClientStateMachine.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import Logging
  20. import SwiftProtobuf
  21. enum ReceiveResponseHeadError: Error, Equatable {
  22. /// The 'content-type' header was missing or the value is not supported by this implementation.
  23. case invalidContentType
  24. /// The HTTP response status from the server was not 200 OK.
  25. case invalidHTTPStatus(HTTPResponseStatus)
  26. /// The encoding used by the server is not supported.
  27. case unsupportedMessageEncoding
  28. /// An invalid state was encountered. This is a serious implementation error.
  29. case invalidState
  30. }
  31. enum ReceiveEndOfResponseStreamError: Error {
  32. /// An invalid state was encountered. This is a serious implementation error.
  33. case invalidState
  34. }
  35. enum SendRequestHeadersError: Error {
  36. /// An invalid state was encountered. This is a serious implementation error.
  37. case invalidState
  38. }
  39. enum SendEndOfRequestStreamError: Error {
  40. /// The request stream has already been closed. This may happen if the RPC was cancelled, timed
  41. /// out, the server terminated the RPC, or the user explicitly closed the stream multiple times.
  42. case alreadyClosed
  43. /// An invalid state was encountered. This is a serious implementation error.
  44. case invalidState
  45. }
  46. /// A state machine for a single gRPC call from the perspective of a client.
  47. ///
  48. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  49. struct GRPCClientStateMachine<Request: Message, Response: Message> {
  50. /// The combined state of the request (client) and response (server) streams for an RPC call.
  51. ///
  52. /// The following states are not possible:
  53. /// - `.clientIdleServerActive`: The client must initiate the call before the server moves
  54. /// from the idle state.
  55. /// - `.clientIdleServerClosed`: The client must initiate the call before the server moves from
  56. /// the idle state.
  57. /// - `.clientActiveServerClosed`: The client may not stream if the server is closed.
  58. ///
  59. /// Note: when a peer (client or server) state is "active" it means that messages _may_ be sent or
  60. /// received. That is, the headers for the stream have been processed by the state machine and
  61. /// end-of-stream has not yet been processed. A stream may expect any number of messages (i.e. up
  62. /// to one for a unary call and many for a streaming call).
  63. enum State {
  64. /// Initial state. Neither request stream nor response stream have been initiated. Holds the
  65. /// pending write state for the request stream and arity for the response stream, respectively.
  66. ///
  67. /// Valid transitions:
  68. /// - `clientActiveServerIdle`: if the client initiates the RPC,
  69. /// - `clientClosedServerClosed`: if the client terminates the RPC.
  70. case clientIdleServerIdle(pendingWriteState: PendingWriteState, readArity: MessageArity)
  71. /// The client has initiated an RPC and has not received initial metadata from the server. Holds
  72. /// the writing state for request stream and arity for the response stream.
  73. ///
  74. /// Valid transitions:
  75. /// - `clientActiveServerActive`: if the server acknowledges the RPC initiation,
  76. /// - `clientClosedServerIdle`: if the client closes the request stream,
  77. /// - `clientClosedServerClosed`: if the client terminates the RPC or the server terminates the
  78. /// RPC with a "trailers-only" response.
  79. case clientActiveServerIdle(writeState: WriteState, readArity: MessageArity)
  80. /// The client has indicated to the server that it has finished sending requests. The server
  81. /// has not yet sent response headers for the RPC. Holds the response stream arity.
  82. ///
  83. /// Valid transitions:
  84. /// - `clientClosedServerActive`: if the server acknowledges the RPC initiation,
  85. /// - `clientClosedServerClosed`: if the client terminates the RPC or the server terminates the
  86. /// RPC with a "trailers-only" response.
  87. case clientClosedServerIdle(readArity: MessageArity)
  88. /// The client has initiated the RPC and the server has acknowledged it. Messages may have been
  89. /// sent and/or received. Holds the request stream write state and response stream read state.
  90. ///
  91. /// Valid transitions:
  92. /// - `clientClosedServerActive`: if the client closes the request stream,
  93. /// - `clientClosedServerClosed`: if the client or server terminates the RPC.
  94. case clientActiveServerActive(writeState: WriteState, readState: ReadState)
  95. /// The client has indicated to the server that it has finished sending requests. The server
  96. /// has acknowledged the RPC. Holds the response stream read state.
  97. ///
  98. /// Valid transitions:
  99. /// - `clientClosedServerClosed`: if the client or server terminate the RPC.
  100. case clientClosedServerActive(readState: ReadState)
  101. /// The RPC has terminated. There are no valid transitions from this state.
  102. case clientClosedServerClosed
  103. }
  104. /// The current state of the state machine.
  105. internal private(set) var state: State {
  106. didSet {
  107. switch (oldValue, self.state) {
  108. // All valid transitions:
  109. case (.clientIdleServerIdle, .clientActiveServerIdle),
  110. (.clientIdleServerIdle, .clientClosedServerClosed),
  111. (.clientActiveServerIdle, .clientActiveServerActive),
  112. (.clientActiveServerIdle, .clientClosedServerIdle),
  113. (.clientActiveServerIdle, .clientClosedServerClosed),
  114. (.clientClosedServerIdle, .clientClosedServerActive),
  115. (.clientClosedServerIdle, .clientClosedServerClosed),
  116. (.clientActiveServerActive, .clientClosedServerActive),
  117. (.clientActiveServerActive, .clientClosedServerClosed),
  118. (.clientClosedServerActive, .clientClosedServerClosed):
  119. break
  120. // Self transitions, also valid:
  121. case (.clientIdleServerIdle, .clientIdleServerIdle),
  122. (.clientActiveServerIdle, .clientActiveServerIdle),
  123. (.clientClosedServerIdle, .clientClosedServerIdle),
  124. (.clientActiveServerActive, .clientActiveServerActive),
  125. (.clientClosedServerActive, .clientClosedServerActive),
  126. (.clientClosedServerClosed, .clientClosedServerClosed):
  127. break
  128. default:
  129. preconditionFailure("invalid state transition from '\(oldValue)' to '\(self.state)'")
  130. }
  131. }
  132. }
  133. private let logger: Logger
  134. /// Creates a state machine representing a gRPC client's request and response stream state.
  135. ///
  136. /// - Parameter requestArity: The expected number of messages on the request stream.
  137. /// - Parameter responseArity: The expected number of messages on the response stream.
  138. /// - Parameter logger: Logger.
  139. init(
  140. requestArity: MessageArity,
  141. responseArity: MessageArity,
  142. logger: Logger
  143. ) {
  144. self.state = .clientIdleServerIdle(
  145. pendingWriteState: .init(arity: requestArity, compression: .none, contentType: .protobuf),
  146. readArity: responseArity
  147. )
  148. self.logger = logger
  149. }
  150. /// Creates a state machine representing a gRPC client's request and response stream state.
  151. ///
  152. /// - Parameter state: The initial state of the state machine.
  153. /// - Parameter logger: Logger.
  154. init(
  155. state: State,
  156. logger: Logger
  157. ) {
  158. self.state = state
  159. self.logger = logger
  160. }
  161. /// Initiates an RPC.
  162. ///
  163. /// The only valid state transition is:
  164. /// - `.clientIdleServerIdle` → `.clientActiveServerIdle`
  165. ///
  166. /// All other states will result in an `.invalidState` error.
  167. ///
  168. /// On success the state will transition to `.clientActiveServerIdle`.
  169. ///
  170. /// - Parameter host: The host which will handle the RPC.
  171. /// - Parameter path: The path of the RPC (e.g. '/echo.Echo/Collect').
  172. /// - Parameter options: Options for this RPC.
  173. /// - Parameter requestID: The unique ID of this request used for logging.
  174. mutating func sendRequestHead(
  175. host: String,
  176. path: String,
  177. options: CallOptions,
  178. requestID: String
  179. ) -> Result<HTTPRequestHead, SendRequestHeadersError> {
  180. return self.state.sendRequestHead(
  181. host: host,
  182. path: path,
  183. options: options,
  184. requestID: requestID
  185. )
  186. }
  187. /// Formats a request to send to the server.
  188. ///
  189. /// The client must be streaming in order for this to return successfully. Therefore the valid
  190. /// state transitions are:
  191. /// - `.clientActiveServerIdle` → `.clientActiveServerIdle`
  192. /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  193. ///
  194. /// The client should not attempt to send requests once the request stream is closed, that is
  195. /// from one of the following states:
  196. /// - `.clientClosedServerIdle`
  197. /// - `.clientClosedServerActive`
  198. /// - `.clientClosedServerClosed`
  199. /// Doing so will result in a `.cardinalityViolation`.
  200. ///
  201. /// Sending a message when both peers are idle (in the `.clientIdleServerIdle` state) will result
  202. /// in a `.invalidState` error.
  203. ///
  204. /// - Parameter message: The `Request` to send to the server.
  205. /// - Parameter allocator: A `ByteBufferAllocator` to allocate the buffer into which the encoded
  206. /// request will be written.
  207. mutating func sendRequest(
  208. _ message: Request,
  209. allocator: ByteBufferAllocator
  210. ) -> Result<ByteBuffer, MessageWriteError> {
  211. return self.state.sendRequest(message, allocator: allocator)
  212. }
  213. /// Closes the request stream.
  214. ///
  215. /// The client must be streaming requests in order to terminate the request stream. Valid
  216. /// states transitions are:
  217. /// - `.clientActiveServerIdle` → `.clientClosedServerIdle`
  218. /// - `.clientActiveServerActive` → `.clientClosedServerActive`
  219. ///
  220. /// The client should not attempt to close the request stream if it is already closed, that is
  221. /// from one of the following states:
  222. /// - `.clientClosedServerIdle`
  223. /// - `.clientClosedServerActive`
  224. /// - `.clientClosedServerClosed`
  225. /// Doing so will result in an `.alreadyClosed` error.
  226. ///
  227. /// Closing the request stream when both peers are idle (in the `.clientIdleServerIdle` state)
  228. /// will result in a `.invalidState` error.
  229. mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
  230. return self.state.sendEndOfRequestStream()
  231. }
  232. /// Receive an acknowledgement of the RPC from the server. This **must not** be a "Trailers-Only"
  233. /// response.
  234. ///
  235. /// The server must be idle in order to receive response headers. The valid state transitions are:
  236. /// - `.clientActiveServerIdle` → `.clientActiveServerActive`
  237. /// - `.clientClosedServerIdle` → `.clientClosedServerActive`
  238. ///
  239. /// The response head will be parsed and validated against the gRPC specification. The following
  240. /// errors may be returned:
  241. /// - `.invalidHTTPStatus` if the status was not "200",
  242. /// - `.invalidContentType` if the "content-type" header does not start with "application/grpc",
  243. /// - `.unsupportedMessageEncoding` if the "grpc-encoding" header is not supported.
  244. ///
  245. /// It is not possible to receive response headers from the following states:
  246. /// - `.clientIdleServerIdle`
  247. /// - `.clientActiveServerActive`
  248. /// - `.clientClosedServerActive`
  249. /// - `.clientClosedServerClosed`
  250. /// Doing so will result in a `.invalidState` error.
  251. ///
  252. /// - Parameter responseHead: The response head received from the server.
  253. mutating func receiveResponseHead(
  254. _ responseHead: HTTPResponseHead
  255. ) -> Result<HTTPHeaders, ReceiveResponseHeadError> {
  256. return self.state.receiveResponseHead(responseHead, logger: self.logger)
  257. }
  258. /// Read a response buffer from the server and return any decoded messages.
  259. ///
  260. /// If the response stream has an expected count of `.one` then this function is guaranteed to
  261. /// produce *at most* one `Response` in the `Result`.
  262. ///
  263. /// To receive a response buffer the server must be streaming. Valid states are:
  264. /// - `.clientClosedServerActive` → `.clientClosedServerActive`
  265. /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  266. ///
  267. /// This function will read all of the bytes in the `buffer` and attempt to produce as many
  268. /// messages as possible. This may lead to a number of errors:
  269. /// - `.cardinalityViolation` if more than one message is received when the state reader is
  270. /// expects at most one.
  271. /// - `.leftOverBytes` if bytes remain in the buffer after reading one message when at most one
  272. /// message is expected.
  273. /// - `.deserializationFailed` if the message could not be deserialized.
  274. ///
  275. /// It is not possible to receive response headers from the following states:
  276. /// - `.clientIdleServerIdle`
  277. /// - `.clientClosedServerActive`
  278. /// - `.clientActiveServerActive`
  279. /// - `.clientClosedServerClosed`
  280. /// Doing so will result in a `.invalidState` error.
  281. ///
  282. /// - Parameter buffer: A buffer of bytes received from the server.
  283. mutating func receiveResponseBuffer(
  284. _ buffer: inout ByteBuffer
  285. ) -> Result<[Response], MessageReadError> {
  286. return self.state.receiveResponseBuffer(&buffer)
  287. }
  288. /// Receive the end of the response stream from the server and parse the results into
  289. /// a `GRPCStatus`.
  290. ///
  291. /// To close the response stream the server must be streaming or idle (since the server may choose
  292. /// to 'fast fail' the RPC). Valid states are:
  293. /// - `.clientActiveServerIdle` → `.clientClosedServerClosed`
  294. /// - `.clientActiveServerActive` → `.clientClosedServerClosed`
  295. /// - `.clientClosedServerIdle` → `.clientClosedServerClosed`
  296. /// - `.clientClosedServerActive` → `.clientClosedServerClosed`
  297. ///
  298. /// It is not possible to receive an end-of-stream if the RPC has not been initiated or has
  299. /// already been terminated. That is, in one of the following states:
  300. /// - `.clientIdleServerIdle`
  301. /// - `.clientClosedServerClosed`
  302. /// Doing so will result in a `.invalidState` error.
  303. ///
  304. /// - Parameter trailers: The trailers to parse.
  305. mutating func receiveEndOfResponseStream(
  306. _ trailers: HTTPHeaders
  307. ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
  308. return self.state.receiveEndOfResponseStream(trailers)
  309. }
  310. }
  311. extension GRPCClientStateMachine.State {
  312. /// See `GRPCClientStateMachine.sendRequestHead(host:path:options:requestID)`.
  313. mutating func sendRequestHead(
  314. host: String,
  315. path: String,
  316. options: CallOptions,
  317. requestID: String
  318. ) -> Result<HTTPRequestHead, SendRequestHeadersError> {
  319. let result: Result<HTTPRequestHead, SendRequestHeadersError>
  320. switch self {
  321. case let .clientIdleServerIdle(pendingWriteState, readArity):
  322. let head = self.makeRequestHead(host: host, path: path, options: options, requestID: requestID)
  323. result = .success(head)
  324. self = .clientActiveServerIdle(writeState: pendingWriteState.makeWriteState(), readArity: readArity)
  325. case .clientActiveServerIdle,
  326. .clientClosedServerIdle,
  327. .clientClosedServerActive,
  328. .clientActiveServerActive,
  329. .clientClosedServerClosed:
  330. result = .failure(.invalidState)
  331. }
  332. return result
  333. }
  334. /// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
  335. mutating func sendRequest(
  336. _ message: Request,
  337. allocator: ByteBufferAllocator
  338. ) -> Result<ByteBuffer, MessageWriteError> {
  339. let result: Result<ByteBuffer, MessageWriteError>
  340. switch self {
  341. case .clientActiveServerIdle(var writeState, let readArity):
  342. result = writeState.write(message, allocator: allocator)
  343. self = .clientActiveServerIdle(writeState: writeState, readArity: readArity)
  344. case .clientActiveServerActive(var writeState, let readState):
  345. result = writeState.write(message, allocator: allocator)
  346. self = .clientActiveServerActive(writeState: writeState, readState: readState)
  347. case .clientClosedServerIdle,
  348. .clientClosedServerActive,
  349. .clientClosedServerClosed:
  350. result = .failure(.cardinalityViolation)
  351. case .clientIdleServerIdle:
  352. result = .failure(.invalidState)
  353. }
  354. return result
  355. }
  356. /// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
  357. mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
  358. let result: Result<Void, SendEndOfRequestStreamError>
  359. switch self {
  360. case .clientActiveServerIdle(_, let readArity):
  361. result = .success(())
  362. self = .clientClosedServerIdle(readArity: readArity)
  363. case .clientActiveServerActive(_, let readState):
  364. result = .success(())
  365. self = .clientClosedServerActive(readState: readState)
  366. case .clientClosedServerIdle,
  367. .clientClosedServerActive,
  368. .clientClosedServerClosed:
  369. result = .failure(.alreadyClosed)
  370. case .clientIdleServerIdle:
  371. result = .failure(.invalidState)
  372. }
  373. return result
  374. }
  375. /// See `GRPCClientStateMachine.receiveResponseHead(_:)`.
  376. mutating func receiveResponseHead(
  377. _ responseHead: HTTPResponseHead,
  378. logger: Logger
  379. ) -> Result<HTTPHeaders, ReceiveResponseHeadError> {
  380. let result: Result<HTTPHeaders, ReceiveResponseHeadError>
  381. switch self {
  382. case let .clientActiveServerIdle(writeState, readArity):
  383. switch self.parseResponseHead(responseHead, responseArity: readArity, logger: logger) {
  384. case .success(let readState):
  385. self = .clientActiveServerActive(writeState: writeState, readState: readState)
  386. result = .success(responseHead.headers)
  387. case .failure(let error):
  388. result = .failure(error)
  389. }
  390. case let .clientClosedServerIdle(readArity):
  391. switch self.parseResponseHead(responseHead, responseArity: readArity, logger: logger) {
  392. case .success(let readState):
  393. self = .clientClosedServerActive(readState: readState)
  394. result = .success(responseHead.headers)
  395. case .failure(let error):
  396. result = .failure(error)
  397. }
  398. case .clientIdleServerIdle,
  399. .clientClosedServerActive,
  400. .clientActiveServerActive,
  401. .clientClosedServerClosed:
  402. result = .failure(.invalidState)
  403. }
  404. return result
  405. }
  406. /// See `GRPCClientStateMachine.receiveResponseBuffer(_:)`.
  407. mutating func receiveResponseBuffer(
  408. _ buffer: inout ByteBuffer
  409. ) -> Result<[Response], MessageReadError> {
  410. let result: Result<[Response], MessageReadError>
  411. switch self {
  412. case .clientClosedServerActive(var readState):
  413. result = readState.readMessages(&buffer)
  414. self = .clientClosedServerActive(readState: readState)
  415. case .clientActiveServerActive(let writeState, var readState):
  416. result = readState.readMessages(&buffer)
  417. self = .clientActiveServerActive(writeState: writeState, readState: readState)
  418. case .clientIdleServerIdle,
  419. .clientActiveServerIdle,
  420. .clientClosedServerIdle,
  421. .clientClosedServerClosed:
  422. result = .failure(.invalidState)
  423. }
  424. return result
  425. }
  426. /// See `GRPCClientStateMachine.receiveEndOfResponseStream(_:)`.
  427. mutating func receiveEndOfResponseStream(
  428. _ trailers: HTTPHeaders
  429. ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
  430. let result: Result<GRPCStatus, ReceiveEndOfResponseStreamError>
  431. switch self {
  432. case .clientActiveServerActive,
  433. .clientActiveServerIdle,
  434. .clientClosedServerIdle,
  435. .clientClosedServerActive:
  436. result = .success(self.parseTrailers(trailers))
  437. self = .clientClosedServerClosed
  438. case .clientIdleServerIdle,
  439. .clientClosedServerClosed:
  440. result = .failure(.invalidState)
  441. }
  442. return result
  443. }
  444. /// Makes the request head (`Request-Headers` in the specification) used to initiate an RPC
  445. /// call.
  446. ///
  447. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  448. ///
  449. /// - Parameter host: The host serving the RPC.
  450. /// - Parameter options: Any options related to the call.
  451. /// - Parameter requestID: A request ID associated with the call. An additional header will be
  452. /// added using this value if `options.requestIDHeader` is specified.
  453. private func makeRequestHead(
  454. host: String,
  455. path: String,
  456. options: CallOptions,
  457. requestID: String
  458. ) -> HTTPRequestHead {
  459. // Note: we don't currently set the 'grpc-encoding' header, if we do we will need to feed that
  460. // encoded into the message writer.
  461. var headers: HTTPHeaders = [
  462. "content-type": "application/grpc",
  463. "te": "trailers", // Used to detect incompatible proxies, part of the gRPC specification.
  464. "user-agent": "grpc-swift-nio", // TODO: Add a more specific user-agent.
  465. "host": host, // NIO's HTTP2ToHTTP1Codec replaces "host" with ":authority"
  466. ]
  467. // Add the timeout header, if a timeout was specified.
  468. if options.timeout != .infinite {
  469. headers.add(name: GRPCHeaderName.timeout, value: String(describing: options.timeout))
  470. }
  471. // Add user-defined custom metadata: this should come after the call definition headers.
  472. headers.add(contentsOf: options.customMetadata)
  473. // Add a tracing header if the user specified it.
  474. if let headerName = options.requestIDHeader {
  475. headers.add(name: headerName, value: requestID)
  476. }
  477. return HTTPRequestHead(
  478. version: HTTPVersion(major: 2, minor: 0),
  479. method: options.cacheable ? .GET : .POST,
  480. uri: path,
  481. headers: headers
  482. )
  483. }
  484. /// Parses the response head ("Response-Headers" in the specification) from server into
  485. /// a `ReadState`.
  486. ///
  487. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  488. ///
  489. /// - Parameter headers: The headers to parse.
  490. private func parseResponseHead(
  491. _ head: HTTPResponseHead,
  492. responseArity: MessageArity,
  493. logger: Logger
  494. ) -> Result<ReadState, ReceiveResponseHeadError> {
  495. // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  496. //
  497. // "Implementations should expect broken deployments to send non-200 HTTP status codes in
  498. // responses as well as a variety of non-GRPC content-types and to omit Status & Status-Message.
  499. // Implementations must synthesize a Status & Status-Message to propagate to the application
  500. // layer when this occurs."
  501. guard head.status == .ok else {
  502. return .failure(.invalidHTTPStatus(head.status))
  503. }
  504. guard head.headers["content-type"].first?.starts(with: "application/grpc") ?? false else {
  505. return .failure(.invalidContentType)
  506. }
  507. // What compression mechanism is the server using, if any?
  508. let compression = CompressionMechanism(value: head.headers[GRPCHeaderName.encoding].first)
  509. // From: https://github.com/grpc/grpc/blob/master/doc/compression.md
  510. //
  511. // "If a server sent data which is compressed by an algorithm that is not supported by the
  512. // client, an INTERNAL error status will occur on the client side."
  513. guard compression.supported else {
  514. return .failure(.unsupportedMessageEncoding)
  515. }
  516. let reader = LengthPrefixedMessageReader(
  517. mode: .client,
  518. compressionMechanism: compression,
  519. logger: logger
  520. )
  521. return .success(.reading(responseArity, reader))
  522. }
  523. /// Parses the response trailers ("Trailers" in the specification) from the server into
  524. /// a `GRPCStatus`.
  525. ///
  526. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  527. ///
  528. /// - Parameter trailers: Trailers to parse.
  529. private func parseTrailers(_ trailers: HTTPHeaders) -> GRPCStatus {
  530. // Extract the "Status"
  531. let code = trailers[GRPCHeaderName.statusCode].first
  532. .flatMap(Int.init)
  533. .flatMap(GRPCStatus.Code.init) ?? .unknown
  534. // Extract and unmarshall the "Status-Message"
  535. let message = trailers[GRPCHeaderName.statusMessage].first
  536. .map(GRPCStatusMessageMarshaller.unmarshall)
  537. return .init(code: code, message: message)
  538. }
  539. }