GRPCClientStateMachine.swift 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOCore
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  /// Reasons the head of a server response may be rejected by the client state machine.
  enum ReceiveResponseHeadError: Error, Equatable {
    /// The 'content-type' header was absent, or its value is not one this implementation
    /// supports. Carries the header value which was received, if there was one.
    case invalidContentType(String?)

    /// The HTTP response status from the server was not 200 OK. Carries the raw ':status' header
    /// value, if there was one.
    case invalidHTTPStatus(String?)

    /// The server encoded its messages with an algorithm this implementation does not support.
    /// Carries the name of that encoding.
    case unsupportedMessageEncoding(String)

    /// The state machine was not in a state where response headers may be received. This
    /// indicates a serious implementation error.
    case invalidState
  }
  /// Reasons the end of a response stream may be rejected by the client state machine.
  enum ReceiveEndOfResponseStreamError: Error, Equatable {
    /// The 'content-type' header was absent, or its value is not one this implementation
    /// supports. Carries the header value which was received, if there was one.
    case invalidContentType(String?)

    /// The HTTP response status from the server was not acceptable. Carries the raw ':status'
    /// header value, if there was one.
    case invalidHTTPStatus(String?)

    /// The state machine was not in a state where the response stream may end. This indicates a
    /// serious implementation error.
    case invalidState
  }
  /// Reasons the request headers could not be sent.
  enum SendRequestHeadersError: Error {
    /// The state machine was not in a state where request headers may be sent. This indicates a
    /// serious implementation error.
    case invalidState
  }
  /// Reasons the request stream could not be closed.
  enum SendEndOfRequestStreamError: Error {
    /// The request stream was closed already. Possible causes include the RPC being cancelled or
    /// timing out, the server terminating the RPC, or the user explicitly closing the stream more
    /// than once.
    case alreadyClosed

    /// The state machine was not in a state where the request stream may be closed. This
    /// indicates a serious implementation error.
    case invalidState
  }
  51. /// A state machine for a single gRPC call from the perspective of a client.
  52. ///
  53. /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  54. struct GRPCClientStateMachine {
  /// The joint state of the request (client) stream and the response (server) stream of one RPC.
  ///
  /// There are deliberately no cases for the following combinations:
  /// - client idle / server active: the client must initiate the call before the server can leave
  ///   the idle state.
  /// - client idle / server closed: the client must initiate the call before the server can leave
  ///   the idle state.
  /// - client active / server closed: the client may not stream once the server has closed.
  ///
  /// Note: a peer (client or server) being "active" means messages _may_ be sent or received, that
  /// is, the state machine has processed the stream's headers but not yet its end-of-stream. Any
  /// number of messages may be expected on a stream (i.e. up to one for a unary call and many for
  /// a streaming call).
  enum State {
    /// The initial state: neither stream has been initiated. Holds the pending write state for
    /// the request stream and the arity of the response stream.
    ///
    /// Transitions:
    /// - `clientActiveServerIdle`: when the request headers are sent.
    case clientIdleServerIdle(pendingWriteState: PendingWriteState, readArity: MessageArity)

    /// The client has initiated the RPC but no response headers have been processed yet. Holds
    /// the write state for the request stream and the pending read state for the response stream.
    ///
    /// Transitions:
    /// - `clientActiveServerActive`: when valid response headers are received,
    /// - `clientClosedServerIdle`: when the client closes the request stream,
    /// - `clientClosedServerClosed`: when the response stream ends (e.g. a "trailers-only"
    ///   response).
    case clientActiveServerIdle(writeState: WriteState, pendingReadState: PendingReadState)

    /// The client has closed the request stream and no response headers have been processed yet.
    /// Holds the pending read state for the response stream.
    ///
    /// Transitions:
    /// - `clientClosedServerActive`: when valid response headers are received,
    /// - `clientClosedServerClosed`: when the response stream ends (e.g. a "trailers-only"
    ///   response).
    case clientClosedServerIdle(pendingReadState: PendingReadState)

    /// Both streams are open: the client initiated the RPC and response headers were processed.
    /// Holds the write state for the request stream and the read state for the response stream.
    ///
    /// Transitions:
    /// - `clientClosedServerActive`: when the client closes the request stream,
    /// - `clientClosedServerClosed`: when the response stream ends.
    case clientActiveServerActive(writeState: WriteState, readState: ReadState)

    /// The client has closed the request stream and response headers were processed. Holds the
    /// read state for the response stream.
    ///
    /// Transitions:
    /// - `clientClosedServerClosed`: when the response stream ends.
    case clientClosedServerActive(readState: ReadState)

    /// The RPC has terminated. No transitions leave this state.
    case clientClosedServerClosed

    /// Not a real state: a placeholder used while the state is being modified. See
    /// `withStateAvoidingCoWs`.
    case modifying
  }
  /// The state the machine is currently in. Readable by the rest of the module, settable only
  /// from within this type.
  private(set) var state: State

  /// The default value for the "user-agent" header.
  private static let userAgent = "grpc-swift-nio/\(Version.versionString)"
  /// Creates a state machine for a gRPC client call, starting with both streams idle.
  ///
  /// The pending write state is created with the `.protobuf` content type.
  ///
  /// - Parameter requestArity: The expected number of messages on the request stream.
  /// - Parameter responseArity: The expected number of messages on the response stream.
  init(requestArity: MessageArity, responseArity: MessageArity) {
    let pendingWrites = PendingWriteState(arity: requestArity, contentType: .protobuf)
    self.state = .clientIdleServerIdle(pendingWriteState: pendingWrites, readArity: responseArity)
  }
  /// Creates a state machine for a gRPC client call which starts in the given state.
  ///
  /// - Parameter state: The state the machine should start in.
  init(state: State) {
    // No validation is performed: the provided state is adopted as-is.
    self.state = state
  }
  /// Initiates an RPC by producing the request headers to send to the server.
  ///
  /// Exactly one state transition is valid:
  /// - `.clientIdleServerIdle` → `.clientActiveServerIdle`
  ///
  /// Calling this from any other state leaves the state untouched and yields an `.invalidState`
  /// error.
  ///
  /// - Parameter requestHead: The client request head for the RPC.
  /// - Parameter allocator: A `ByteBufferAllocator` passed on when the write state for the request
  ///   stream is created.
  /// - Returns: The headers to send on success, `.invalidState` otherwise.
  mutating func sendRequestHeaders(
    requestHead: _GRPCRequestHead,
    allocator: ByteBufferAllocator
  ) -> Result<HPACKHeaders, SendRequestHeadersError> {
    return self.withStateAvoidingCoWs {
      $0.sendRequestHeaders(requestHead: requestHead, allocator: allocator)
    }
  }
  /// Hands a serialized request to the request stream's write state.
  ///
  /// The request stream must be open for this to succeed, so the valid state transitions are:
  /// - `.clientActiveServerIdle` → `.clientActiveServerIdle`
  /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  ///
  /// Requests must not be sent after the request stream has closed. Attempting to do so from any
  /// of these states results in a `.cardinalityViolation`:
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// Attempting to send a request before the RPC was initiated (the `.clientIdleServerIdle`
  /// state) results in an `.invalidState` error.
  ///
  /// - Parameter message: The serialized request to send to the server.
  /// - Parameter compressed: Whether the request should be compressed.
  /// - Parameter promise: An optional promise which is passed to the write state together with
  ///   the message.
  /// - Returns: The outcome reported by the write state, or one of the errors described above.
  mutating func sendRequest(
    _ message: ByteBuffer,
    compressed: Bool,
    promise: EventLoopPromise<Void>? = nil
  ) -> Result<Void, MessageWriteError> {
    return self.withStateAvoidingCoWs {
      $0.sendRequest(message, compressed: compressed, promise: promise)
    }
  }
  /// Asks the request stream's write state for its next request.
  ///
  /// A value can only be produced while the request stream is open (`.clientActiveServerIdle` or
  /// `.clientActiveServerActive`); the state itself does not change. In every other state the
  /// result is `nil`.
  ///
  /// - Returns: Whatever the write state yields for its next request (an encoding result paired
  ///   with an optional promise), or `nil`.
  mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
    // The state-level implementation parks itself in `.modifying` while it works, so this does
    // not go through `withStateAvoidingCoWs`.
    return self.state.nextRequest()
  }
  /// Closes the request stream.
  ///
  /// The request stream must be open in order to be closed. The valid state transitions are:
  /// - `.clientActiveServerIdle` → `.clientClosedServerIdle`
  /// - `.clientActiveServerActive` → `.clientClosedServerActive`
  ///
  /// Closing a request stream which is closed already results in an `.alreadyClosed` error. That
  /// is the case in each of these states:
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// Closing the request stream before the RPC was initiated (the `.clientIdleServerIdle` state)
  /// results in an `.invalidState` error.
  mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
    return self.withStateAvoidingCoWs { $0.sendEndOfRequestStream() }
  }
  /// Receives the server's acknowledgement of the RPC. The headers **must not** be a
  /// "Trailers-Only" response.
  ///
  /// Response headers may only be received while the server is idle. The valid state transitions
  /// are:
  /// - `.clientActiveServerIdle` → `.clientActiveServerActive`
  /// - `.clientClosedServerIdle` → `.clientClosedServerActive`
  ///
  /// The response head is parsed and validated. Validation may fail with:
  /// - `.invalidHTTPStatus` if the ":status" was not "200",
  /// - `.invalidContentType` if the "content-type" header is missing or not supported,
  /// - `.unsupportedMessageEncoding` if the message encoding header names an unsupported
  ///   algorithm.
  /// The state is left unchanged when validation fails.
  ///
  /// Response headers cannot be received in any of the following states, attempting to do so
  /// results in an `.invalidState` error:
  /// - `.clientIdleServerIdle`
  /// - `.clientActiveServerActive`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// - Parameter headers: The headers received from the server.
  mutating func receiveResponseHeaders(
    _ headers: HPACKHeaders
  ) -> Result<Void, ReceiveResponseHeadError> {
    return self.withStateAvoidingCoWs { $0.receiveResponseHeaders(headers) }
  }
  /// Reads a buffer of response bytes from the server and returns the messages read from it.
  ///
  /// The server must be streaming in order to receive a response buffer. The valid state
  /// transitions are:
  /// - `.clientClosedServerActive` → `.clientClosedServerActive`
  /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  ///
  /// The bytes are handed to the response stream's read state, and any `MessageReadError` it
  /// reports is returned as the failure.
  ///
  /// A response buffer cannot be received in any of the following states, attempting to do so
  /// results in an `.invalidState` error:
  /// - `.clientIdleServerIdle`
  /// - `.clientActiveServerIdle`
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerClosed`
  ///
  /// - Parameter buffer: A buffer of bytes received from the server.
  /// - Parameter maxMessageLength: The maximum message length, passed on to the read state.
  /// - Returns: The messages which were read, or the error which stopped reading.
  mutating func receiveResponseBuffer(
    _ buffer: inout ByteBuffer,
    maxMessageLength: Int
  ) -> Result<[ByteBuffer], MessageReadError> {
    return self.withStateAvoidingCoWs {
      $0.receiveResponseBuffer(&buffer, maxMessageLength: maxMessageLength)
    }
  }
  /// Receives the end of the response stream and turns the trailers into a `GRPCStatus`.
  ///
  /// The server may end the stream while it is streaming or while it is still idle (the server
  /// may choose to 'fast fail' the RPC). The valid state transitions are:
  /// - `.clientActiveServerIdle` → `.clientClosedServerClosed`
  /// - `.clientActiveServerActive` → `.clientClosedServerClosed`
  /// - `.clientClosedServerIdle` → `.clientClosedServerClosed`
  /// - `.clientClosedServerActive` → `.clientClosedServerClosed`
  ///
  /// While the server is idle the trailers are treated as a "Trailers-Only" response and are
  /// validated; if validation fails the error is returned and the state is left unchanged.
  ///
  /// An RPC which has not been initiated, or which has terminated already, cannot receive an
  /// end-of-stream. Attempting to do so in either of these states results in an `.invalidState`
  /// error:
  /// - `.clientIdleServerIdle`
  /// - `.clientClosedServerClosed`
  ///
  /// - Parameter trailers: The trailers to parse.
  mutating func receiveEndOfResponseStream(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    return self.withStateAvoidingCoWs { $0.receiveEndOfResponseStream(trailers) }
  }
  /// Receives a DATA frame which had the end stream flag set, and decides whether the caller may
  /// ignore the flag or should forward a synthesised status instead.
  ///
  /// The end stream flag on a DATA frame is unexpected: according to the specification the server
  /// ends an RPC by sending a HEADERS frame with end stream set. It is tolerated if the RPC is
  /// believed to have completed already (the `.clientClosedServerClosed` state), in which case
  /// `nil` is returned. If the RPC was still in progress the state moves to
  /// `.clientClosedServerClosed` and a status describing the protocol violation is returned.
  ///
  /// Calling this before the RPC was initiated (the `.clientIdleServerIdle` state) is a
  /// programmer error and traps.
  ///
  /// - Returns: A status to forward, or `nil` if the end stream flag may be ignored.
  mutating func receiveEndOfResponseStream() -> GRPCStatus? {
    return self.withStateAvoidingCoWs { $0.receiveEndOfResponseStream() }
  }
  /// Runs `body` against the current state while `self.state` is temporarily `.modifying`, then
  /// stores the state as modified by `body` back into `self.state`.
  ///
  /// The state's data lives in the associated values of the `State` enum, so modifying that data
  /// will trigger a copy on write of anything heap allocated if a second reference to it exists.
  /// Moving the state out of `self.state` for the duration of `body` means `self.state` is not
  /// holding such an extra reference, which avoids the copy.
  ///
  /// - Parameter body: A closure which may modify the state and produce a result.
  /// - Returns: The value returned by `body`.
  @inline(__always)
  private mutating func withStateAvoidingCoWs<ResultType>(
    _ body: (inout State) -> ResultType
  ) -> ResultType {
    // Move the real state into a local, leaving the placeholder behind.
    var detached: State = .modifying
    swap(&self.state, &detached)

    let outcome = body(&detached)

    // Move the (possibly modified) state back in; `detached` ends up holding the placeholder.
    swap(&self.state, &detached)
    return outcome
  }
  320. }
  321. extension GRPCClientStateMachine.State {
  /// Builds the request headers and moves from `.clientIdleServerIdle` to
  /// `.clientActiveServerIdle`. Every other state yields `.invalidState` and is left unchanged.
  ///
  /// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:allocator:)`.
  mutating func sendRequestHeaders(
    requestHead: _GRPCRequestHead,
    allocator: ByteBufferAllocator
  ) -> Result<HPACKHeaders, SendRequestHeadersError> {
    switch self {
    case let .clientIdleServerIdle(pendingWriteState, responseArity):
      let headers = self.makeRequestHeaders(
        method: requestHead.method,
        scheme: requestHead.scheme,
        host: requestHead.host,
        path: requestHead.path,
        timeout: GRPCTimeout(deadline: requestHead.deadline),
        customMetadata: requestHead.customMetadata,
        compression: requestHead.encoding
      )

      // The request stream becomes writable; the response stream waits for the server's headers.
      let writeState = pendingWriteState.makeWriteState(
        messageEncoding: requestHead.encoding,
        allocator: allocator
      )
      let pendingReadState = PendingReadState(
        arity: responseArity,
        messageEncoding: requestHead.encoding
      )
      self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
      return .success(headers)

    case .clientActiveServerIdle,
         .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientActiveServerActive,
         .clientClosedServerClosed:
      // The RPC has been initiated already.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Passes the message to the write state if the request stream is open. A closed request
  /// stream yields `.cardinalityViolation`, an RPC which hasn't started yields `.invalidState`.
  ///
  /// See `GRPCClientStateMachine.sendRequest(_:compressed:promise:)`.
  mutating func sendRequest(
    _ message: ByteBuffer,
    compressed: Bool,
    promise: EventLoopPromise<Void>?
  ) -> Result<Void, MessageWriteError> {
    switch self {
    case .clientActiveServerIdle(var writeState, let pendingReadState):
      let outcome = writeState.write(message, compressed: compressed, promise: promise)
      // Store the mutated write state back; the kind of state does not change.
      self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
      return outcome

    case .clientActiveServerActive(var writeState, let readState):
      let outcome = writeState.write(message, compressed: compressed, promise: promise)
      self = .clientActiveServerActive(writeState: writeState, readState: readState)
      return outcome

    case .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientClosedServerClosed:
      return .failure(.cardinalityViolation)

    case .clientIdleServerIdle:
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Asks the write state for its next request if the request stream is open, otherwise returns
  /// `nil`. The kind of state never changes.
  ///
  /// See `GRPCClientStateMachine.nextRequest()`.
  mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
    switch self {
    case .clientActiveServerIdle(var writeState, let pendingReadState):
      // Park `self` in the placeholder state while the write state is mutated, then restore it.
      // (See `withStateAvoidingCoWs` for the motivation.)
      self = .modifying
      let next = writeState.next()
      self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
      return next

    case .clientActiveServerActive(var writeState, let readState):
      self = .modifying
      let next = writeState.next()
      self = .clientActiveServerActive(writeState: writeState, readState: readState)
      return next

    case .clientIdleServerIdle,
         .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientClosedServerClosed:
      // The request stream isn't open: there is nothing to produce.
      return nil

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Closes the request stream if it is open, discarding its write state. An already closed
  /// stream yields `.alreadyClosed`, an RPC which hasn't started yields `.invalidState`.
  ///
  /// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
  mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
    switch self {
    case let .clientActiveServerIdle(_, pendingReadState):
      self = .clientClosedServerIdle(pendingReadState: pendingReadState)
      return .success(())

    case let .clientActiveServerActive(_, readState):
      self = .clientClosedServerActive(readState: readState)
      return .success(())

    case .clientClosedServerIdle,
         .clientClosedServerActive,
         .clientClosedServerClosed:
      return .failure(.alreadyClosed)

    case .clientIdleServerIdle:
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Validates the response headers and, if they are acceptable, marks the server as active.
  /// The state only changes when validation succeeds.
  ///
  /// See `GRPCClientStateMachine.receiveResponseHeaders(_:)`.
  mutating func receiveResponseHeaders(
    _ headers: HPACKHeaders
  ) -> Result<Void, ReceiveResponseHeadError> {
    switch self {
    case let .clientActiveServerIdle(writeState, pendingReadState):
      switch self.parseResponseHeaders(headers, pendingReadState: pendingReadState) {
      case let .success(readState):
        self = .clientActiveServerActive(writeState: writeState, readState: readState)
        return .success(())
      case let .failure(error):
        return .failure(error)
      }

    case let .clientClosedServerIdle(pendingReadState):
      switch self.parseResponseHeaders(headers, pendingReadState: pendingReadState) {
      case let .success(readState):
        self = .clientClosedServerActive(readState: readState)
        return .success(())
      case let .failure(error):
        return .failure(error)
      }

    case .clientIdleServerIdle,
         .clientClosedServerActive,
         .clientActiveServerActive,
         .clientClosedServerClosed:
      // Either the RPC hasn't started or the response headers were processed already.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Feeds the buffer to the read state if the server is active, otherwise yields
  /// `.invalidState`. The kind of state never changes.
  ///
  /// See `GRPCClientStateMachine.receiveResponseBuffer(_:maxMessageLength:)`.
  mutating func receiveResponseBuffer(
    _ buffer: inout ByteBuffer,
    maxMessageLength: Int
  ) -> Result<[ByteBuffer], MessageReadError> {
    switch self {
    case var .clientClosedServerActive(readState):
      let messages = readState.readMessages(&buffer, maxLength: maxMessageLength)
      // Store the mutated read state back.
      self = .clientClosedServerActive(readState: readState)
      return messages

    case .clientActiveServerActive(let writeState, var readState):
      let messages = readState.readMessages(&buffer, maxLength: maxMessageLength)
      self = .clientActiveServerActive(writeState: writeState, readState: readState)
      return messages

    case .clientIdleServerIdle,
         .clientActiveServerIdle,
         .clientClosedServerIdle,
         .clientClosedServerClosed:
      // The server isn't streaming: no response bytes are expected.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Ends the response stream, producing a status from the trailers. If the server was idle the
  /// trailers are handled as a "Trailers-Only" response, which may fail validation; in that case
  /// the state is left unchanged.
  ///
  /// See `GRPCClientStateMachine.receiveEndOfResponseStream(_:)`.
  mutating func receiveEndOfResponseStream(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    switch self {
    case .clientActiveServerIdle,
         .clientClosedServerIdle:
      switch self.parseTrailersOnly(trailers) {
      case let .success(status):
        self = .clientClosedServerClosed
        return .success(status)
      case let .failure(error):
        return .failure(error)
      }

    case .clientActiveServerActive,
         .clientClosedServerActive:
      let status = self.parseTrailers(trailers)
      self = .clientClosedServerClosed
      return .success(status)

    case .clientIdleServerIdle,
         .clientClosedServerClosed:
      // The RPC either hasn't started or has finished already.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Handles end stream arriving on a DATA frame: closes an RPC which was still in progress and
  /// returns a status describing the protocol violation, or returns `nil` if the RPC had closed
  /// already.
  ///
  /// See `GRPCClientStateMachine.receiveEndOfResponseStream()`.
  mutating func receiveEndOfResponseStream() -> GRPCStatus? {
    switch self {
    case .clientIdleServerIdle:
      // Can't see end stream before writing on it.
      preconditionFailure()

    case .clientActiveServerIdle,
         .clientActiveServerActive,
         .clientClosedServerIdle,
         .clientClosedServerActive:
      self = .clientClosedServerClosed
      return GRPCStatus(
        code: .internalError,
        message: "Protocol violation: received DATA frame with end stream set"
      )

    case .clientClosedServerClosed:
      // We've already closed. Ignore this.
      return nil

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
  /// Makes the request headers (`Request-Headers` in the specification) used to initiate an RPC
  /// call.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  ///
  /// - Parameter method: The value of the ":method" pseudo-header.
  /// - Parameter scheme: The value of the ":scheme" pseudo-header.
  /// - Parameter host: The host serving the RPC, used as the ":authority" pseudo-header.
  /// - Parameter path: The value of the ":path" pseudo-header.
  /// - Parameter timeout: The timeout for the call; a timeout header is only added if this is not
  ///   `.infinite`.
  /// - Parameter customMetadata: User-provided metadata, added after the call definition headers
  ///   with header names lowercased.
  /// - Parameter compression: The message encoding configuration which determines whether the
  ///   encoding and accept-encoding headers are added.
  /// - Returns: The headers to send to the server.
  private func makeRequestHeaders(
    method: String,
    scheme: String,
    host: String,
    path: String,
    timeout: GRPCTimeout,
    customMetadata: HPACKHeaders,
    compression: ClientMessageEncoding
  ) -> HPACKHeaders {
    var headers = HPACKHeaders()

    // Room for the custom metadata plus up to 10 of our own headers: the 6 required headers
    // added unconditionally below and 4 which are only added under certain conditions.
    headers.reserveCapacity(10 + customMetadata.count)

    // The required headers.
    headers.add(name: ":method", value: method)
    headers.add(name: ":path", value: path)
    headers.add(name: ":authority", value: host)
    headers.add(name: ":scheme", value: scheme)
    headers.add(name: "content-type", value: "application/grpc")
    // Part of the gRPC specification: used to detect incompatible proxies.
    headers.add(name: "te", value: "trailers")

    // Compression headers are only relevant when compression is enabled.
    if case let .enabled(configuration) = compression {
      // The encoding used for requests.
      if let outbound = configuration.outbound {
        headers.add(name: GRPCHeaderName.encoding, value: outbound.name)
      }

      // The encodings acceptable for responses.
      if !configuration.inbound.isEmpty {
        headers.add(name: GRPCHeaderName.acceptEncoding, value: configuration.acceptEncodingHeader)
      }
    }

    // Only send a timeout header if a timeout was actually specified.
    if timeout != .infinite {
      headers.add(name: GRPCHeaderName.timeout, value: String(describing: timeout))
    }

    // User-defined metadata comes after the call definition headers.
    // TODO: make header normalization user-configurable.
    let normalizedMetadata = customMetadata.lazy.map { name, value, indexing in
      (name.lowercased(), value, indexing)
    }
    headers.add(contentsOf: normalizedMetadata)

    // Fall back to the default user-agent unless the custom metadata provided one.
    if !customMetadata.contains(name: "user-agent") {
      headers.add(name: "user-agent", value: GRPCClientStateMachine.userAgent)
    }

    return headers
  }
  /// Validates the response headers ("Response-Headers" in the specification) sent by the server
  /// and, if they are acceptable, creates a `ReadState` from the pending read state.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// - Parameter headers: The headers to parse.
  /// - Parameter pendingReadState: The pending read state from which the `ReadState` is made.
  /// - Returns: The read state on success. Otherwise `.invalidHTTPStatus` if ":status" is missing,
  ///   not an integer or not 200; `.invalidContentType` if "content-type" is missing or not
  ///   supported; or `.unsupportedMessageEncoding` if the encoding header names an algorithm
  ///   which is not supported.
  private func parseResponseHeaders(
    _ headers: HPACKHeaders,
    pendingReadState: PendingReadState
  ) -> Result<ReadState, ReceiveResponseHeadError> {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
    //
    // "Implementations should expect broken deployments to send non-200 HTTP status codes in
    // responses as well as a variety of non-GRPC content-types and to omit Status & Status-Message.
    // Implementations must synthesize a Status & Status-Message to propagate to the application
    // layer when this occurs."
    let statusHeader = headers.first(name: ":status")
    // A missing or non-numeric status is mapped to a non-OK status so that it fails the check
    // below.
    let responseStatus =
      statusHeader
      .flatMap(Int.init)
      .map { HTTPResponseStatus(statusCode: $0) } ?? .preconditionFailed

    guard responseStatus == .ok else {
      return .failure(.invalidHTTPStatus(statusHeader))
    }

    let contentTypeHeader = headers.first(name: "content-type")
    guard contentTypeHeader.flatMap(ContentType.init) != nil else {
      return .failure(.invalidContentType(contentTypeHeader))
    }

    // Is the server compressing its messages?
    guard let encodingHeader = headers.first(name: GRPCHeaderName.encoding) else {
      // No compression was specified, this is fine.
      return .success(pendingReadState.makeReadState(compression: nil))
    }

    // Note: the server is allowed to encode messages using an algorithm which wasn't included in
    // the 'grpc-accept-encoding' header. If the client still supports that algorithm (despite not
    // permitting the server to use it) then it must still decode that message. Ideally we should
    // log a message here if that was the case but we don't hold that information.
    guard let compression = CompressionAlgorithm(rawValue: encodingHeader) else {
      // The algorithm isn't one we support.
      return .failure(.unsupportedMessageEncoding(encodingHeader))
    }

    return .success(pendingReadState.makeReadState(compression: compression))
  }
  /// Turns the response trailers ("Trailers" in the specification) sent by the server into a
  /// `GRPCStatus`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// - Parameter trailers: Trailers to parse.
  /// - Returns: A status made from the "Status" and "Status-Message" in the trailers. The code is
  ///   `.unknown` if no valid status code could be read.
  private func parseTrailers(_ trailers: HPACKHeaders) -> GRPCStatus {
    return GRPCStatus(
      code: self.readStatusCode(from: trailers) ?? .unknown,
      message: self.readStatusMessage(from: trailers)
    )
  }
  /// Reads the gRPC status code from the given trailers.
  ///
  /// - Parameter trailers: The trailers to read from.
  /// - Returns: The status code, or `nil` if the status code header is missing, is not an
  ///   integer, or is not the raw value of a `GRPCStatus.Code`.
  private func readStatusCode(from trailers: HPACKHeaders) -> GRPCStatus.Code? {
    guard
      let rawHeader = trailers.first(name: GRPCHeaderName.statusCode),
      let rawCode = Int(rawHeader)
    else {
      return nil
    }
    return GRPCStatus.Code(rawValue: rawCode)
  }
  /// Reads the gRPC status message from the given trailers.
  ///
  /// - Parameter trailers: The trailers to read from.
  /// - Returns: The status message header value after it has been passed through
  ///   `GRPCStatusMessageMarshaller.unmarshall`, or `nil` if the header is missing.
  private func readStatusMessage(from trailers: HPACKHeaders) -> String? {
    let encodedMessage = trailers.first(name: GRPCHeaderName.statusMessage)
    return encodedMessage.map(GRPCStatusMessageMarshaller.unmarshall)
  }
  /// Validates a "Trailers-Only" response sent by the server and turns it into a `GRPCStatus`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// - Parameter trailers: Trailers to parse.
  /// - Returns: The status on success. Otherwise `.invalidHTTPStatus` if ":status" is missing or
  ///   not an integer, or `.invalidContentType` if the status is 200 and a "content-type" header
  ///   is present but not supported.
  private func parseTrailersOnly(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    // The HTTP status has to be checked first. If it isn't a valid (200) status then a gRPC
    // status in the trailers, if there is one, takes precedence over a status synthesised from
    // the ":status".
    //
    // See: https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
    let statusHeader = trailers.first(name: ":status")
    guard let statusCode = statusHeader.flatMap(Int.init) else {
      return .failure(.invalidHTTPStatus(statusHeader))
    }

    let httpResponseStatus = HTTPResponseStatus(statusCode: statusCode)
    if httpResponseStatus != .ok {
      // Non-200 response. Prefer the 'grpc-status' if there is one, otherwise try to derive a
      // code from the HTTP status code.
      let grpcStatusCode =
        self.readStatusCode(from: trailers)
        ?? GRPCStatus.Code(httpStatus: Int(httpResponseStatus.code))
        ?? .unknown
      let message = self.readStatusMessage(from: trailers)
      return .success(GRPCStatus(code: grpcStatusCode, message: message))
    }

    // The content-type header is only validated if it is present. This deviates slightly from
    // the spec, which expects the content-type to be sent in "Trailers-Only" responses. If it is
    // missing we would rather propagate the status code and message sent by the server than fail
    // with an error.
    if let contentTypeHeader = trailers.first(name: "content-type"),
      ContentType(value: contentTypeHeader) == nil
    {
      return .failure(.invalidContentType(contentTypeHeader))
    }

    // The status and content type are fine: parse the trailers.
    return .success(self.parseTrailers(trailers))
  }
  697. }