HTTP2ToRawGRPCStateMachine.swift 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. struct HTTP2ToRawGRPCStateMachine {
  21. /// The current state.
  22. private var state: State = .requestIdleResponseIdle
  23. /// Temporarily sets `self.state` to `._modifying` before calling the provided block and setting
  24. /// `self.state` to the `State` modified by the block.
  25. ///
  26. /// Since we hold state as associated data on our `State` enum, any modification to that state
  27. /// will trigger a copy on write for its heap allocated data. Temporarily setting the `self.state`
  28. /// to `._modifying` allows us to avoid an extra reference to any heap allocated data and
  29. /// therefore avoid a copy on write.
  30. @inlinable
  31. internal mutating func withStateAvoidingCoWs<Action>(_ body: (inout State) -> Action) -> Action {
  32. var state: State = ._modifying
  33. swap(&self.state, &state)
  34. defer {
  35. swap(&self.state, &state)
  36. }
  37. return body(&state)
  38. }
  39. }
  40. extension HTTP2ToRawGRPCStateMachine {
  41. enum State {
  42. // Both peers are idle. Nothing has happened to the stream.
  43. case requestIdleResponseIdle
  44. // Received valid headers. Nothing has been sent in response.
  45. case requestOpenResponseIdle(RequestOpenResponseIdleState)
  46. // Received valid headers and request(s). Response headers have been sent.
  47. case requestOpenResponseOpen(RequestOpenResponseOpenState)
  48. // Received valid headers and request(s) but not end of the request stream. Response stream has
  49. // been closed.
  50. case requestOpenResponseClosed
  51. // The request stream is closed. Nothing has been sent in response.
  52. case requestClosedResponseIdle(RequestClosedResponseIdleState)
  53. // The request stream is closed. Response headers have been sent.
  54. case requestClosedResponseOpen(RequestClosedResponseOpenState)
  55. // Both streams are closed. This state is terminal.
  56. case requestClosedResponseClosed
  57. // Not a real state. See 'withStateAvoidingCoWs'.
  58. case _modifying
  59. }
  60. struct RequestOpenResponseIdleState {
  61. /// A length prefixed message reader for request messages.
  62. var reader: LengthPrefixedMessageReader
  63. /// A length prefixed message writer for response messages.
  64. var writer: LengthPrefixedMessageWriter
  65. /// The content type of the RPC.
  66. var contentType: ContentType
  67. /// An accept encoding header to send in the response headers indicating the message encoding
  68. /// that the server supports.
  69. var acceptEncoding: String?
  70. /// A message encoding header to send in the response headers indicating the encoding which will
  71. /// be used for responses.
  72. var responseEncoding: String?
  73. /// Whether to normalize user-provided metadata.
  74. var normalizeHeaders: Bool
  75. /// The pipeline configuration state.
  76. var configurationState: ConfigurationState
  77. }
  78. struct RequestClosedResponseIdleState {
  79. /// A length prefixed message reader for request messages.
  80. var reader: LengthPrefixedMessageReader
  81. /// A length prefixed message writer for response messages.
  82. var writer: LengthPrefixedMessageWriter
  83. /// The content type of the RPC.
  84. var contentType: ContentType
  85. /// An accept encoding header to send in the response headers indicating the message encoding
  86. /// that the server supports.
  87. var acceptEncoding: String?
  88. /// A message encoding header to send in the response headers indicating the encoding which will
  89. /// be used for responses.
  90. var responseEncoding: String?
  91. /// Whether to normalize user-provided metadata.
  92. var normalizeHeaders: Bool
  93. /// The pipeline configuration state.
  94. var configurationState: ConfigurationState
  95. init(from state: RequestOpenResponseIdleState) {
  96. self.reader = state.reader
  97. self.writer = state.writer
  98. self.contentType = state.contentType
  99. self.acceptEncoding = state.acceptEncoding
  100. self.responseEncoding = state.responseEncoding
  101. self.normalizeHeaders = state.normalizeHeaders
  102. self.configurationState = state.configurationState
  103. }
  104. }
  105. struct RequestOpenResponseOpenState {
  106. /// A length prefixed message reader for request messages.
  107. var reader: LengthPrefixedMessageReader
  108. /// A length prefixed message writer for response messages.
  109. var writer: LengthPrefixedMessageWriter
  110. /// Whether to normalize user-provided metadata.
  111. var normalizeHeaders: Bool
  112. init(from state: RequestOpenResponseIdleState) {
  113. self.reader = state.reader
  114. self.writer = state.writer
  115. self.normalizeHeaders = state.normalizeHeaders
  116. }
  117. }
  118. struct RequestClosedResponseOpenState {
  119. /// A length prefixed message reader for request messages.
  120. var reader: LengthPrefixedMessageReader
  121. /// A length prefixed message writer for response messages.
  122. var writer: LengthPrefixedMessageWriter
  123. /// Whether to normalize user-provided metadata.
  124. var normalizeHeaders: Bool
  125. init(from state: RequestOpenResponseOpenState) {
  126. self.reader = state.reader
  127. self.writer = state.writer
  128. self.normalizeHeaders = state.normalizeHeaders
  129. }
  130. init(from state: RequestClosedResponseIdleState) {
  131. self.reader = state.reader
  132. self.writer = state.writer
  133. self.normalizeHeaders = state.normalizeHeaders
  134. }
  135. }
  136. /// The pipeline configuration state.
  137. enum ConfigurationState {
  138. /// The pipeline is being configured. Any message data will be buffered into an appropriate
  139. /// message reader.
  140. case configuring(HPACKHeaders)
  141. /// The pipeline is configured.
  142. case configured
  143. /// Returns true if the configuration is in the `.configured` state.
  144. var isConfigured: Bool {
  145. switch self {
  146. case .configuring:
  147. return false
  148. case .configured:
  149. return true
  150. }
  151. }
  152. /// Configuration has completed.
  153. mutating func configured() -> HPACKHeaders {
  154. switch self {
  155. case .configured:
  156. preconditionFailure("Invalid state: already configured")
  157. case let .configuring(headers):
  158. self = .configured
  159. return headers
  160. }
  161. }
  162. }
  163. }
  164. extension HTTP2ToRawGRPCStateMachine {
  165. enum PipelineConfiguredAction {
  166. /// Forward the given headers.
  167. case forwardHeaders(HPACKHeaders)
  168. /// Forward the given headers and try reading the next message.
  169. case forwardHeadersAndRead(HPACKHeaders)
  170. }
  171. enum ReceiveHeadersAction {
  172. /// Configure the RPC to use the given server handler.
  173. case configure(GRPCServerHandlerProtocol)
  174. /// Reject the RPC by writing out the given headers and setting end-stream.
  175. case rejectRPC(HPACKHeaders)
  176. }
  177. enum ReadNextMessageAction {
  178. /// Do nothing.
  179. case none
  180. /// Forward the buffer.
  181. case forwardMessage(ByteBuffer)
  182. /// Forward the buffer and try reading the next message.
  183. case forwardMessageThenReadNextMessage(ByteBuffer)
  184. /// Forward the 'end' of stream request part.
  185. case forwardEnd
  186. /// Throw an error down the pipeline.
  187. case errorCaught(Error)
  188. }
  189. struct StateAndReceiveHeadersAction {
  190. /// The next state.
  191. var state: State
  192. /// The action to take.
  193. var action: ReceiveHeadersAction
  194. }
  195. struct StateAndReceiveDataAction {
  196. /// The next state.
  197. var state: State
  198. /// The action to take
  199. var action: ReceiveDataAction
  200. }
  201. enum ReceiveDataAction: Hashable {
  202. /// Try to read the next message from the state machine.
  203. case tryReading
  204. /// Invoke 'finish' on the RPC handler.
  205. case finishHandler
  206. /// Do nothing.
  207. case nothing
  208. }
  209. enum SendEndAction {
  210. /// Send trailers to the client.
  211. case sendTrailers(HPACKHeaders)
  212. /// Send trailers to the client and invoke 'finish' on the RPC handler.
  213. case sendTrailersAndFinish(HPACKHeaders)
  214. /// Fail any promise associated with this send.
  215. case failure(Error)
  216. }
  217. }
  218. // MARK: Receive Headers
  219. // This is the only state in which we can receive headers.
  220. extension HTTP2ToRawGRPCStateMachine.State {
  221. private func _receive(
  222. headers: HPACKHeaders,
  223. eventLoop: EventLoop,
  224. errorDelegate: ServerErrorDelegate?,
  225. remoteAddress: SocketAddress?,
  226. logger: Logger,
  227. allocator: ByteBufferAllocator,
  228. responseWriter: GRPCServerResponseWriter,
  229. closeFuture: EventLoopFuture<Void>,
  230. services: [Substring: CallHandlerProvider],
  231. encoding: ServerMessageEncoding,
  232. normalizeHeaders: Bool
  233. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  234. // Extract and validate the content type. If it's nil we need to close.
  235. guard let contentType = self.extractContentType(from: headers) else {
  236. return self.unsupportedContentType()
  237. }
  238. // Now extract the request message encoding and setup an appropriate message reader.
  239. // We may send back a list of acceptable request message encodings as well.
  240. let reader: LengthPrefixedMessageReader
  241. let acceptableRequestEncoding: String?
  242. switch self.extractRequestEncoding(from: headers, encoding: encoding) {
  243. case let .valid(messageReader, acceptEncodingHeader):
  244. reader = messageReader
  245. acceptableRequestEncoding = acceptEncodingHeader
  246. case let .invalid(status, acceptableRequestEncoding):
  247. return self.invalidRequestEncoding(
  248. status: status,
  249. acceptableRequestEncoding: acceptableRequestEncoding,
  250. contentType: contentType
  251. )
  252. }
  253. // Figure out which encoding we should use for responses.
  254. let (writer, responseEncoding) = self.extractResponseEncoding(from: headers, encoding: encoding)
  255. // Parse the path, and create a call handler.
  256. guard let path = headers.first(name: ":path") else {
  257. return self.methodNotImplemented("", contentType: contentType)
  258. }
  259. guard let callPath = CallPath(requestURI: path),
  260. let service = services[Substring(callPath.service)] else {
  261. return self.methodNotImplemented(path, contentType: contentType)
  262. }
  263. // Create a call handler context, i.e. a bunch of 'stuff' we need to create the handler with,
  264. // some of which is exposed to service providers.
  265. let context = CallHandlerContext(
  266. errorDelegate: errorDelegate,
  267. logger: logger,
  268. encoding: encoding,
  269. eventLoop: eventLoop,
  270. path: path,
  271. remoteAddress: remoteAddress,
  272. responseWriter: responseWriter,
  273. allocator: allocator,
  274. closeFuture: closeFuture
  275. )
  276. // We have a matching service, hopefully we have a provider for the method too.
  277. let method = Substring(callPath.method)
  278. if let handler = service.handle(method: method, context: context) {
  279. let nextState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
  280. reader: reader,
  281. writer: writer,
  282. contentType: contentType,
  283. acceptEncoding: acceptableRequestEncoding,
  284. responseEncoding: responseEncoding,
  285. normalizeHeaders: normalizeHeaders,
  286. configurationState: .configuring(headers)
  287. )
  288. return .init(
  289. state: .requestOpenResponseIdle(nextState),
  290. action: .configure(handler)
  291. )
  292. } else {
  293. return self.methodNotImplemented(path, contentType: contentType)
  294. }
  295. }
  296. /// The 'content-type' is not supported; close with status code 415.
  297. private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  298. // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  299. //
  300. // If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
  301. // with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
  302. // clients from interpreting a gRPC error response, which uses status 200 (OK), as
  303. // successful.
  304. let trailers = HPACKHeaders([(":status", "415")])
  305. return .init(
  306. state: .requestClosedResponseClosed,
  307. action: .rejectRPC(trailers)
  308. )
  309. }
  310. /// The RPC method is not implemented. Close with an appropriate status.
  311. private func methodNotImplemented(
  312. _ path: String,
  313. contentType: ContentType
  314. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  315. let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  316. for: GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented"),
  317. contentType: contentType,
  318. acceptableRequestEncoding: nil,
  319. userProvidedHeaders: nil,
  320. normalizeUserProvidedHeaders: false
  321. )
  322. return .init(
  323. state: .requestClosedResponseClosed,
  324. action: .rejectRPC(trailers)
  325. )
  326. }
  327. /// The request encoding specified by the client is not supported. Close with an appropriate
  328. /// status.
  329. private func invalidRequestEncoding(
  330. status: GRPCStatus,
  331. acceptableRequestEncoding: String?,
  332. contentType: ContentType
  333. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  334. let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  335. for: status,
  336. contentType: contentType,
  337. acceptableRequestEncoding: acceptableRequestEncoding,
  338. userProvidedHeaders: nil,
  339. normalizeUserProvidedHeaders: false
  340. )
  341. return .init(
  342. state: .requestClosedResponseClosed,
  343. action: .rejectRPC(trailers)
  344. )
  345. }
  346. /// Makes a 'GRPCStatus' and response trailers suitable for returning to the client when the
  347. /// request message encoding is not supported.
  348. ///
  349. /// - Parameters:
  350. /// - encoding: The unsupported request message encoding sent by the client.
  351. /// - acceptable: The list if acceptable request message encoding the client may use.
  352. /// - Returns: The status and trailers to return to the client.
  353. private func makeStatusAndTrailersForUnsupportedEncoding(
  354. _ encoding: String,
  355. advertisedEncoding: [String]
  356. ) -> (GRPCStatus, acceptEncoding: String?) {
  357. let status: GRPCStatus
  358. let acceptEncoding: String?
  359. if advertisedEncoding.isEmpty {
  360. // No compression is supported; there's nothing to tell the client about.
  361. status = GRPCStatus(code: .unimplemented, message: "compression is not supported")
  362. acceptEncoding = nil
  363. } else {
  364. // Return a list of supported encodings which we advertise. (The list we advertise may be a
  365. // subset of the encodings we support.)
  366. acceptEncoding = advertisedEncoding.joined(separator: ",")
  367. status = GRPCStatus(
  368. code: .unimplemented,
  369. message: "\(encoding) compression is not supported, supported algorithms are " +
  370. "listed in '\(GRPCHeaderName.acceptEncoding)'"
  371. )
  372. }
  373. return (status, acceptEncoding)
  374. }
  375. /// Extract and validate the 'content-type' sent by the client.
  376. /// - Parameter headers: The headers to extract the 'content-type' from
  377. private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
  378. return headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
  379. }
  380. /// The result of validating the request encoding header.
  381. private enum RequestEncodingValidation {
  382. /// The encoding was valid.
  383. case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)
  384. /// The encoding was invalid, the RPC should be terminated with this status.
  385. case invalid(status: GRPCStatus, acceptEncoding: String?)
  386. }
  387. /// Extract and validate the request message encoding header.
  388. /// - Parameters:
  389. /// - headers: The headers to extract the message encoding header from.
  390. /// - Returns: `RequestEncodingValidation`, either a message reader suitable for decoding requests
  391. /// and an accept encoding response header if the request encoding was valid, or a pair of
  392. /// `GRPCStatus` and trailers to close the RPC with.
  393. private func extractRequestEncoding(
  394. from headers: HPACKHeaders,
  395. encoding: ServerMessageEncoding
  396. ) -> RequestEncodingValidation {
  397. let encodings = headers[canonicalForm: GRPCHeaderName.encoding]
  398. // Fail if there's more than one encoding header.
  399. if encodings.count > 1 {
  400. let status = GRPCStatus(
  401. code: .invalidArgument,
  402. message: "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(encodings.joined(separator: ", "))'"
  403. )
  404. return .invalid(status: status, acceptEncoding: nil)
  405. }
  406. let encodingHeader = encodings.first
  407. let result: RequestEncodingValidation
  408. let validator = MessageEncodingHeaderValidator(encoding: encoding)
  409. switch validator.validate(requestEncoding: encodingHeader) {
  410. case let .supported(algorithm, decompressionLimit, acceptEncoding):
  411. // Request message encoding is valid and supported.
  412. result = .valid(
  413. messageReader: LengthPrefixedMessageReader(
  414. compression: algorithm,
  415. decompressionLimit: decompressionLimit
  416. ),
  417. acceptEncoding: acceptEncoding.isEmpty ? nil : acceptEncoding.joined(separator: ",")
  418. )
  419. case .noCompression:
  420. // No message encoding header was present. This means no compression is being used.
  421. result = .valid(
  422. messageReader: LengthPrefixedMessageReader(),
  423. acceptEncoding: nil
  424. )
  425. case let .unsupported(encoding, acceptable):
  426. // Request encoding is not supported.
  427. let (status, acceptEncoding) = self.makeStatusAndTrailersForUnsupportedEncoding(
  428. encoding,
  429. advertisedEncoding: acceptable
  430. )
  431. result = .invalid(status: status, acceptEncoding: acceptEncoding)
  432. }
  433. return result
  434. }
  435. /// Extract a suitable message encoding for responses.
  436. /// - Parameters:
  437. /// - headers: The headers to extract the acceptable response message encoding from.
  438. /// - configuration: The encoding configuration for the server.
  439. /// - Returns: A message writer and the response encoding header to send back to the client.
  440. private func extractResponseEncoding(
  441. from headers: HPACKHeaders,
  442. encoding: ServerMessageEncoding
  443. ) -> (LengthPrefixedMessageWriter, String?) {
  444. let writer: LengthPrefixedMessageWriter
  445. let responseEncoding: String?
  446. switch encoding {
  447. case let .enabled(configuration):
  448. // Extract the encodings acceptable to the client for response messages.
  449. let acceptableResponseEncoding = headers[canonicalForm: GRPCHeaderName.acceptEncoding]
  450. // Select the first algorithm that we support and have enabled. If we don't find one then we
  451. // won't compress response messages.
  452. let algorithm = acceptableResponseEncoding.lazy.compactMap { value in
  453. CompressionAlgorithm(rawValue: value)
  454. }.first {
  455. configuration.enabledAlgorithms.contains($0)
  456. }
  457. writer = LengthPrefixedMessageWriter(compression: algorithm)
  458. responseEncoding = algorithm?.name
  459. case .disabled:
  460. // The server doesn't have compression enabled.
  461. writer = LengthPrefixedMessageWriter(compression: .none)
  462. responseEncoding = nil
  463. }
  464. return (writer, responseEncoding)
  465. }
  466. }
  467. // MARK: - Receive Data
  468. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  469. mutating func receive(
  470. buffer: inout ByteBuffer,
  471. endStream: Bool
  472. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
  473. // Append the bytes to the reader.
  474. self.reader.append(buffer: &buffer)
  475. let state: HTTP2ToRawGRPCStateMachine.State
  476. let action: HTTP2ToRawGRPCStateMachine.ReceiveDataAction
  477. switch (self.configurationState.isConfigured, endStream) {
  478. case (true, true):
  479. /// Configured and end stream: read from the buffer, end will be sent as a result of draining
  480. /// the reader in the next state.
  481. state = .requestClosedResponseIdle(.init(from: self))
  482. action = .tryReading
  483. case (true, false):
  484. /// Configured but not end stream, just read from the buffer.
  485. state = .requestOpenResponseIdle(self)
  486. action = .tryReading
  487. case (false, true):
  488. // Not configured yet, but end of stream. Request stream is now closed but there's no point
  489. // reading yet.
  490. state = .requestClosedResponseIdle(.init(from: self))
  491. action = .nothing
  492. case (false, false):
  493. // Not configured yet, not end stream. No point reading a message yet since we don't have
  494. // anywhere to deliver it.
  495. state = .requestOpenResponseIdle(self)
  496. action = .nothing
  497. }
  498. return .init(state: state, action: action)
  499. }
  500. }
  501. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  502. mutating func receive(
  503. buffer: inout ByteBuffer,
  504. endStream: Bool
  505. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
  506. self.reader.append(buffer: &buffer)
  507. let state: HTTP2ToRawGRPCStateMachine.State
  508. if endStream {
  509. // End stream, so move to the closed state. Any end of request stream events events will
  510. // happen as a result of reading from the closed state.
  511. state = .requestClosedResponseOpen(.init(from: self))
  512. } else {
  513. state = .requestOpenResponseOpen(self)
  514. }
  515. return .init(state: state, action: .tryReading)
  516. }
  517. }
  518. // MARK: - Send Headers
  519. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  520. func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
  521. return HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
  522. contentType: self.contentType,
  523. responseEncoding: self.responseEncoding,
  524. acceptableRequestEncoding: self.acceptEncoding,
  525. userProvidedHeaders: userProvidedHeaders,
  526. normalizeUserProvidedHeaders: self.normalizeHeaders
  527. )
  528. }
  529. }
  530. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  531. func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
  532. return HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
  533. contentType: self.contentType,
  534. responseEncoding: self.responseEncoding,
  535. acceptableRequestEncoding: self.acceptEncoding,
  536. userProvidedHeaders: userProvidedHeaders,
  537. normalizeUserProvidedHeaders: self.normalizeHeaders
  538. )
  539. }
  540. }
  541. // MARK: - Send Data
  542. extension HTTP2ToRawGRPCStateMachine {
  543. static func writeGRPCFramedMessage(
  544. _ buffer: ByteBuffer,
  545. compress: Bool,
  546. allocator: ByteBufferAllocator,
  547. writer: LengthPrefixedMessageWriter
  548. ) -> Result<ByteBuffer, Error> {
  549. do {
  550. let prefixed = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
  551. return .success(prefixed)
  552. } catch {
  553. return .failure(error)
  554. }
  555. }
  556. }
  557. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  558. func send(
  559. buffer: ByteBuffer,
  560. allocator: ByteBufferAllocator,
  561. compress: Bool
  562. ) -> Result<ByteBuffer, Error> {
  563. return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
  564. buffer,
  565. compress: compress,
  566. allocator: allocator,
  567. writer: self.writer
  568. )
  569. }
  570. }
  571. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  572. func send(
  573. buffer: ByteBuffer,
  574. allocator: ByteBufferAllocator,
  575. compress: Bool
  576. ) -> Result<ByteBuffer, Error> {
  577. return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
  578. buffer,
  579. compress: compress,
  580. allocator: allocator,
  581. writer: self.writer
  582. )
  583. }
  584. }
  585. // MARK: - Send End
  586. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  587. func send(
  588. status: GRPCStatus,
  589. trailers userProvidedTrailers: HPACKHeaders
  590. ) -> HPACKHeaders {
  591. return HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  592. for: status,
  593. contentType: self.contentType,
  594. acceptableRequestEncoding: self.acceptEncoding,
  595. userProvidedHeaders: userProvidedTrailers,
  596. normalizeUserProvidedHeaders: self.normalizeHeaders
  597. )
  598. }
  599. }
  600. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  601. func send(
  602. status: GRPCStatus,
  603. trailers userProvidedTrailers: HPACKHeaders
  604. ) -> HPACKHeaders {
  605. return HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  606. for: status,
  607. contentType: self.contentType,
  608. acceptableRequestEncoding: self.acceptEncoding,
  609. userProvidedHeaders: userProvidedTrailers,
  610. normalizeUserProvidedHeaders: self.normalizeHeaders
  611. )
  612. }
  613. }
  614. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  615. func send(
  616. status: GRPCStatus,
  617. trailers userProvidedTrailers: HPACKHeaders
  618. ) -> HPACKHeaders {
  619. return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
  620. for: status,
  621. userProvidedHeaders: userProvidedTrailers,
  622. normalizeUserProvidedHeaders: true
  623. )
  624. }
  625. }
  626. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  627. func send(
  628. status: GRPCStatus,
  629. trailers userProvidedTrailers: HPACKHeaders
  630. ) -> HPACKHeaders {
  631. return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
  632. for: status,
  633. userProvidedHeaders: userProvidedTrailers,
  634. normalizeUserProvidedHeaders: true
  635. )
  636. }
  637. }
  638. // MARK: - Pipeline Configured
  639. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  640. mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
  641. let headers = self.configurationState.configured()
  642. let action: HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction
  643. // If there are unprocessed bytes then we need to read messages as well.
  644. let hasUnprocessedBytes = self.reader.unprocessedBytes != 0
  645. if hasUnprocessedBytes {
  646. // If there are unprocessed bytes, we need to try to read after sending the metadata.
  647. action = .forwardHeadersAndRead(headers)
  648. } else {
  649. // No unprocessed bytes; the reader is empty. Just send the metadata.
  650. action = .forwardHeaders(headers)
  651. }
  652. return action
  653. }
  654. }
  655. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  656. mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
  657. let headers = self.configurationState.configured()
  658. // Since we're already closed, we need to forward the headers and start reading.
  659. return .forwardHeadersAndRead(headers)
  660. }
  661. }
  662. // MARK: - Read Next Request
  663. extension HTTP2ToRawGRPCStateMachine {
  664. static func read(
  665. from reader: inout LengthPrefixedMessageReader,
  666. requestStreamClosed: Bool,
  667. maxLength: Int
  668. ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  669. do {
  670. if let buffer = try reader.nextMessage(maxLength: maxLength) {
  671. if reader.unprocessedBytes > 0 || requestStreamClosed {
  672. // Either there are unprocessed bytes or the request stream is now closed: deliver the
  673. // message and then try to read. The subsequent read may be another message or it may
  674. // be end stream.
  675. return .forwardMessageThenReadNextMessage(buffer)
  676. } else {
  677. // Nothing left to process and the stream isn't closed yet, just forward the message.
  678. return .forwardMessage(buffer)
  679. }
  680. } else if requestStreamClosed {
  681. return .forwardEnd
  682. } else {
  683. return .none
  684. }
  685. } catch {
  686. return .errorCaught(error)
  687. }
  688. }
  689. }
  690. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  691. mutating func readNextRequest(
  692. maxLength: Int
  693. ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  694. return HTTP2ToRawGRPCStateMachine.read(
  695. from: &self.reader,
  696. requestStreamClosed: false,
  697. maxLength: maxLength
  698. )
  699. }
  700. }
  701. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  702. mutating func readNextRequest(
  703. maxLength: Int
  704. ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  705. return HTTP2ToRawGRPCStateMachine.read(
  706. from: &self.reader,
  707. requestStreamClosed: false,
  708. maxLength: maxLength
  709. )
  710. }
  711. }
  712. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  713. mutating func readNextRequest(
  714. maxLength: Int
  715. ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  716. return HTTP2ToRawGRPCStateMachine.read(
  717. from: &self.reader,
  718. requestStreamClosed: true,
  719. maxLength: maxLength
  720. )
  721. }
  722. }
  723. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  724. mutating func readNextRequest(
  725. maxLength: Int
  726. ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  727. return HTTP2ToRawGRPCStateMachine.read(
  728. from: &self.reader,
  729. requestStreamClosed: true,
  730. maxLength: maxLength
  731. )
  732. }
  733. }
  734. // MARK: - Top Level State Changes
  735. extension HTTP2ToRawGRPCStateMachine {
  736. /// Receive request headers.
  737. mutating func receive(
  738. headers: HPACKHeaders,
  739. eventLoop: EventLoop,
  740. errorDelegate: ServerErrorDelegate?,
  741. remoteAddress: SocketAddress?,
  742. logger: Logger,
  743. allocator: ByteBufferAllocator,
  744. responseWriter: GRPCServerResponseWriter,
  745. closeFuture: EventLoopFuture<Void>,
  746. services: [Substring: CallHandlerProvider],
  747. encoding: ServerMessageEncoding,
  748. normalizeHeaders: Bool
  749. ) -> ReceiveHeadersAction {
  750. return self.withStateAvoidingCoWs { state in
  751. state.receive(
  752. headers: headers,
  753. eventLoop: eventLoop,
  754. errorDelegate: errorDelegate,
  755. remoteAddress: remoteAddress,
  756. logger: logger,
  757. allocator: allocator,
  758. responseWriter: responseWriter,
  759. closeFuture: closeFuture,
  760. services: services,
  761. encoding: encoding,
  762. normalizeHeaders: normalizeHeaders
  763. )
  764. }
  765. }
  766. /// Receive request buffer.
  767. /// - Parameters:
  768. /// - buffer: The received buffer.
  769. /// - endStream: Whether end stream was set.
  770. /// - Returns: Returns whether the caller should try to read a message from the buffer.
  771. mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> ReceiveDataAction {
  772. return self.withStateAvoidingCoWs { state in
  773. state.receive(buffer: &buffer, endStream: endStream)
  774. }
  775. }
  776. /// Send response headers.
  777. mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
  778. return self.withStateAvoidingCoWs { state in
  779. state.send(headers: headers)
  780. }
  781. }
  782. /// Send a response buffer.
  783. func send(
  784. buffer: ByteBuffer,
  785. allocator: ByteBufferAllocator,
  786. compress: Bool
  787. ) -> Result<ByteBuffer, Error> {
  788. return self.state.send(buffer: buffer, allocator: allocator, compress: compress)
  789. }
  790. /// Send status and trailers.
  791. mutating func send(
  792. status: GRPCStatus,
  793. trailers: HPACKHeaders
  794. ) -> HTTP2ToRawGRPCStateMachine.SendEndAction {
  795. return self.withStateAvoidingCoWs { state in
  796. state.send(status: status, trailers: trailers)
  797. }
  798. }
  799. /// The pipeline has been configured with a service provider.
  800. mutating func pipelineConfigured() -> PipelineConfiguredAction {
  801. return self.withStateAvoidingCoWs { state in
  802. state.pipelineConfigured()
  803. }
  804. }
  805. /// Try to read a request message.
  806. mutating func readNextRequest(maxLength: Int) -> ReadNextMessageAction {
  807. return self.withStateAvoidingCoWs { state in
  808. state.readNextRequest(maxLength: maxLength)
  809. }
  810. }
  811. }
  812. extension HTTP2ToRawGRPCStateMachine.State {
  813. mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
  814. switch self {
  815. case .requestIdleResponseIdle:
  816. preconditionFailure("Invalid state: pipeline configured before receiving request headers")
  817. case var .requestOpenResponseIdle(state):
  818. let action = state.pipelineConfigured()
  819. self = .requestOpenResponseIdle(state)
  820. return action
  821. case var .requestClosedResponseIdle(state):
  822. let action = state.pipelineConfigured()
  823. self = .requestClosedResponseIdle(state)
  824. return action
  825. case .requestOpenResponseOpen,
  826. .requestOpenResponseClosed,
  827. .requestClosedResponseOpen,
  828. .requestClosedResponseClosed:
  829. preconditionFailure("Invalid state: response stream opened before pipeline was configured")
  830. case ._modifying:
  831. preconditionFailure("Left in modifying state")
  832. }
  833. }
  834. mutating func receive(
  835. headers: HPACKHeaders,
  836. eventLoop: EventLoop,
  837. errorDelegate: ServerErrorDelegate?,
  838. remoteAddress: SocketAddress?,
  839. logger: Logger,
  840. allocator: ByteBufferAllocator,
  841. responseWriter: GRPCServerResponseWriter,
  842. closeFuture: EventLoopFuture<Void>,
  843. services: [Substring: CallHandlerProvider],
  844. encoding: ServerMessageEncoding,
  845. normalizeHeaders: Bool
  846. ) -> HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction {
  847. switch self {
  848. // These are the only states in which we can receive headers. Everything else is invalid.
  849. case .requestIdleResponseIdle,
  850. .requestClosedResponseClosed:
  851. let stateAndAction = self._receive(
  852. headers: headers,
  853. eventLoop: eventLoop,
  854. errorDelegate: errorDelegate,
  855. remoteAddress: remoteAddress,
  856. logger: logger,
  857. allocator: allocator,
  858. responseWriter: responseWriter,
  859. closeFuture: closeFuture,
  860. services: services,
  861. encoding: encoding,
  862. normalizeHeaders: normalizeHeaders
  863. )
  864. self = stateAndAction.state
  865. return stateAndAction.action
  866. // We can't receive headers in any of these states.
  867. case .requestOpenResponseIdle,
  868. .requestOpenResponseOpen,
  869. .requestOpenResponseClosed,
  870. .requestClosedResponseIdle,
  871. .requestClosedResponseOpen:
  872. preconditionFailure("Invalid state: \(self)")
  873. case ._modifying:
  874. preconditionFailure("Left in modifying state")
  875. }
  876. }
  877. /// Receive a buffer from the client.
  878. mutating func receive(
  879. buffer: inout ByteBuffer,
  880. endStream: Bool
  881. ) -> HTTP2ToRawGRPCStateMachine.ReceiveDataAction {
  882. switch self {
  883. case .requestIdleResponseIdle:
  884. /// This isn't allowed: we must receive the request headers first.
  885. preconditionFailure("Invalid state")
  886. case var .requestOpenResponseIdle(state):
  887. let stateAndAction = state.receive(buffer: &buffer, endStream: endStream)
  888. self = stateAndAction.state
  889. return stateAndAction.action
  890. case var .requestOpenResponseOpen(state):
  891. let stateAndAction = state.receive(buffer: &buffer, endStream: endStream)
  892. self = stateAndAction.state
  893. return stateAndAction.action
  894. case .requestClosedResponseIdle,
  895. .requestClosedResponseOpen:
  896. preconditionFailure("Invalid state: the request stream is already closed")
  897. case .requestOpenResponseClosed:
  898. if endStream {
  899. // Server has finish responding and this is the end of the request stream; we're done for
  900. // this RPC now, finish the handler.
  901. self = .requestClosedResponseClosed
  902. return .finishHandler
  903. } else {
  904. // Server has finished responding but this isn't the end of the request stream; ignore the
  905. // input, we need to wait for end stream before tearing down the handler.
  906. return .nothing
  907. }
  908. case .requestClosedResponseClosed:
  909. return .nothing
  910. case ._modifying:
  911. preconditionFailure("Left in modifying state")
  912. }
  913. }
  914. mutating func readNextRequest(
  915. maxLength: Int
  916. ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  917. switch self {
  918. case .requestIdleResponseIdle:
  919. preconditionFailure("Invalid state")
  920. case var .requestOpenResponseIdle(state):
  921. let action = state.readNextRequest(maxLength: maxLength)
  922. self = .requestOpenResponseIdle(state)
  923. return action
  924. case var .requestOpenResponseOpen(state):
  925. let action = state.readNextRequest(maxLength: maxLength)
  926. self = .requestOpenResponseOpen(state)
  927. return action
  928. case var .requestClosedResponseIdle(state):
  929. let action = state.readNextRequest(maxLength: maxLength)
  930. self = .requestClosedResponseIdle(state)
  931. return action
  932. case var .requestClosedResponseOpen(state):
  933. let action = state.readNextRequest(maxLength: maxLength)
  934. self = .requestClosedResponseOpen(state)
  935. return action
  936. case .requestOpenResponseClosed,
  937. .requestClosedResponseClosed:
  938. return .none
  939. case ._modifying:
  940. preconditionFailure("Left in modifying state")
  941. }
  942. }
  943. mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
  944. switch self {
  945. case .requestIdleResponseIdle:
  946. preconditionFailure("Invalid state: the request stream isn't open")
  947. case let .requestOpenResponseIdle(state):
  948. let headers = state.send(headers: headers)
  949. self = .requestOpenResponseOpen(.init(from: state))
  950. return .success(headers)
  951. case let .requestClosedResponseIdle(state):
  952. let headers = state.send(headers: headers)
  953. self = .requestClosedResponseOpen(.init(from: state))
  954. return .success(headers)
  955. case .requestOpenResponseOpen,
  956. .requestOpenResponseClosed,
  957. .requestClosedResponseOpen,
  958. .requestClosedResponseClosed:
  959. return .failure(GRPCError.AlreadyComplete())
  960. case ._modifying:
  961. preconditionFailure("Left in modifying state")
  962. }
  963. }
  964. func send(
  965. buffer: ByteBuffer,
  966. allocator: ByteBufferAllocator,
  967. compress: Bool
  968. ) -> Result<ByteBuffer, Error> {
  969. switch self {
  970. case .requestIdleResponseIdle:
  971. preconditionFailure("Invalid state: the request stream is still closed")
  972. case .requestOpenResponseIdle,
  973. .requestClosedResponseIdle:
  974. let error = GRPCError.InvalidState("Response headers must be sent before response message")
  975. return .failure(error)
  976. case let .requestOpenResponseOpen(state):
  977. return state.send(
  978. buffer: buffer,
  979. allocator: allocator,
  980. compress: compress
  981. )
  982. case let .requestClosedResponseOpen(state):
  983. return state.send(
  984. buffer: buffer,
  985. allocator: allocator,
  986. compress: compress
  987. )
  988. case .requestOpenResponseClosed,
  989. .requestClosedResponseClosed:
  990. return .failure(GRPCError.AlreadyComplete())
  991. case ._modifying:
  992. preconditionFailure("Left in modifying state")
  993. }
  994. }
  995. mutating func send(
  996. status: GRPCStatus,
  997. trailers: HPACKHeaders
  998. ) -> HTTP2ToRawGRPCStateMachine.SendEndAction {
  999. switch self {
  1000. case .requestIdleResponseIdle:
  1001. preconditionFailure("Invalid state: the request stream is still closed")
  1002. case let .requestOpenResponseIdle(state):
  1003. self = .requestOpenResponseClosed
  1004. return .sendTrailers(state.send(status: status, trailers: trailers))
  1005. case let .requestClosedResponseIdle(state):
  1006. self = .requestClosedResponseClosed
  1007. return .sendTrailersAndFinish(state.send(status: status, trailers: trailers))
  1008. case let .requestOpenResponseOpen(state):
  1009. self = .requestOpenResponseClosed
  1010. return .sendTrailers(state.send(status: status, trailers: trailers))
  1011. case let .requestClosedResponseOpen(state):
  1012. self = .requestClosedResponseClosed
  1013. return .sendTrailersAndFinish(state.send(status: status, trailers: trailers))
  1014. case .requestOpenResponseClosed,
  1015. .requestClosedResponseClosed:
  1016. return .failure(GRPCError.AlreadyComplete())
  1017. case ._modifying:
  1018. preconditionFailure("Left in modifying state")
  1019. }
  1020. }
  1021. }
  1022. // MARK: - Helpers
  1023. extension HTTP2ToRawGRPCStateMachine {
  1024. static func makeResponseHeaders(
  1025. contentType: ContentType,
  1026. responseEncoding: String?,
  1027. acceptableRequestEncoding: String?,
  1028. userProvidedHeaders: HPACKHeaders,
  1029. normalizeUserProvidedHeaders: Bool
  1030. ) -> HPACKHeaders {
  1031. // 4 because ':status' and 'content-type' are required. We may send back 'grpc-encoding' and
  1032. // 'grpc-accept-encoding' as well.
  1033. let capacity = 4 + userProvidedHeaders.count
  1034. var headers = HPACKHeaders()
  1035. headers.reserveCapacity(capacity)
  1036. headers.add(name: ":status", value: "200")
  1037. headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
  1038. if let responseEncoding = responseEncoding {
  1039. headers.add(name: GRPCHeaderName.encoding, value: responseEncoding)
  1040. }
  1041. if let acceptEncoding = acceptableRequestEncoding {
  1042. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding)
  1043. }
  1044. // Add user provided headers, normalizing if required.
  1045. headers.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
  1046. return headers
  1047. }
  1048. static func makeResponseTrailersOnly(
  1049. for status: GRPCStatus,
  1050. contentType: ContentType,
  1051. acceptableRequestEncoding: String?,
  1052. userProvidedHeaders: HPACKHeaders?,
  1053. normalizeUserProvidedHeaders: Bool
  1054. ) -> HPACKHeaders {
  1055. // 5 because ':status', 'content-type', 'grpc-status' are required. We may also send back
  1056. // 'grpc-message' and 'grpc-accept-encoding'.
  1057. let capacity = 5 + (userProvidedHeaders.map { $0.count } ?? 0)
  1058. var headers = HPACKHeaders()
  1059. headers.reserveCapacity(capacity)
  1060. // Add the required trailers.
  1061. headers.add(name: ":status", value: "200")
  1062. headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
  1063. headers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
  1064. if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
  1065. headers.add(name: GRPCHeaderName.statusMessage, value: message)
  1066. }
  1067. // We may include this if the requested encoding was not valid.
  1068. if let acceptEncoding = acceptableRequestEncoding {
  1069. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding)
  1070. }
  1071. if let userProvided = userProvidedHeaders {
  1072. headers.add(contentsOf: userProvided, normalize: normalizeUserProvidedHeaders)
  1073. }
  1074. return headers
  1075. }
  1076. static func makeResponseTrailers(
  1077. for status: GRPCStatus,
  1078. userProvidedHeaders: HPACKHeaders,
  1079. normalizeUserProvidedHeaders: Bool
  1080. ) -> HPACKHeaders {
  1081. // Most RPCs should end with status code 'ok' (hopefully!), and if the user didn't provide any
  1082. // additional trailers, then we can use a pre-canned set of headers to avoid an extra
  1083. // allocation.
  1084. if status == .ok, userProvidedHeaders.isEmpty {
  1085. return Self.gRPCStatusOkTrailers
  1086. }
  1087. // 2 because 'grpc-status' is required, we may also send back 'grpc-message'.
  1088. let capacity = 2 + userProvidedHeaders.count
  1089. var trailers = HPACKHeaders()
  1090. trailers.reserveCapacity(capacity)
  1091. // status code.
  1092. trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue))
  1093. // status message, if present.
  1094. if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
  1095. trailers.add(name: GRPCHeaderName.statusMessage, value: message)
  1096. }
  1097. // user provided trailers.
  1098. trailers.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
  1099. return trailers
  1100. }
  1101. private static let gRPCStatusOkTrailers: HPACKHeaders = [
  1102. GRPCHeaderName.statusCode: String(describing: GRPCStatus.Code.ok.rawValue),
  1103. ]
  1104. }
  1105. private extension HPACKHeaders {
  1106. mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) {
  1107. if normalize {
  1108. self.add(contentsOf: other.lazy.map { name, value, indexable in
  1109. (name: name.lowercased(), value: value, indexable: indexable)
  1110. })
  1111. } else {
  1112. self.add(contentsOf: other)
  1113. }
  1114. }
  1115. }