HTTP2ToRawGRPCStateMachine.swift 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A state machine whose states track the request and response halves of a single stream
  /// (see `State`).
  struct HTTP2ToRawGRPCStateMachine {
    /// The state the machine is currently in.
    private var state: State

    /// Runs `body` against the current state while `self.state` is parked on `._modifying`, then
    /// stores whatever `body` left in the state back into `self.state`.
    ///
    /// `State` carries its data as associated values, so mutating it while `self.state` still
    /// references the same heap allocated storage would trigger a copy on write. Swapping the real
    /// state out for `._modifying` first means `body` operates on the only reference, which avoids
    /// that copy.
    ///
    /// - Parameter body: A block which may modify the state and returns the action to take.
    /// - Returns: The action returned by `body`.
    private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
      var scratch: State = ._modifying
      // Move the real state into 'scratch'; 'self.state' is now '._modifying'.
      swap(&self.state, &scratch)
      let action = body(&scratch)
      // Move the (possibly modified) state back into place.
      swap(&self.state, &scratch)
      return action
    }

    /// Creates a state machine in the `.requestIdleResponseIdle` state.
    ///
    /// - Parameters:
    ///   - services: The service providers, keyed by service name.
    ///   - encoding: The encoding configuration for this server.
    ///   - normalizeHeaders: Whether to normalize user-provided metadata. Defaults to `true`.
    internal init(
      services: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      normalizeHeaders: Bool = true
    ) {
      self.state = .requestIdleResponseIdle(
        RequestIdleResponseIdleState(
          services: services,
          encoding: encoding,
          normalizeHeaders: normalizeHeaders
        )
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine {
    /// The states of the machine. Each case names the state of the request half followed by the
    /// state of the response half of the stream.
    enum State {
      /// Nothing has happened on the stream yet: both peers are idle.
      case requestIdleResponseIdle(RequestIdleResponseIdleState)

      /// Valid request headers have been received; nothing has been sent in response.
      case requestOpenResponseIdle(RequestOpenResponseIdleState)

      /// Valid request headers and request(s) have been received; response headers have been sent.
      case requestOpenResponseOpen(RequestOpenResponseOpenState)

      /// The request stream is closed; nothing has been sent in response.
      case requestClosedResponseIdle(RequestClosedResponseIdleState)

      /// The request stream is closed; response headers have been sent.
      case requestClosedResponseOpen(RequestClosedResponseOpenState)

      /// Both streams are closed. This state is terminal.
      case requestClosedResponseClosed

      /// A placeholder rather than a real state, see `withStateAvoidingCoWs`.
      case _modifying
    }

    /// Data held while neither peer has done anything on the stream.
    struct RequestIdleResponseIdleState {
      /// Service providers, keyed by the name of the service they provide.
      var services: [Substring: CallHandlerProvider]

      /// The server's message encoding configuration.
      var encoding: ServerMessageEncoding

      /// Whether user-provided metadata should be normalized.
      var normalizeHeaders: Bool
    }

    /// Data held once request headers have been accepted but no response has been started.
    struct RequestOpenResponseIdleState {
      /// Reads length prefixed request messages.
      var reader: LengthPrefixedMessageReader

      /// Writes length prefixed response messages.
      var writer: LengthPrefixedMessageWriter

      /// The content type of the RPC.
      var contentType: ContentType

      /// The accept encoding header value to include in the response headers, advertising the
      /// message encodings supported by the server.
      var acceptEncoding: String?

      /// The message encoding header value to include in the response headers, naming the encoding
      /// used for response messages.
      var responseEncoding: String?

      /// Whether user-provided metadata should be normalized.
      var normalizeHeaders: Bool

      /// Whether the pipeline has been configured yet.
      var configurationState: ConfigurationState
    }

    /// Data held once the request stream has closed but no response has been started.
    struct RequestClosedResponseIdleState {
      /// Reads length prefixed request messages.
      var reader: LengthPrefixedMessageReader

      /// Writes length prefixed response messages.
      var writer: LengthPrefixedMessageWriter

      /// The content type of the RPC.
      var contentType: ContentType

      /// The accept encoding header value to include in the response headers, advertising the
      /// message encodings supported by the server.
      var acceptEncoding: String?

      /// The message encoding header value to include in the response headers, naming the encoding
      /// used for response messages.
      var responseEncoding: String?

      /// Whether user-provided metadata should be normalized.
      var normalizeHeaders: Bool

      /// Whether the pipeline has been configured yet.
      var configurationState: ConfigurationState

      /// Carries every field over from the 'request open, response idle' state.
      init(from open: RequestOpenResponseIdleState) {
        self.reader = open.reader
        self.writer = open.writer
        self.contentType = open.contentType
        self.acceptEncoding = open.acceptEncoding
        self.responseEncoding = open.responseEncoding
        self.normalizeHeaders = open.normalizeHeaders
        self.configurationState = open.configurationState
      }
    }

    /// Data held while the request stream is open and response headers have been sent.
    struct RequestOpenResponseOpenState {
      /// Reads length prefixed request messages.
      var reader: LengthPrefixedMessageReader

      /// Writes length prefixed response messages.
      var writer: LengthPrefixedMessageWriter

      /// Whether user-provided metadata should be normalized.
      var normalizeHeaders: Bool

      /// Keeps the reader, writer and normalization flag of the 'request open, response idle'
      /// state; the remaining fields are not carried over.
      init(from idle: RequestOpenResponseIdleState) {
        self.reader = idle.reader
        self.writer = idle.writer
        self.normalizeHeaders = idle.normalizeHeaders
      }
    }

    /// Data held once the request stream has closed and response headers have been sent.
    struct RequestClosedResponseOpenState {
      /// Reads length prefixed request messages.
      var reader: LengthPrefixedMessageReader

      /// Writes length prefixed response messages.
      var writer: LengthPrefixedMessageWriter

      /// Whether user-provided metadata should be normalized.
      var normalizeHeaders: Bool

      /// Carries every field over from the 'request open, response open' state.
      init(from open: RequestOpenResponseOpenState) {
        self.reader = open.reader
        self.writer = open.writer
        self.normalizeHeaders = open.normalizeHeaders
      }

      /// Keeps the reader, writer and normalization flag of the 'request closed, response idle'
      /// state; the remaining fields are not carried over.
      init(from idle: RequestClosedResponseIdleState) {
        self.reader = idle.reader
        self.writer = idle.writer
        self.normalizeHeaders = idle.normalizeHeaders
      }
    }

    /// Tracks whether the pipeline has been configured.
    enum ConfigurationState {
      /// Configuration is in progress. Message data received in the meantime is buffered into an
      /// appropriate message reader. The associated value is the request headers.
      case configuring(HPACKHeaders)

      /// Configuration has finished.
      case configured

      /// `true` if and only if this is `.configured`.
      var isConfigured: Bool {
        if case .configured = self {
          return true
        }
        return false
      }

      /// Marks configuration as complete.
      ///
      /// - Precondition: The state must be `.configuring`; calling this when already `.configured`
      ///   traps.
      /// - Returns: The headers which were held by the `.configuring` state.
      mutating func configured() -> HPACKHeaders {
        guard case let .configuring(headers) = self else {
          preconditionFailure("Invalid state: already configured")
        }
        self = .configured
        return headers
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine {
    /// What the caller should do after an interaction with the state machine.
    enum Action {
      /// Nothing needs to be done.
      case none

      /// Configure the pipeline with the given call handler.
      case configure(GRPCCallHandler)

      /// An error was caught; fire it down the pipeline.
      case errorCaught(Error)

      /// Pass the request headers on to the next handler.
      case forwardHeaders(HPACKHeaders)

      /// Pass the buffer on to the next handler.
      case forwardMessage(ByteBuffer)

      /// Pass the buffer on to the next handler, then send end stream.
      case forwardMessageAndEnd(ByteBuffer)

      /// Pass the request headers on to the next handler, then try to read request messages.
      case forwardHeadersThenReadNextMessage(HPACKHeaders)

      /// Pass the buffer on to the next handler, then try to read request messages.
      case forwardMessageThenReadNextMessage(ByteBuffer)

      /// Pass end of stream on to the next handler.
      case forwardEnd

      /// Try to read a request message.
      case readNextRequest

      /// Write the frame payload to the channel. `flush` requests an additional flush, i.e. when
      /// the state machine is turning a request around rather than processing a response part.
      case write(HTTP2Frame.FramePayload, EventLoopPromise<Void>?, flush: Bool)

      /// Complete the promise with the given result.
      case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
    }

    /// The outcome of a transition: the state to move to paired with the action to take.
    struct StateAndAction {
      /// The state to move to.
      var state: State

      /// The action the caller should take.
      var action: Action
    }
  }
  216. // MARK: Receive Headers
  217. // This is the only state in which we can receive headers.
  extension HTTP2ToRawGRPCStateMachine.RequestIdleResponseIdleState {
    /// Handles the request headers for a new RPC.
    ///
    /// The headers are validated in order: content type, request message encoding, then the
    /// ':path' which must resolve to a service and a method handler. Failing any step closes the
    /// stream (the next state is `.requestClosedResponseClosed`) with a headers frame carrying end
    /// stream. On success the next state is `.requestOpenResponseIdle` and the action is to
    /// configure the pipeline with the handler for the call.
    ///
    /// - Parameters:
    ///   - headers: The request headers.
    ///   - eventLoop: Passed to the call handler context.
    ///   - errorDelegate: Passed to the call handler context.
    ///   - remoteAddress: Passed to the call handler context.
    ///   - logger: Passed to the call handler context.
    ///   - allocator: Passed to the call handler context.
    ///   - responseWriter: Passed to the call handler context.
    /// - Returns: The next state and the action to take.
    func receive(
      headers: HPACKHeaders,
      eventLoop: EventLoop,
      errorDelegate: ServerErrorDelegate?,
      remoteAddress: SocketAddress?,
      logger: Logger,
      allocator: ByteBufferAllocator,
      responseWriter: GRPCServerResponseWriter
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // No usable content type means we have to close.
      guard let contentType = self.extractContentType(from: headers) else {
        return self.unsupportedContentType()
      }

      // Work out how request messages are encoded so we can build a reader for them. Along the
      // way we may also learn which request encodings to advertise back to the client.
      let reader: LengthPrefixedMessageReader
      let acceptableRequestEncoding: String?

      switch self.extractRequestEncoding(from: headers) {
      case let .invalid(status, acceptEncoding):
        return self.invalidRequestEncoding(
          status: status,
          acceptableRequestEncoding: acceptEncoding,
          contentType: contentType
        )

      case let .valid(messageReader, acceptEncoding):
        reader = messageReader
        acceptableRequestEncoding = acceptEncoding
      }

      // Choose the encoding for response messages.
      let (writer, responseEncoding) = self.extractResponseEncoding(from: headers)

      // Resolve the path to a service.
      guard let path = headers.first(name: ":path") else {
        return self.methodNotImplemented("", contentType: contentType)
      }

      guard let callPath = CallPath(requestURI: path) else {
        return self.methodNotImplemented(path, contentType: contentType)
      }

      guard let service = self.services[Substring(callPath.service)] else {
        return self.methodNotImplemented(path, contentType: contentType)
      }

      // The context bundles up everything required to create the handler; some of it is exposed
      // to service providers.
      let context = CallHandlerContext(
        errorDelegate: errorDelegate,
        logger: logger,
        encoding: self.encoding,
        eventLoop: eventLoop,
        path: path,
        remoteAddress: remoteAddress,
        responseWriter: responseWriter,
        allocator: allocator
      )

      // The service exists, but it may not provide the requested method.
      guard let handler = service.handleMethod(
        Substring(callPath.method),
        callHandlerContext: context
      ) else {
        return self.methodNotImplemented(path, contentType: contentType)
      }

      // Everything checked out: the request stream is open and the pipeline needs configuring.
      // The headers are held in the configuration state until configuration completes.
      let nextState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
        reader: reader,
        writer: writer,
        contentType: contentType,
        acceptEncoding: acceptableRequestEncoding,
        responseEncoding: responseEncoding,
        normalizeHeaders: self.normalizeHeaders,
        configurationState: .configuring(headers)
      )

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestOpenResponseIdle(nextState),
        action: .configure(handler)
      )
    }

    /// Closes the stream with HTTP status 415 because the 'content-type' is not supported.
    private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
      //
      //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
      //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
      //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
      //   successful.
      let payload = HTTP2Frame.FramePayload.headers(
        .init(headers: HPACKHeaders([(":status", "415")]), endStream: true)
      )
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseClosed,
        action: .write(payload, nil, flush: true)
      )
    }

    /// Closes the stream with an 'unimplemented' status because no handler exists for the path.
    ///
    /// - Parameters:
    ///   - path: The requested path; included in the status message.
    ///   - contentType: The content type of the RPC.
    private func methodNotImplemented(
      _ path: String,
      contentType: ContentType
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let status = GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented")
      let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
        for: status,
        contentType: contentType,
        acceptableRequestEncoding: nil,
        userProvidedHeaders: nil,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseClosed,
        action: .write(payload, nil, flush: true)
      )
    }

    /// Closes the stream with the given status because the client's request message encoding was
    /// rejected.
    ///
    /// - Parameters:
    ///   - status: The status to close the RPC with.
    ///   - acceptableRequestEncoding: The accept encoding value to send back, if any.
    ///   - contentType: The content type of the RPC.
    private func invalidRequestEncoding(
      status: GRPCStatus,
      acceptableRequestEncoding: String?,
      contentType: ContentType
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
        for: status,
        contentType: contentType,
        acceptableRequestEncoding: acceptableRequestEncoding,
        userProvidedHeaders: nil,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseClosed,
        action: .write(payload, nil, flush: true)
      )
    }

    /// Makes the `GRPCStatus` and accept encoding value to return to the client when its request
    /// message encoding is not supported.
    ///
    /// - Parameters:
    ///   - encoding: The unsupported request message encoding sent by the client.
    ///   - advertisedEncoding: The request message encodings the server advertises. This may be a
    ///     subset of the encodings it supports.
    /// - Returns: The status, and the advertised encodings joined with "," or `nil` if there are
    ///   none to advertise.
    private func makeStatusAndTrailersForUnsupportedEncoding(
      _ encoding: String,
      advertisedEncoding: [String]
    ) -> (GRPCStatus, acceptEncoding: String?) {
      guard !advertisedEncoding.isEmpty else {
        // Compression isn't supported at all, so there is nothing to advertise to the client.
        let status = GRPCStatus(code: .unimplemented, message: "compression is not supported")
        return (status, nil)
      }

      // Tell the client which encodings we advertise.
      let status = GRPCStatus(
        code: .unimplemented,
        message: "\(encoding) compression is not supported, supported algorithms are " +
          "listed in '\(GRPCHeaderName.acceptEncoding)'"
      )
      return (status, advertisedEncoding.joined(separator: ","))
    }

    /// Extracts and validates the 'content-type' sent by the client.
    ///
    /// - Parameter headers: The headers to look for the 'content-type' in.
    /// - Returns: The content type, or `nil` if the header is missing or `ContentType` could not
    ///   be created from its value.
    private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
      let rawContentType = headers.first(name: GRPCHeaderName.contentType)
      return rawContentType.flatMap(ContentType.init)
    }

    /// The outcome of validating the request encoding header.
    private enum RequestEncodingValidation {
      /// The encoding is acceptable: requests can be read with `messageReader`.
      case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)

      /// The encoding is not acceptable: the RPC should be terminated with `status`.
      case invalid(status: GRPCStatus, acceptEncoding: String?)
    }

    /// Extracts and validates the request message encoding header.
    ///
    /// - Parameter headers: The headers to look for the message encoding header in.
    /// - Returns: `.valid` with a message reader suitable for decoding requests and an optional
    ///   accept encoding value for the response headers, or `.invalid` with the status to close
    ///   the RPC with and an optional accept encoding value.
    private func extractRequestEncoding(from headers: HPACKHeaders) -> RequestEncodingValidation {
      let encodings = headers[canonicalForm: GRPCHeaderName.encoding]

      // At most one encoding may be specified.
      guard encodings.count <= 1 else {
        let status = GRPCStatus(
          code: .invalidArgument,
          message: "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(encodings.joined(separator: ", "))'"
        )
        return .invalid(status: status, acceptEncoding: nil)
      }

      let validator = MessageEncodingHeaderValidator(encoding: self.encoding)

      switch validator.validate(requestEncoding: encodings.first) {
      case .noCompression:
        // There was no message encoding header, so requests aren't compressed.
        return .valid(messageReader: LengthPrefixedMessageReader(), acceptEncoding: nil)

      case let .supported(algorithm, decompressionLimit, acceptEncoding):
        // The client is using an encoding we support.
        let messageReader = LengthPrefixedMessageReader(
          compression: algorithm,
          decompressionLimit: decompressionLimit
        )
        let acceptEncodingHeader: String?
        if acceptEncoding.isEmpty {
          acceptEncodingHeader = nil
        } else {
          acceptEncodingHeader = acceptEncoding.joined(separator: ",")
        }
        return .valid(messageReader: messageReader, acceptEncoding: acceptEncodingHeader)

      case let .unsupported(encoding, acceptable):
        // The client is using an encoding we don't support.
        let (status, acceptEncoding) = self.makeStatusAndTrailersForUnsupportedEncoding(
          encoding,
          advertisedEncoding: acceptable
        )
        return .invalid(status: status, acceptEncoding: acceptEncoding)
      }
    }

    /// Chooses a message encoding for responses.
    ///
    /// - Parameter headers: The headers holding the response message encodings acceptable to the
    ///   client.
    /// - Returns: A message writer and the response encoding header value to send back to the
    ///   client, if any.
    private func extractResponseEncoding(
      from headers: HPACKHeaders
    ) -> (LengthPrefixedMessageWriter, String?) {
      switch self.encoding {
      case .disabled:
        // Compression is not enabled on the server.
        return (LengthPrefixedMessageWriter(compression: .none), nil)

      case let .enabled(configuration):
        // The encodings the client will accept for response messages.
        let acceptableResponseEncoding = headers[canonicalForm: GRPCHeaderName.acceptEncoding]

        // Use the first algorithm listed by the client which we both recognise and have enabled.
        // Responses are left uncompressed if there isn't one.
        var algorithm: CompressionAlgorithm?
        for value in acceptableResponseEncoding {
          if let candidate = CompressionAlgorithm(rawValue: value),
            configuration.enabledAlgorithms.contains(candidate) {
            algorithm = candidate
            break
          }
        }

        return (LengthPrefixedMessageWriter(compression: algorithm), algorithm?.name)
      }
    }
  }
  456. // MARK: - Receive Data
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Buffers request bytes into the reader and works out what to do next.
    ///
    /// - Parameters:
    ///   - buffer: The received bytes; appended to the message reader.
    ///   - endStream: Whether these bytes end the request stream.
    /// - Returns: `.requestClosedResponseIdle` if `endStream` is set, otherwise
    ///   `.requestOpenResponseIdle`; paired with `.readNextRequest` if the pipeline has been
    ///   configured, otherwise `.none`.
    mutating func receive(
      buffer: inout ByteBuffer,
      endStream: Bool
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Hand the bytes to the reader before taking a snapshot of 'self' for the next state.
      self.reader.append(buffer: &buffer)

      // There is no point reading a message until the pipeline is configured: there would be
      // nowhere to deliver it.
      let action: HTTP2ToRawGRPCStateMachine.Action
      if self.configurationState.isConfigured {
        action = .readNextRequest
      } else {
        action = .none
      }

      // End stream closes the request side. When reading is possible, end will be sent as a
      // result of draining the reader in the next state.
      let nextState: HTTP2ToRawGRPCStateMachine.State
      if endStream {
        nextState = .requestClosedResponseIdle(
          HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState(from: self)
        )
      } else {
        nextState = .requestOpenResponseIdle(self)
      }

      return HTTP2ToRawGRPCStateMachine.StateAndAction(state: nextState, action: action)
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Buffers request bytes into the reader and asks for the next request to be read.
    ///
    /// - Parameters:
    ///   - buffer: The received bytes; appended to the message reader.
    ///   - endStream: Whether these bytes end the request stream.
    /// - Returns: `.requestClosedResponseOpen` if `endStream` is set, otherwise
    ///   `.requestOpenResponseOpen`; always paired with `.readNextRequest`.
    mutating func receive(
      buffer: inout ByteBuffer,
      endStream: Bool
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      self.reader.append(buffer: &buffer)

      // On end stream we move to the closed state; any end of request stream events will happen
      // as a result of reading from that state.
      let nextState: HTTP2ToRawGRPCStateMachine.State = endStream
        ? .requestClosedResponseOpen(
          HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState(from: self)
        )
        : .requestOpenResponseOpen(self)

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: nextState,
        action: .readNextRequest
      )
    }
  }
  507. // MARK: - Send Headers
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Sends response headers while the request stream is still open.
    ///
    /// - Parameters:
    ///   - userProvidedHeaders: Headers supplied by the user to include in the response headers.
    ///   - promise: A promise passed along with the write action.
    /// - Returns: The `.requestOpenResponseOpen` state and an action to write the headers frame
    ///   without an extra flush.
    func send(
      headers userProvidedHeaders: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
        contentType: self.contentType,
        responseEncoding: self.responseEncoding,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedHeaders,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )

      let payload = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
      let nextState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState(from: self)

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestOpenResponseOpen(nextState),
        action: .write(payload, promise, flush: false)
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Sends response headers after the request stream has closed.
    ///
    /// - Parameters:
    ///   - userProvidedHeaders: Headers supplied by the user to include in the response headers.
    ///   - promise: A promise passed along with the write action.
    /// - Returns: The `.requestClosedResponseOpen` state and an action to write the headers frame
    ///   without an extra flush.
    func send(
      headers userProvidedHeaders: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
        contentType: self.contentType,
        responseEncoding: self.responseEncoding,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedHeaders,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )

      let payload = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
      let nextState = HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState(from: self)

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseOpen(nextState),
        action: .write(payload, promise, flush: false)
      )
    }
  }
  544. // MARK: - Send Data
  extension HTTP2ToRawGRPCStateMachine {
    /// Length prefixes a response message and wraps it in an action.
    ///
    /// - Parameters:
    ///   - buffer: The message to write.
    ///   - compress: Passed to the writer as its `compressed` argument.
    ///   - allocator: Passed to the writer.
    ///   - promise: A promise which accompanies the write, or is failed if writing throws.
    ///   - writer: The writer used to length prefix the message.
    /// - Returns: An action to write a data frame holding the prefixed message without an extra
    ///   flush, or an action to fail `promise` with the error thrown by the writer.
    static func writeGRPCFramedMessage(
      _ buffer: ByteBuffer,
      compress: Bool,
      allocator: ByteBufferAllocator,
      promise: EventLoopPromise<Void>?,
      writer: LengthPrefixedMessageWriter
    ) -> Action {
      let framed = Result {
        try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
      }

      switch framed {
      case let .success(prefixed):
        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(prefixed)))
        return .write(payload, promise, flush: false)

      case let .failure(error):
        // Nothing will be written, so let the caller know via the promise.
        return .completePromise(promise, .failure(error))
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Sends a response message while the request stream is still open.
    ///
    /// - Parameters:
    ///   - buffer: The message to send.
    ///   - allocator: Passed on to `writeGRPCFramedMessage`.
    ///   - compress: Passed on to `writeGRPCFramedMessage`.
    ///   - promise: Passed on to `writeGRPCFramedMessage`.
    /// - Returns: The unchanged `.requestOpenResponseOpen` state and the action produced by
    ///   `writeGRPCFramedMessage`.
    func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Sending a message never changes state here; only the action varies.
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestOpenResponseOpen(self),
        action: HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
          buffer,
          compress: compress,
          allocator: allocator,
          promise: promise,
          writer: self.writer
        )
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
    /// Sends a response message after the request stream has closed.
    ///
    /// - Parameters:
    ///   - buffer: The message to send.
    ///   - allocator: Passed on to `writeGRPCFramedMessage`.
    ///   - compress: Passed on to `writeGRPCFramedMessage`.
    ///   - promise: Passed on to `writeGRPCFramedMessage`.
    /// - Returns: The unchanged `.requestClosedResponseOpen` state and the action produced by
    ///   `writeGRPCFramedMessage`.
    func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Sending a message never changes state here; only the action varies.
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseOpen(self),
        action: HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
          buffer,
          compress: compress,
          allocator: allocator,
          promise: promise,
          writer: self.writer
        )
      )
    }
  }
  595. // MARK: - Send End
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Ends the RPC before any response headers were sent, while the request stream is open.
    ///
    /// Since no headers have been sent the status goes out in a trailers-only response.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers supplied by the user.
    ///   - promise: A promise passed along with the write action.
    /// - Returns: The terminal `.requestClosedResponseClosed` state and an action to write a
    ///   headers frame with end stream set, without an extra flush.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
        for: status,
        contentType: self.contentType,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedTrailers,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )

      let payload = HTTP2Frame.FramePayload.headers(
        .init(headers: trailersOnly, endStream: true)
      )

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseClosed,
        action: .write(payload, promise, flush: false)
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Ends the RPC before any response headers were sent, after the request stream has closed.
    ///
    /// Since no headers have been sent the status goes out in a trailers-only response.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers supplied by the user.
    ///   - promise: A promise passed along with the write action.
    /// - Returns: The terminal `.requestClosedResponseClosed` state and an action to write a
    ///   headers frame with end stream set, without an extra flush.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
        for: status,
        contentType: self.contentType,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedTrailers,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )

      let payload = HTTP2Frame.FramePayload.headers(
        .init(headers: trailersOnly, endStream: true)
      )

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseClosed,
        action: .write(payload, promise, flush: false)
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
    /// Ends the RPC with the given status and trailers after the request stream has closed and
    /// response headers have been sent.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers supplied by the user.
    ///   - promise: A promise passed along with the write action.
    /// - Returns: The terminal `.requestClosedResponseClosed` state and an action to write a
    ///   headers frame with end stream set, without an extra flush.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Honour the normalization setting carried by this state rather than always normalizing;
      // this matches how response headers and trailers-only responses treat user-provided
      // metadata.
      let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
        for: status,
        userProvidedHeaders: userProvidedTrailers,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      return .init(
        state: .requestClosedResponseClosed,
        action: .write(.headers(.init(headers: trailers, endStream: true)), promise, flush: false)
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Ends an already open response stream by writing the trailers.
    ///
    /// The resulting state is `.requestClosedResponseClosed` even though the request stream was
    /// still open when the status was sent.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Additional trailers supplied by the user; names are normalized.
    ///   - promise: A promise passed along with the write action.
    /// - Returns: The `.requestClosedResponseClosed` state paired with the write action.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      let responseTrailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
        for: status,
        userProvidedHeaders: userProvidedTrailers,
        normalizeUserProvidedHeaders: true
      )

      // The trailers close the stream; no flush is requested here.
      let writeEnd: HTTP2ToRawGRPCStateMachine.Action = .write(
        .headers(.init(headers: responseTrailers, endStream: true)),
        promise,
        flush: false
      )

      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseClosed,
        action: writeEnd
      )
    }
  }
  668. // MARK: - Pipeline Configured
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Marks the configuration state as configured and decides how to forward the request headers.
    ///
    /// - Returns: The `.requestOpenResponseIdle` state paired with either `.forwardHeaders` (the
    ///   reader holds no unprocessed bytes) or `.forwardHeadersThenReadNextMessage` (it does).
    mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Must happen before `self` is captured below so the returned state reflects the change.
      let headers = self.configurationState.configured()

      guard self.reader.unprocessedBytes != 0 else {
        // The reader is empty: forwarding the metadata is all there is to do.
        return .init(state: .requestOpenResponseIdle(self), action: .forwardHeaders(headers))
      }

      // Bytes are already waiting in the reader: forward the metadata, then try to read a message.
      return .init(
        state: .requestOpenResponseIdle(self),
        action: .forwardHeadersThenReadNextMessage(headers)
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Marks the configuration state as configured and forwards the request headers.
    ///
    /// - Returns: The `.requestClosedResponseIdle` state paired with
    ///   `.forwardHeadersThenReadNextMessage`.
    mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Must happen before `self` is captured below so the returned state reflects the change.
      let headers = self.configurationState.configured()

      // The request stream is already closed, so after forwarding the headers we always start
      // reading.
      let forwardThenRead: HTTP2ToRawGRPCStateMachine.Action =
        .forwardHeadersThenReadNextMessage(headers)

      return .init(state: .requestClosedResponseIdle(self), action: forwardThenRead)
    }
  }
  695. // MARK: - Read Next Request
  extension HTTP2ToRawGRPCStateMachine {
    /// Tries to read one message from `reader` and maps the outcome to an action.
    ///
    /// - Parameters:
    ///   - reader: The reader to pull the next message from.
    ///   - requestStreamClosed: Whether the request stream has already been closed.
    /// - Returns:
    ///   - `.errorCaught` if reading throws;
    ///   - `.forwardEnd` or `.none` if no message was available (closed or not closed);
    ///   - `.forwardMessageThenReadNextMessage` if a message was read and bytes remain;
    ///   - `.forwardMessageAndEnd` or `.forwardMessage` if a message was read and the reader is now
    ///     empty (closed or not closed).
    static func read(
      from reader: inout LengthPrefixedMessageReader,
      requestStreamClosed: Bool
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      do {
        if let message = try reader.nextMessage() {
          if reader.unprocessedBytes != 0 {
            // More bytes are buffered: forward this message and keep reading.
            return .forwardMessageThenReadNextMessage(message)
          }

          if requestStreamClosed {
            // Closed and nothing left to read: this message is the last one, so forward it
            // together with the end of stream.
            return .forwardMessageAndEnd(message)
          } else {
            // Not closed: more bytes (or end stream) may still arrive, so only forward the message.
            return .forwardMessage(message)
          }
        }

        // No complete message was available.
        if requestStreamClosed {
          // No more bytes can arrive, so forward the end of stream.
          return .forwardEnd
        } else {
          // More bytes may arrive later; there is nothing to do for now.
          return .none
        }
      } catch {
        return .errorCaught(error)
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Tries to read the next request message while the request stream is still open.
    ///
    /// - Returns: The `.requestOpenResponseIdle` state paired with the action produced by the read.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Read first: the reader is mutated and `self` must be captured afterwards.
      let readResult = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: false
      )
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestOpenResponseIdle(self),
        action: readResult
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Tries to read the next request message while the request stream is still open.
    ///
    /// - Returns: The `.requestOpenResponseOpen` state paired with the action produced by the read.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Read first: the reader is mutated and `self` must be captured afterwards.
      let readResult = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: false
      )
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestOpenResponseOpen(self),
        action: readResult
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Tries to read the next request message after the request stream has been closed.
    ///
    /// - Returns: The `.requestClosedResponseIdle` state paired with the action produced by the
    ///   read.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Read first: the reader is mutated and `self` must be captured afterwards.
      let readResult = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: true
      )
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseIdle(self),
        action: readResult
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
    /// Tries to read the next request message after the request stream has been closed.
    ///
    /// - Returns: The `.requestClosedResponseOpen` state paired with the action produced by the
    ///   read.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.StateAndAction {
      // Read first: the reader is mutated and `self` must be captured afterwards.
      let readResult = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: true
      )
      return HTTP2ToRawGRPCStateMachine.StateAndAction(
        state: .requestClosedResponseOpen(self),
        action: readResult
      )
    }
  }
  746. // MARK: - Top Level State Changes
  extension HTTP2ToRawGRPCStateMachine {
    // Every entry point below hands the event to the current state via `withStateAvoidingCoWs`
    // and returns the action that state produced.

    /// Receive request headers.
    mutating func receive(
      headers: HPACKHeaders,
      eventLoop: EventLoop,
      errorDelegate: ServerErrorDelegate?,
      remoteAddress: SocketAddress?,
      logger: Logger,
      allocator: ByteBufferAllocator,
      responseWriter: GRPCServerResponseWriter
    ) -> Action {
      return self.withStateAvoidingCoWs {
        $0.receive(
          headers: headers,
          eventLoop: eventLoop,
          errorDelegate: errorDelegate,
          remoteAddress: remoteAddress,
          logger: logger,
          allocator: allocator,
          responseWriter: responseWriter
        )
      }
    }

    /// Receive request buffer, optionally marking the end of the request stream.
    mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> Action {
      return self.withStateAvoidingCoWs { $0.receive(buffer: &buffer, endStream: endStream) }
    }

    /// Send response headers.
    mutating func send(headers: HPACKHeaders, promise: EventLoopPromise<Void>?) -> Action {
      return self.withStateAvoidingCoWs { $0.send(headers: headers, promise: promise) }
    }

    /// Send a response buffer.
    mutating func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool,
      promise: EventLoopPromise<Void>?
    ) -> Action {
      return self.withStateAvoidingCoWs {
        $0.send(buffer: buffer, allocator: allocator, compress: compress, promise: promise)
      }
    }

    /// Send status and trailers.
    mutating func send(
      status: GRPCStatus,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> Action {
      return self.withStateAvoidingCoWs {
        $0.send(status: status, trailers: trailers, promise: promise)
      }
    }

    /// The pipeline has been configured with a service provider.
    mutating func pipelineConfigured() -> Action {
      return self.withStateAvoidingCoWs { $0.pipelineConfigured() }
    }

    /// Try to read a request message.
    mutating func readNextRequest() -> Action {
      return self.withStateAvoidingCoWs { $0.readNextRequest() }
    }
  }
  extension HTTP2ToRawGRPCStateMachine.State {
    /// Replaces `self` with the state carried by `next` and returns the action paired with it.
    private mutating func transition(
      to next: HTTP2ToRawGRPCStateMachine.StateAndAction
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      self = next.state
      return next.action
    }

    /// The pipeline has been configured. Only valid after the request headers were received and
    /// before the response stream was opened; any other state traps.
    mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case var .requestOpenResponseIdle(idle):
        return self.transition(to: idle.pipelineConfigured())

      case var .requestClosedResponseIdle(idle):
        return self.transition(to: idle.pipelineConfigured())

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: pipeline configured before receiving request headers")

      case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
        preconditionFailure("Invalid state: response stream opened before pipeline was configured")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Receive the request headers. Only valid in `.requestIdleResponseIdle`; any other state traps.
    mutating func receive(
      headers: HPACKHeaders,
      eventLoop: EventLoop,
      errorDelegate: ServerErrorDelegate?,
      remoteAddress: SocketAddress?,
      logger: Logger,
      allocator: ByteBufferAllocator,
      responseWriter: GRPCServerResponseWriter
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case let .requestIdleResponseIdle(idle):
        // The one state in which headers are expected.
        return self.transition(
          to: idle.receive(
            headers: headers,
            eventLoop: eventLoop,
            errorDelegate: errorDelegate,
            remoteAddress: remoteAddress,
            logger: logger,
            allocator: allocator,
            responseWriter: responseWriter
          )
        )

      case .requestOpenResponseIdle,
           .requestOpenResponseOpen,
           .requestClosedResponseIdle,
           .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        // Headers have been seen already; receiving them again is not allowed.
        preconditionFailure("Invalid state")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Receive a buffer from the client.
    mutating func receive(
      buffer: inout ByteBuffer,
      endStream: Bool
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case var .requestOpenResponseIdle(open):
        return self.transition(to: open.receive(buffer: &buffer, endStream: endStream))

      case var .requestOpenResponseOpen(open):
        return self.transition(to: open.receive(buffer: &buffer, endStream: endStream))

      case .requestClosedResponseClosed:
        // Fine: the RPC may have been closed before the client's end of stream arrived.
        return .none

      case .requestIdleResponseIdle:
        // Not allowed: the request headers must be received first.
        preconditionFailure("Invalid state")

      case .requestClosedResponseIdle, .requestClosedResponseOpen:
        preconditionFailure("Invalid state: the request stream is already closed")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Try to read the next request message. A no-op once both streams are closed; traps if the
    /// request headers have not been received yet.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case var .requestOpenResponseIdle(current):
        return self.transition(to: current.readNextRequest())

      case var .requestOpenResponseOpen(current):
        return self.transition(to: current.readNextRequest())

      case var .requestClosedResponseIdle(current):
        return self.transition(to: current.readNextRequest())

      case var .requestClosedResponseOpen(current):
        return self.transition(to: current.readNextRequest())

      case .requestClosedResponseClosed:
        return .none

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Send the response headers. If the response stream is already open or closed the promise is
    /// failed with `GRPCError.AlreadyComplete`.
    mutating func send(
      headers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case let .requestOpenResponseIdle(idle):
        return self.transition(to: idle.send(headers: headers, promise: promise))

      case let .requestClosedResponseIdle(idle):
        return self.transition(to: idle.send(headers: headers, promise: promise))

      case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: the request stream isn't open")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Send a response message. The promise is failed if the response headers have not been sent
    /// yet (`GRPCError.InvalidState`) or the response is already complete
    /// (`GRPCError.AlreadyComplete`).
    mutating func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case let .requestOpenResponseOpen(open):
        return self.transition(
          to: open.send(buffer: buffer, allocator: allocator, compress: compress, promise: promise)
        )

      case let .requestClosedResponseOpen(open):
        return self.transition(
          to: open.send(buffer: buffer, allocator: allocator, compress: compress, promise: promise)
        )

      case .requestOpenResponseIdle, .requestClosedResponseIdle:
        let headersNotSent = GRPCError.InvalidState(
          "Response headers must be sent before response message"
        )
        return .completePromise(promise, .failure(headersNotSent))

      case .requestClosedResponseClosed:
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: the request stream is still closed")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Send the status and trailers. If the response is already complete the promise is failed
    /// with `GRPCError.AlreadyComplete`.
    mutating func send(
      status: GRPCStatus,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> HTTP2ToRawGRPCStateMachine.Action {
      switch self {
      case let .requestOpenResponseIdle(current):
        return self.transition(
          to: current.send(status: status, trailers: trailers, promise: promise)
        )

      case let .requestClosedResponseIdle(current):
        return self.transition(
          to: current.send(status: status, trailers: trailers, promise: promise)
        )

      case let .requestOpenResponseOpen(current):
        return self.transition(
          to: current.send(status: status, trailers: trailers, promise: promise)
        )

      case let .requestClosedResponseOpen(current):
        return self.transition(
          to: current.send(status: status, trailers: trailers, promise: promise)
        )

      case .requestClosedResponseClosed:
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: the request stream is still closed")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }
  }
  1013. // MARK: - Helpers
  extension HTTP2ToRawGRPCStateMachine {
    /// Builds the initial response headers.
    ///
    /// The result always starts with ':status' ("200") and the content type, followed by the
    /// response encoding and the acceptable request encoding when they are non-nil, and finally the
    /// user provided headers.
    ///
    /// - Parameters:
    ///   - contentType: The content type whose canonical value is sent back.
    ///   - responseEncoding: The encoding used for response messages, if any.
    ///   - acceptableRequestEncoding: The encodings to advertise as acceptable, if any.
    ///   - userProvidedHeaders: Headers supplied by the user, appended last.
    ///   - normalizeUserProvidedHeaders: Whether user provided header names are lowercased.
    static func makeResponseHeaders(
      contentType: ContentType,
      responseEncoding: String?,
      acceptableRequestEncoding: String?,
      userProvidedHeaders: HPACKHeaders,
      normalizeUserProvidedHeaders: Bool
    ) -> HPACKHeaders {
      var responseHeaders = HPACKHeaders()
      // Room for the two required headers (':status', 'content-type'), the two optional ones
      // ('grpc-encoding', 'grpc-accept-encoding') and whatever the user provided.
      responseHeaders.reserveCapacity(4 + userProvidedHeaders.count)

      responseHeaders.add(name: ":status", value: "200")
      responseHeaders.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)

      if let encoding = responseEncoding {
        responseHeaders.add(name: GRPCHeaderName.encoding, value: encoding)
      }

      if let acceptable = acceptableRequestEncoding {
        responseHeaders.add(name: GRPCHeaderName.acceptEncoding, value: acceptable)
      }

      // User provided headers go last, normalized if requested.
      responseHeaders.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
      return responseHeaders
    }

    /// Builds a "trailers-only" header block: response headers and status in one.
    ///
    /// The result always contains ':status' ("200"), the content type and the status code. The
    /// status message (if it marshals to a value), the acceptable request encoding and the user
    /// provided headers are appended when present.
    ///
    /// - Parameters:
    ///   - status: The status to report.
    ///   - contentType: The content type whose canonical value is sent back.
    ///   - acceptableRequestEncoding: The encodings to advertise as acceptable, if any.
    ///   - userProvidedHeaders: Headers supplied by the user, if any, appended last.
    ///   - normalizeUserProvidedHeaders: Whether user provided header names are lowercased.
    static func makeResponseTrailersOnly(
      for status: GRPCStatus,
      contentType: ContentType,
      acceptableRequestEncoding: String?,
      userProvidedHeaders: HPACKHeaders?,
      normalizeUserProvidedHeaders: Bool
    ) -> HPACKHeaders {
      var trailersOnly = HPACKHeaders()
      // Room for the three required entries (':status', 'content-type', 'grpc-status'), the two
      // optional ones ('grpc-message', 'grpc-accept-encoding') and whatever the user provided.
      trailersOnly.reserveCapacity(5 + (userProvidedHeaders?.count ?? 0))

      // Required entries.
      trailersOnly.add(name: ":status", value: "200")
      trailersOnly.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
      trailersOnly.add(
        name: GRPCHeaderName.statusCode,
        value: String(describing: status.code.rawValue)
      )

      let marshalledMessage = status.message.flatMap { GRPCStatusMessageMarshaller.marshall($0) }
      if let statusMessage = marshalledMessage {
        trailersOnly.add(name: GRPCHeaderName.statusMessage, value: statusMessage)
      }

      // May be included if the requested encoding was not valid.
      if let acceptable = acceptableRequestEncoding {
        trailersOnly.add(name: GRPCHeaderName.acceptEncoding, value: acceptable)
      }

      if let extra = userProvidedHeaders {
        trailersOnly.add(contentsOf: extra, normalize: normalizeUserProvidedHeaders)
      }

      return trailersOnly
    }

    /// Builds the trailers which end a response whose headers were already sent.
    ///
    /// The result contains the status code, the status message (if it marshals to a value) and the
    /// user provided trailers, in that order.
    ///
    /// - Parameters:
    ///   - status: The status to report.
    ///   - userProvidedHeaders: Trailers supplied by the user, appended last.
    ///   - normalizeUserProvidedHeaders: Whether user provided trailer names are lowercased.
    static func makeResponseTrailers(
      for status: GRPCStatus,
      userProvidedHeaders: HPACKHeaders,
      normalizeUserProvidedHeaders: Bool
    ) -> HPACKHeaders {
      var responseTrailers = HPACKHeaders()
      // Room for 'grpc-status' (required), 'grpc-message' (optional) and the user's trailers.
      responseTrailers.reserveCapacity(2 + userProvidedHeaders.count)

      responseTrailers.add(
        name: GRPCHeaderName.statusCode,
        value: String(describing: status.code.rawValue)
      )

      let marshalledMessage = status.message.flatMap { GRPCStatusMessageMarshaller.marshall($0) }
      if let statusMessage = marshalledMessage {
        responseTrailers.add(name: GRPCHeaderName.statusMessage, value: statusMessage)
      }

      responseTrailers.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
      return responseTrailers
    }
  }
  private extension HPACKHeaders {
    /// Appends every header in `other`, lowercasing each header name first when `normalize` is
    /// `true`. Values and the indexable setting are carried over unchanged.
    mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) {
      guard normalize else {
        // Nothing to rewrite: append the headers as they are.
        self.add(contentsOf: other)
        return
      }

      self.add(contentsOf: other.lazy.map { headerName, headerValue, indexing in
        (name: headerName.lowercased(), value: headerValue, indexable: indexing)
      })
    }
  }