// GRPCClientStateMachine.swift
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. enum ReceiveResponseHeadError: Error, Equatable {
  23. /// The 'content-type' header was missing or the value is not supported by this implementation.
  24. case invalidContentType(String?)
  25. /// The HTTP response status from the server was not 200 OK.
  26. case invalidHTTPStatus(String?)
  27. /// The encoding used by the server is not supported.
  28. case unsupportedMessageEncoding(String)
  29. /// An invalid state was encountered. This is a serious implementation error.
  30. case invalidState
  31. }
  32. enum ReceiveEndOfResponseStreamError: Error, Equatable {
  33. /// The 'content-type' header was missing or the value is not supported by this implementation.
  34. case invalidContentType(String?)
  35. /// The HTTP response status from the server was not 200 OK.
  36. case invalidHTTPStatus(String?)
  37. /// The HTTP response status from the server was not 200 OK but the "grpc-status" header contained
  38. /// a valid value.
  39. case invalidHTTPStatusWithGRPCStatus(GRPCStatus)
  40. /// An invalid state was encountered. This is a serious implementation error.
  41. case invalidState
  42. }
  43. enum SendRequestHeadersError: Error {
  44. /// An invalid state was encountered. This is a serious implementation error.
  45. case invalidState
  46. }
  47. enum SendEndOfRequestStreamError: Error {
  48. /// The request stream has already been closed. This may happen if the RPC was cancelled, timed
  49. /// out, the server terminated the RPC, or the user explicitly closed the stream multiple times.
  50. case alreadyClosed
  51. /// An invalid state was encountered. This is a serious implementation error.
  52. case invalidState
  53. }
  54. /// A state machine for a single gRPC call from the perspective of a client.
  55. ///
  56. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  57. struct GRPCClientStateMachine {
  58. /// The combined state of the request (client) and response (server) streams for an RPC call.
  59. ///
  60. /// The following states are not possible:
  61. /// - `.clientIdleServerActive`: The client must initiate the call before the server moves
  62. /// from the idle state.
  63. /// - `.clientIdleServerClosed`: The client must initiate the call before the server moves from
  64. /// the idle state.
  65. /// - `.clientActiveServerClosed`: The client may not stream if the server is closed.
  66. ///
  67. /// Note: when a peer (client or server) state is "active" it means that messages _may_ be sent or
  68. /// received. That is, the headers for the stream have been processed by the state machine and
  69. /// end-of-stream has not yet been processed. A stream may expect any number of messages (i.e. up
  70. /// to one for a unary call and many for a streaming call).
  71. enum State {
  72. /// Initial state. Neither request stream nor response stream have been initiated. Holds the
  73. /// pending write state for the request stream and arity for the response stream, respectively.
  74. ///
  75. /// Valid transitions:
  76. /// - `clientActiveServerIdle`: if the client initiates the RPC,
  77. /// - `clientClosedServerClosed`: if the client terminates the RPC.
  78. case clientIdleServerIdle(pendingWriteState: PendingWriteState, readArity: MessageArity)
  79. /// The client has initiated an RPC and has not received initial metadata from the server. Holds
  80. /// the writing state for request stream and arity for the response stream.
  81. ///
  82. /// Valid transitions:
  83. /// - `clientActiveServerActive`: if the server acknowledges the RPC initiation,
  84. /// - `clientClosedServerIdle`: if the client closes the request stream,
  85. /// - `clientClosedServerClosed`: if the client terminates the RPC or the server terminates the
  86. /// RPC with a "trailers-only" response.
  87. case clientActiveServerIdle(writeState: WriteState, pendingReadState: PendingReadState)
  88. /// The client has indicated to the server that it has finished sending requests. The server
  89. /// has not yet sent response headers for the RPC. Holds the response stream arity.
  90. ///
  91. /// Valid transitions:
  92. /// - `clientClosedServerActive`: if the server acknowledges the RPC initiation,
  93. /// - `clientClosedServerClosed`: if the client terminates the RPC or the server terminates the
  94. /// RPC with a "trailers-only" response.
  95. case clientClosedServerIdle(pendingReadState: PendingReadState)
  96. /// The client has initiated the RPC and the server has acknowledged it. Messages may have been
  97. /// sent and/or received. Holds the request stream write state and response stream read state.
  98. ///
  99. /// Valid transitions:
  100. /// - `clientClosedServerActive`: if the client closes the request stream,
  101. /// - `clientClosedServerClosed`: if the client or server terminates the RPC.
  102. case clientActiveServerActive(writeState: WriteState, readState: ReadState)
  103. /// The client has indicated to the server that it has finished sending requests. The server
  104. /// has acknowledged the RPC. Holds the response stream read state.
  105. ///
  106. /// Valid transitions:
  107. /// - `clientClosedServerClosed`: if the client or server terminate the RPC.
  108. case clientClosedServerActive(readState: ReadState)
  109. /// The RPC has terminated. There are no valid transitions from this state.
  110. case clientClosedServerClosed
  111. /// This isn't a real state. See `withStateAvoidingCoWs`.
  112. case modifying
  113. }
  114. /// The current state of the state machine.
  115. internal private(set) var state: State {
  116. didSet {
  117. switch (oldValue, self.state) {
  118. // Any modifying transitions are fine.
  119. case (.modifying, _),
  120. (_, .modifying):
  121. break
  122. // All valid transitions:
  123. case (.clientIdleServerIdle, .clientActiveServerIdle),
  124. (.clientIdleServerIdle, .clientClosedServerClosed),
  125. (.clientActiveServerIdle, .clientActiveServerActive),
  126. (.clientActiveServerIdle, .clientClosedServerIdle),
  127. (.clientActiveServerIdle, .clientClosedServerClosed),
  128. (.clientClosedServerIdle, .clientClosedServerActive),
  129. (.clientClosedServerIdle, .clientClosedServerClosed),
  130. (.clientActiveServerActive, .clientClosedServerActive),
  131. (.clientActiveServerActive, .clientClosedServerClosed),
  132. (.clientClosedServerActive, .clientClosedServerClosed):
  133. break
  134. // Self transitions, also valid:
  135. case (.clientIdleServerIdle, .clientIdleServerIdle),
  136. (.clientActiveServerIdle, .clientActiveServerIdle),
  137. (.clientClosedServerIdle, .clientClosedServerIdle),
  138. (.clientActiveServerActive, .clientActiveServerActive),
  139. (.clientClosedServerActive, .clientClosedServerActive),
  140. (.clientClosedServerClosed, .clientClosedServerClosed):
  141. break
  142. default:
  143. preconditionFailure("invalid state transition from '\(oldValue)' to '\(self.state)'")
  144. }
  145. }
  146. }
  147. /// Creates a state machine representing a gRPC client's request and response stream state.
  148. ///
  149. /// - Parameter requestArity: The expected number of messages on the request stream.
  150. /// - Parameter responseArity: The expected number of messages on the response stream.
  151. init(requestArity: MessageArity, responseArity: MessageArity) {
  152. self.state = .clientIdleServerIdle(
  153. pendingWriteState: .init(arity: requestArity, contentType: .protobuf),
  154. readArity: responseArity
  155. )
  156. }
  157. /// Creates a state machine representing a gRPC client's request and response stream state.
  158. ///
  159. /// - Parameter state: The initial state of the state machine.
  160. init(state: State) {
  161. self.state = state
  162. }
  163. /// Initiates an RPC.
  164. ///
  165. /// The only valid state transition is:
  166. /// - `.clientIdleServerIdle` → `.clientActiveServerIdle`
  167. ///
  168. /// All other states will result in an `.invalidState` error.
  169. ///
  170. /// On success the state will transition to `.clientActiveServerIdle`.
  171. ///
  172. /// - Parameter requestHead: The client request head for the RPC.
  173. mutating func sendRequestHeaders(
  174. requestHead: _GRPCRequestHead
  175. ) -> Result<HPACKHeaders, SendRequestHeadersError> {
  176. return self.withStateAvoidingCoWs { state in
  177. state.sendRequestHeaders(requestHead: requestHead)
  178. }
  179. }
  180. /// Formats a request to send to the server.
  181. ///
  182. /// The client must be streaming in order for this to return successfully. Therefore the valid
  183. /// state transitions are:
  184. /// - `.clientActiveServerIdle` → `.clientActiveServerIdle`
  185. /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  186. ///
  187. /// The client should not attempt to send requests once the request stream is closed, that is
  188. /// from one of the following states:
  189. /// - `.clientClosedServerIdle`
  190. /// - `.clientClosedServerActive`
  191. /// - `.clientClosedServerClosed`
  192. /// Doing so will result in a `.cardinalityViolation`.
  193. ///
  194. /// Sending a message when both peers are idle (in the `.clientIdleServerIdle` state) will result
  195. /// in a `.invalidState` error.
  196. ///
  197. /// - Parameter message: The serialized request to send to the server.
  198. /// - Parameter compressed: Whether the request should be compressed.
  199. /// - Parameter allocator: A `ByteBufferAllocator` to allocate the buffer into which the encoded
  200. /// request will be written.
  201. mutating func sendRequest(
  202. _ message: ByteBuffer,
  203. compressed: Bool,
  204. allocator: ByteBufferAllocator
  205. ) -> Result<ByteBuffer, MessageWriteError> {
  206. return self.withStateAvoidingCoWs { state in
  207. state.sendRequest(message, compressed: compressed, allocator: allocator)
  208. }
  209. }
  210. /// Closes the request stream.
  211. ///
  212. /// The client must be streaming requests in order to terminate the request stream. Valid
  213. /// states transitions are:
  214. /// - `.clientActiveServerIdle` → `.clientClosedServerIdle`
  215. /// - `.clientActiveServerActive` → `.clientClosedServerActive`
  216. ///
  217. /// The client should not attempt to close the request stream if it is already closed, that is
  218. /// from one of the following states:
  219. /// - `.clientClosedServerIdle`
  220. /// - `.clientClosedServerActive`
  221. /// - `.clientClosedServerClosed`
  222. /// Doing so will result in an `.alreadyClosed` error.
  223. ///
  224. /// Closing the request stream when both peers are idle (in the `.clientIdleServerIdle` state)
  225. /// will result in a `.invalidState` error.
  226. mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
  227. return self.withStateAvoidingCoWs { state in
  228. state.sendEndOfRequestStream()
  229. }
  230. }
  231. /// Receive an acknowledgement of the RPC from the server. This **must not** be a "Trailers-Only"
  232. /// response.
  233. ///
  234. /// The server must be idle in order to receive response headers. The valid state transitions are:
  235. /// - `.clientActiveServerIdle` → `.clientActiveServerActive`
  236. /// - `.clientClosedServerIdle` → `.clientClosedServerActive`
  237. ///
  238. /// The response head will be parsed and validated against the gRPC specification. The following
  239. /// errors may be returned:
  240. /// - `.invalidHTTPStatus` if the status was not "200",
  241. /// - `.invalidContentType` if the "content-type" header does not start with "application/grpc",
  242. /// - `.unsupportedMessageEncoding` if the "grpc-encoding" header is not supported.
  243. ///
  244. /// It is not possible to receive response headers from the following states:
  245. /// - `.clientIdleServerIdle`
  246. /// - `.clientActiveServerActive`
  247. /// - `.clientClosedServerActive`
  248. /// - `.clientClosedServerClosed`
  249. /// Doing so will result in a `.invalidState` error.
  250. ///
  251. /// - Parameter headers: The headers received from the server.
  252. mutating func receiveResponseHeaders(
  253. _ headers: HPACKHeaders
  254. ) -> Result<Void, ReceiveResponseHeadError> {
  255. return self.withStateAvoidingCoWs { state in
  256. state.receiveResponseHeaders(headers)
  257. }
  258. }
  259. /// Read a response buffer from the server and return any decoded messages.
  260. ///
  261. /// If the response stream has an expected count of `.one` then this function is guaranteed to
  262. /// produce *at most* one `Response` in the `Result`.
  263. ///
  264. /// To receive a response buffer the server must be streaming. Valid states are:
  265. /// - `.clientClosedServerActive` → `.clientClosedServerActive`
  266. /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  267. ///
  268. /// This function will read all of the bytes in the `buffer` and attempt to produce as many
  269. /// messages as possible. This may lead to a number of errors:
  270. /// - `.cardinalityViolation` if more than one message is received when the state reader is
  271. /// expects at most one.
  272. /// - `.leftOverBytes` if bytes remain in the buffer after reading one message when at most one
  273. /// message is expected.
  274. /// - `.deserializationFailed` if the message could not be deserialized.
  275. ///
  276. /// It is not possible to receive response headers from the following states:
  277. /// - `.clientIdleServerIdle`
  278. /// - `.clientClosedServerActive`
  279. /// - `.clientActiveServerActive`
  280. /// - `.clientClosedServerClosed`
  281. /// Doing so will result in a `.invalidState` error.
  282. ///
  283. /// - Parameter buffer: A buffer of bytes received from the server.
  284. mutating func receiveResponseBuffer(
  285. _ buffer: inout ByteBuffer
  286. ) -> Result<[ByteBuffer], MessageReadError> {
  287. return self.withStateAvoidingCoWs { state in
  288. state.receiveResponseBuffer(&buffer)
  289. }
  290. }
  291. /// Receive the end of the response stream from the server and parse the results into
  292. /// a `GRPCStatus`.
  293. ///
  294. /// To close the response stream the server must be streaming or idle (since the server may choose
  295. /// to 'fast fail' the RPC). Valid states are:
  296. /// - `.clientActiveServerIdle` → `.clientClosedServerClosed`
  297. /// - `.clientActiveServerActive` → `.clientClosedServerClosed`
  298. /// - `.clientClosedServerIdle` → `.clientClosedServerClosed`
  299. /// - `.clientClosedServerActive` → `.clientClosedServerClosed`
  300. ///
  301. /// It is not possible to receive an end-of-stream if the RPC has not been initiated or has
  302. /// already been terminated. That is, in one of the following states:
  303. /// - `.clientIdleServerIdle`
  304. /// - `.clientClosedServerClosed`
  305. /// Doing so will result in a `.invalidState` error.
  306. ///
  307. /// - Parameter trailers: The trailers to parse.
  308. mutating func receiveEndOfResponseStream(
  309. _ trailers: HPACKHeaders
  310. ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
  311. return self.withStateAvoidingCoWs { state in
  312. state.receiveEndOfResponseStream(trailers)
  313. }
  314. }
  315. /// Temporarily sets `self.state` to `.modifying` before calling the provided block and setting
  316. /// `self.state` to the `State` modified by the block.
  317. ///
  318. /// Since we hold state as associated data on our `State` enum, any modification to that state
  319. /// will trigger a copy on write for its heap allocated data. Temporarily setting the `self.state`
  320. /// to `.modifying` allows us to avoid an extra reference to any heap allocated data and therefore
  321. /// avoid a copy on write.
  322. @inline(__always)
  323. private mutating func withStateAvoidingCoWs<ResultType>(
  324. _ body: (inout State) -> ResultType
  325. ) -> ResultType {
  326. var state = State.modifying
  327. swap(&self.state, &state)
  328. defer {
  329. swap(&self.state, &state)
  330. }
  331. return body(&state)
  332. }
  333. }
  334. extension GRPCClientStateMachine.State {
  335. /// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:)`.
  336. mutating func sendRequestHeaders(
  337. requestHead: _GRPCRequestHead
  338. ) -> Result<HPACKHeaders, SendRequestHeadersError> {
  339. let result: Result<HPACKHeaders, SendRequestHeadersError>
  340. switch self {
  341. case let .clientIdleServerIdle(pendingWriteState, responseArity):
  342. let headers = self.makeRequestHeaders(
  343. method: requestHead.method,
  344. scheme: requestHead.scheme,
  345. host: requestHead.host,
  346. path: requestHead.path,
  347. timeout: GRPCTimeout(deadline: requestHead.deadline),
  348. customMetadata: requestHead.customMetadata,
  349. compression: requestHead.encoding
  350. )
  351. result = .success(headers)
  352. self = .clientActiveServerIdle(
  353. writeState: pendingWriteState.makeWriteState(messageEncoding: requestHead.encoding),
  354. pendingReadState: .init(arity: responseArity, messageEncoding: requestHead.encoding)
  355. )
  356. case .clientActiveServerIdle,
  357. .clientClosedServerIdle,
  358. .clientClosedServerActive,
  359. .clientActiveServerActive,
  360. .clientClosedServerClosed:
  361. result = .failure(.invalidState)
  362. case .modifying:
  363. preconditionFailure("State left as 'modifying'")
  364. }
  365. return result
  366. }
  367. /// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
  368. mutating func sendRequest(
  369. _ message: ByteBuffer,
  370. compressed: Bool,
  371. allocator: ByteBufferAllocator
  372. ) -> Result<ByteBuffer, MessageWriteError> {
  373. let result: Result<ByteBuffer, MessageWriteError>
  374. switch self {
  375. case .clientActiveServerIdle(var writeState, let pendingReadState):
  376. result = writeState.write(message, compressed: compressed, allocator: allocator)
  377. self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
  378. case .clientActiveServerActive(var writeState, let readState):
  379. result = writeState.write(message, compressed: compressed, allocator: allocator)
  380. self = .clientActiveServerActive(writeState: writeState, readState: readState)
  381. case .clientClosedServerIdle,
  382. .clientClosedServerActive,
  383. .clientClosedServerClosed:
  384. result = .failure(.cardinalityViolation)
  385. case .clientIdleServerIdle:
  386. result = .failure(.invalidState)
  387. case .modifying:
  388. preconditionFailure("State left as 'modifying'")
  389. }
  390. return result
  391. }
  392. /// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
  393. mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
  394. let result: Result<Void, SendEndOfRequestStreamError>
  395. switch self {
  396. case let .clientActiveServerIdle(_, pendingReadState):
  397. result = .success(())
  398. self = .clientClosedServerIdle(pendingReadState: pendingReadState)
  399. case let .clientActiveServerActive(_, readState):
  400. result = .success(())
  401. self = .clientClosedServerActive(readState: readState)
  402. case .clientClosedServerIdle,
  403. .clientClosedServerActive,
  404. .clientClosedServerClosed:
  405. result = .failure(.alreadyClosed)
  406. case .clientIdleServerIdle:
  407. result = .failure(.invalidState)
  408. case .modifying:
  409. preconditionFailure("State left as 'modifying'")
  410. }
  411. return result
  412. }
  413. /// See `GRPCClientStateMachine.receiveResponseHeaders(_:)`.
  414. mutating func receiveResponseHeaders(
  415. _ headers: HPACKHeaders
  416. ) -> Result<Void, ReceiveResponseHeadError> {
  417. let result: Result<Void, ReceiveResponseHeadError>
  418. switch self {
  419. case let .clientActiveServerIdle(writeState, pendingReadState):
  420. result = self.parseResponseHeaders(headers, pendingReadState: pendingReadState)
  421. .map { readState in
  422. self = .clientActiveServerActive(writeState: writeState, readState: readState)
  423. }
  424. case let .clientClosedServerIdle(pendingReadState):
  425. result = self.parseResponseHeaders(headers, pendingReadState: pendingReadState)
  426. .map { readState in
  427. self = .clientClosedServerActive(readState: readState)
  428. }
  429. case .clientIdleServerIdle,
  430. .clientClosedServerActive,
  431. .clientActiveServerActive,
  432. .clientClosedServerClosed:
  433. result = .failure(.invalidState)
  434. case .modifying:
  435. preconditionFailure("State left as 'modifying'")
  436. }
  437. return result
  438. }
  439. /// See `GRPCClientStateMachine.receiveResponseBuffer(_:)`.
  440. mutating func receiveResponseBuffer(
  441. _ buffer: inout ByteBuffer
  442. ) -> Result<[ByteBuffer], MessageReadError> {
  443. let result: Result<[ByteBuffer], MessageReadError>
  444. switch self {
  445. case var .clientClosedServerActive(readState):
  446. result = readState.readMessages(&buffer)
  447. self = .clientClosedServerActive(readState: readState)
  448. case .clientActiveServerActive(let writeState, var readState):
  449. result = readState.readMessages(&buffer)
  450. self = .clientActiveServerActive(writeState: writeState, readState: readState)
  451. case .clientIdleServerIdle,
  452. .clientActiveServerIdle,
  453. .clientClosedServerIdle,
  454. .clientClosedServerClosed:
  455. result = .failure(.invalidState)
  456. case .modifying:
  457. preconditionFailure("State left as 'modifying'")
  458. }
  459. return result
  460. }
  461. /// See `GRPCClientStateMachine.receiveEndOfResponseStream(_:)`.
  462. mutating func receiveEndOfResponseStream(
  463. _ trailers: HPACKHeaders
  464. ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
  465. let result: Result<GRPCStatus, ReceiveEndOfResponseStreamError>
  466. switch self {
  467. case .clientActiveServerIdle,
  468. .clientClosedServerIdle:
  469. result = self.parseTrailersOnly(trailers).map { status in
  470. self = .clientClosedServerClosed
  471. return status
  472. }
  473. case .clientActiveServerActive,
  474. .clientClosedServerActive:
  475. result = .success(self.parseTrailers(trailers))
  476. self = .clientClosedServerClosed
  477. case .clientIdleServerIdle,
  478. .clientClosedServerClosed:
  479. result = .failure(.invalidState)
  480. case .modifying:
  481. preconditionFailure("State left as 'modifying'")
  482. }
  483. return result
  484. }
  485. /// Makes the request headers (`Request-Headers` in the specification) used to initiate an RPC
  486. /// call.
  487. ///
  488. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  489. ///
  490. /// - Parameter host: The host serving the RPC.
  491. /// - Parameter options: Any options related to the call.
  492. /// - Parameter requestID: A request ID associated with the call. An additional header will be
  493. /// added using this value if `options.requestIDHeader` is specified.
  494. private func makeRequestHeaders(
  495. method: String,
  496. scheme: String,
  497. host: String,
  498. path: String,
  499. timeout: GRPCTimeout,
  500. customMetadata: HPACKHeaders,
  501. compression: ClientMessageEncoding
  502. ) -> HPACKHeaders {
  503. var headers = HPACKHeaders()
  504. // The 10 is:
  505. // - 6 which are required and added just below, and
  506. // - 4 which are possibly added, depending on conditions.
  507. headers.reserveCapacity(10 + customMetadata.count)
  508. // Add the required headers.
  509. headers.add(name: ":method", value: method)
  510. headers.add(name: ":path", value: path)
  511. headers.add(name: ":authority", value: host)
  512. headers.add(name: ":scheme", value: scheme)
  513. headers.add(name: "content-type", value: "application/grpc")
  514. // Used to detect incompatible proxies, part of the gRPC specification.
  515. headers.add(name: "te", value: "trailers")
  516. switch compression {
  517. case let .enabled(configuration):
  518. // Request encoding.
  519. if let outbound = configuration.outbound {
  520. headers.add(name: GRPCHeaderName.encoding, value: outbound.name)
  521. }
  522. // Response encoding.
  523. if !configuration.inbound.isEmpty {
  524. headers.add(name: GRPCHeaderName.acceptEncoding, value: configuration.acceptEncodingHeader)
  525. }
  526. case .disabled:
  527. ()
  528. }
  529. // Add the timeout header, if a timeout was specified.
  530. if timeout != .infinite {
  531. headers.add(name: GRPCHeaderName.timeout, value: String(describing: timeout))
  532. }
  533. // Add user-defined custom metadata: this should come after the call definition headers.
  534. // TODO: make header normalization user-configurable.
  535. headers.add(contentsOf: customMetadata.lazy.map { name, value, indexing in
  536. (name.lowercased(), value, indexing)
  537. })
  538. // Add default user-agent value, if `customMetadata` didn't contain user-agent
  539. if !headers.contains(name: "user-agent") {
  540. // TODO: Add a more specific user-agent.
  541. headers.add(name: "user-agent", value: "grpc-swift-nio")
  542. }
  543. return headers
  544. }
  545. /// Parses the response headers ("Response-Headers" in the specification) from the server into
  546. /// a `ReadState`.
  547. ///
  548. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  549. ///
  550. /// - Parameter headers: The headers to parse.
  551. private func parseResponseHeaders(
  552. _ headers: HPACKHeaders,
  553. pendingReadState: PendingReadState
  554. ) -> Result<ReadState, ReceiveResponseHeadError> {
  555. // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  556. //
  557. // "Implementations should expect broken deployments to send non-200 HTTP status codes in
  558. // responses as well as a variety of non-GRPC content-types and to omit Status & Status-Message.
  559. // Implementations must synthesize a Status & Status-Message to propagate to the application
  560. // layer when this occurs."
  561. let statusHeader = headers.first(name: ":status")
  562. let responseStatus = statusHeader
  563. .flatMap(Int.init)
  564. .map { code in
  565. HTTPResponseStatus(statusCode: code)
  566. } ?? .preconditionFailed
  567. guard responseStatus == .ok else {
  568. return .failure(.invalidHTTPStatus(statusHeader))
  569. }
  570. let contentTypeHeader = headers.first(name: "content-type")
  571. guard contentTypeHeader.flatMap(ContentType.init) != nil else {
  572. return .failure(.invalidContentType(contentTypeHeader))
  573. }
  574. let result: Result<ReadState, ReceiveResponseHeadError>
  575. // What compression mechanism is the server using, if any?
  576. if let encodingHeader = headers.first(name: GRPCHeaderName.encoding) {
  577. // Note: the server is allowed to encode messages using an algorithm which wasn't included in
  578. // the 'grpc-accept-encoding' header. If the client still supports that algorithm (despite not
  579. // permitting the server to use it) then it must still decode that message. Ideally we should
  580. // log a message here if that was the case but we don't hold that information.
  581. if let compression = CompressionAlgorithm(rawValue: encodingHeader) {
  582. result = .success(pendingReadState.makeReadState(compression: compression))
  583. } else {
  584. // The algorithm isn't one we support.
  585. result = .failure(.unsupportedMessageEncoding(encodingHeader))
  586. }
  587. } else {
  588. // No compression was specified, this is fine.
  589. result = .success(pendingReadState.makeReadState(compression: nil))
  590. }
  591. return result
  592. }
  593. /// Parses the response trailers ("Trailers" in the specification) from the server into
  594. /// a `GRPCStatus`.
  595. ///
  596. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  597. ///
  598. /// - Parameter trailers: Trailers to parse.
  599. private func parseTrailers(_ trailers: HPACKHeaders) -> GRPCStatus {
  600. // Extract the "Status" and "Status-Message"
  601. let code = self.readStatusCode(from: trailers) ?? .unknown
  602. let message = self.readStatusMessage(from: trailers)
  603. return .init(code: code, message: message)
  604. }
  605. private func readStatusCode(from trailers: HPACKHeaders) -> GRPCStatus.Code? {
  606. return trailers.first(name: GRPCHeaderName.statusCode)
  607. .flatMap(Int.init)
  608. .flatMap(GRPCStatus.Code.init)
  609. }
  610. private func readStatusMessage(from trailers: HPACKHeaders) -> String? {
  611. return trailers.first(name: GRPCHeaderName.statusMessage)
  612. .map(GRPCStatusMessageMarshaller.unmarshall)
  613. }
  614. /// Parses a "Trailers-Only" response from the server into a `GRPCStatus`.
  615. ///
  616. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  617. ///
  618. /// - Parameter trailers: Trailers to parse.
  619. private func parseTrailersOnly(
  620. _ trailers: HPACKHeaders
  621. ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
  622. // We need to check whether we have a valid HTTP status in the headers, if we don't then we also
  623. // need to check whether we have a gRPC status as it should take preference over a synthesising
  624. // one from the ":status".
  625. //
  626. // See: https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  627. let statusHeader = trailers.first(name: ":status")
  628. guard let status = statusHeader.flatMap(Int.init).map({ HTTPResponseStatus(statusCode: $0) })
  629. else {
  630. return .failure(.invalidHTTPStatus(statusHeader))
  631. }
  632. guard status == .ok else {
  633. if let code = self.readStatusCode(from: trailers) {
  634. let message = self.readStatusMessage(from: trailers)
  635. return .failure(.invalidHTTPStatusWithGRPCStatus(.init(code: code, message: message)))
  636. } else {
  637. return .failure(.invalidHTTPStatus(statusHeader))
  638. }
  639. }
  640. // Only validate the content-type header if it's present. This is a small deviation from the
  641. // spec as the content-type is meant to be sent in "Trailers-Only" responses. However, if it's
  642. // missing then we should avoid the error and propagate the status code and message sent by
  643. // the server instead.
  644. if let contentTypeHeader = trailers.first(name: "content-type"),
  645. ContentType(value: contentTypeHeader) == nil {
  646. return .failure(.invalidContentType(contentTypeHeader))
  647. }
  648. // We've verified the status and content type are okay: parse the trailers.
  649. return .success(self.parseTrailers(trailers))
  650. }
  651. }