HTTP2ToRawGRPCStateMachine.swift 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. struct HTTP2ToRawGRPCStateMachine {
  21. /// The current state.
  22. private var state: State
  23. /// Temporarily sets `self.state` to `._modifying` before calling the provided block and setting
  24. /// `self.state` to the `State` modified by the block.
  25. ///
  26. /// Since we hold state as associated data on our `State` enum, any modification to that state
  27. /// will trigger a copy on write for its heap allocated data. Temporarily setting the `self.state`
  28. /// to `._modifying` allows us to avoid an extra reference to any heap allocated data and
  29. /// therefore avoid a copy on write.
  30. @inlinable
  31. internal mutating func withStateAvoidingCoWs<Action>(_ body: (inout State) -> Action) -> Action {
  32. var state: State = ._modifying
  33. swap(&self.state, &state)
  34. defer {
  35. swap(&self.state, &state)
  36. }
  37. return body(&state)
  38. }
  39. internal init(
  40. services: [Substring: CallHandlerProvider],
  41. encoding: ServerMessageEncoding,
  42. normalizeHeaders: Bool = true
  43. ) {
  44. let state = RequestIdleResponseIdleState(
  45. services: services,
  46. encoding: encoding,
  47. normalizeHeaders: normalizeHeaders
  48. )
  49. self.state = .requestIdleResponseIdle(state)
  50. }
  51. }
  52. extension HTTP2ToRawGRPCStateMachine {
  53. enum State {
  54. // Both peers are idle. Nothing has happened to the stream.
  55. case requestIdleResponseIdle(RequestIdleResponseIdleState)
  56. // Received valid headers. Nothing has been sent in response.
  57. case requestOpenResponseIdle(RequestOpenResponseIdleState)
  58. // Received valid headers and request(s). Response headers have been sent.
  59. case requestOpenResponseOpen(RequestOpenResponseOpenState)
  60. // The request stream is closed. Nothing has been sent in response.
  61. case requestClosedResponseIdle(RequestClosedResponseIdleState)
  62. // The request stream is closed. Response headers have been sent.
  63. case requestClosedResponseOpen(RequestClosedResponseOpenState)
  64. // Both streams are closed. This state is terminal.
  65. case requestClosedResponseClosed
  66. // Not a real state. See 'withStateAvoidingCoWs'.
  67. case _modifying
  68. }
  69. struct RequestIdleResponseIdleState {
  70. /// The service providers, keyed by service name.
  71. var services: [Substring: CallHandlerProvider]
  72. /// The encoding configuration for this server.
  73. var encoding: ServerMessageEncoding
  74. /// Whether to normalize user-provided metadata.
  75. var normalizeHeaders: Bool
  76. }
  77. struct RequestOpenResponseIdleState {
  78. /// A length prefixed message reader for request messages.
  79. var reader: LengthPrefixedMessageReader
  80. /// A length prefixed message writer for response messages.
  81. var writer: LengthPrefixedMessageWriter
  82. /// The content type of the RPC.
  83. var contentType: ContentType
  84. /// An accept encoding header to send in the response headers indicating the message encoding
  85. /// that the server supports.
  86. var acceptEncoding: String?
  87. /// A message encoding header to send in the response headers indicating the encoding which will
  88. /// be used for responses.
  89. var responseEncoding: String?
  90. /// Whether to normalize user-provided metadata.
  91. var normalizeHeaders: Bool
  92. /// The pipeline configuration state.
  93. var configurationState: ConfigurationState
  94. }
  95. struct RequestClosedResponseIdleState {
  96. /// A length prefixed message reader for request messages.
  97. var reader: LengthPrefixedMessageReader
  98. /// A length prefixed message writer for response messages.
  99. var writer: LengthPrefixedMessageWriter
  100. /// The content type of the RPC.
  101. var contentType: ContentType
  102. /// An accept encoding header to send in the response headers indicating the message encoding
  103. /// that the server supports.
  104. var acceptEncoding: String?
  105. /// A message encoding header to send in the response headers indicating the encoding which will
  106. /// be used for responses.
  107. var responseEncoding: String?
  108. /// Whether to normalize user-provided metadata.
  109. var normalizeHeaders: Bool
  110. /// The pipeline configuration state.
  111. var configurationState: ConfigurationState
  112. init(from state: RequestOpenResponseIdleState) {
  113. self.reader = state.reader
  114. self.writer = state.writer
  115. self.contentType = state.contentType
  116. self.acceptEncoding = state.acceptEncoding
  117. self.responseEncoding = state.responseEncoding
  118. self.normalizeHeaders = state.normalizeHeaders
  119. self.configurationState = state.configurationState
  120. }
  121. }
  122. struct RequestOpenResponseOpenState {
  123. /// A length prefixed message reader for request messages.
  124. var reader: LengthPrefixedMessageReader
  125. /// A length prefixed message writer for response messages.
  126. var writer: LengthPrefixedMessageWriter
  127. /// Whether to normalize user-provided metadata.
  128. var normalizeHeaders: Bool
  129. init(from state: RequestOpenResponseIdleState) {
  130. self.reader = state.reader
  131. self.writer = state.writer
  132. self.normalizeHeaders = state.normalizeHeaders
  133. }
  134. }
  135. struct RequestClosedResponseOpenState {
  136. /// A length prefixed message reader for request messages.
  137. var reader: LengthPrefixedMessageReader
  138. /// A length prefixed message writer for response messages.
  139. var writer: LengthPrefixedMessageWriter
  140. /// Whether to normalize user-provided metadata.
  141. var normalizeHeaders: Bool
  142. init(from state: RequestOpenResponseOpenState) {
  143. self.reader = state.reader
  144. self.writer = state.writer
  145. self.normalizeHeaders = state.normalizeHeaders
  146. }
  147. init(from state: RequestClosedResponseIdleState) {
  148. self.reader = state.reader
  149. self.writer = state.writer
  150. self.normalizeHeaders = state.normalizeHeaders
  151. }
  152. }
  153. /// The pipeline configuration state.
  154. enum ConfigurationState {
  155. /// The pipeline is being configured. Any message data will be buffered into an appropriate
  156. /// message reader.
  157. case configuring(HPACKHeaders)
  158. /// The pipeline is configured.
  159. case configured
  160. /// Returns true if the configuration is in the `.configured` state.
  161. var isConfigured: Bool {
  162. switch self {
  163. case .configuring:
  164. return false
  165. case .configured:
  166. return true
  167. }
  168. }
  169. /// Configuration has completed.
  170. mutating func configured() -> HPACKHeaders {
  171. switch self {
  172. case .configured:
  173. preconditionFailure("Invalid state: already configured")
  174. case let .configuring(headers):
  175. self = .configured
  176. return headers
  177. }
  178. }
  179. }
  180. }
  181. extension HTTP2ToRawGRPCStateMachine {
  182. enum PipelineConfiguredAction {
  183. /// Forward the given headers.
  184. case forwardHeaders(HPACKHeaders)
  185. /// Forward the given headers and try reading the next message.
  186. case forwardHeadersAndRead(HPACKHeaders)
  187. }
  188. enum ReceiveHeadersAction {
  189. /// Configure the RPC to use the given server handler.
  190. case configure(GRPCServerHandlerProtocol)
  191. /// Reject the RPC by writing out the given headers and setting end-stream.
  192. case rejectRPC(HPACKHeaders)
  193. }
  194. enum ReadNextMessageAction {
  195. /// Do nothing.
  196. case none
  197. /// Forward the buffer.
  198. case forwardMessage(ByteBuffer)
  199. /// Forward the buffer and an 'end' of stream request part.
  200. case forwardMessageAndEnd(ByteBuffer)
  201. /// Forward the buffer and try reading the next message.
  202. case forwardMessageThenReadNextMessage(ByteBuffer)
  203. /// Forward the 'end' of stream request part.
  204. case forwardEnd
  205. /// Throw an error down the pipeline.
  206. case errorCaught(Error)
  207. }
  208. struct StateAndReceiveHeadersAction {
  209. /// The next state.
  210. var state: State
  211. /// The action to take.
  212. var action: ReceiveHeadersAction
  213. }
  214. struct StateAndReceiveDataAction {
  215. /// The next state.
  216. var state: State
  217. /// Whether the caller should try reading the next message.
  218. var tryReading: Bool
  219. }
  220. }
  221. // MARK: Receive Headers
  222. // This is the only state in which we can receive headers.
  223. extension HTTP2ToRawGRPCStateMachine.RequestIdleResponseIdleState {
  224. func receive(
  225. headers: HPACKHeaders,
  226. eventLoop: EventLoop,
  227. errorDelegate: ServerErrorDelegate?,
  228. remoteAddress: SocketAddress?,
  229. logger: Logger,
  230. allocator: ByteBufferAllocator,
  231. responseWriter: GRPCServerResponseWriter,
  232. closeFuture: EventLoopFuture<Void>
  233. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  234. // Extract and validate the content type. If it's nil we need to close.
  235. guard let contentType = self.extractContentType(from: headers) else {
  236. return self.unsupportedContentType()
  237. }
  238. // Now extract the request message encoding and setup an appropriate message reader.
  239. // We may send back a list of acceptable request message encodings as well.
  240. let reader: LengthPrefixedMessageReader
  241. let acceptableRequestEncoding: String?
  242. switch self.extractRequestEncoding(from: headers) {
  243. case let .valid(messageReader, acceptEncodingHeader):
  244. reader = messageReader
  245. acceptableRequestEncoding = acceptEncodingHeader
  246. case let .invalid(status, acceptableRequestEncoding):
  247. return self.invalidRequestEncoding(
  248. status: status,
  249. acceptableRequestEncoding: acceptableRequestEncoding,
  250. contentType: contentType
  251. )
  252. }
  253. // Figure out which encoding we should use for responses.
  254. let (writer, responseEncoding) = self.extractResponseEncoding(from: headers)
  255. // Parse the path, and create a call handler.
  256. guard let path = headers.first(name: ":path") else {
  257. return self.methodNotImplemented("", contentType: contentType)
  258. }
  259. guard let callPath = CallPath(requestURI: path),
  260. let service = self.services[Substring(callPath.service)] else {
  261. return self.methodNotImplemented(path, contentType: contentType)
  262. }
  263. // Create a call handler context, i.e. a bunch of 'stuff' we need to create the handler with,
  264. // some of which is exposed to service providers.
  265. let context = CallHandlerContext(
  266. errorDelegate: errorDelegate,
  267. logger: logger,
  268. encoding: self.encoding,
  269. eventLoop: eventLoop,
  270. path: path,
  271. remoteAddress: remoteAddress,
  272. responseWriter: responseWriter,
  273. allocator: allocator,
  274. closeFuture: closeFuture
  275. )
  276. // We have a matching service, hopefully we have a provider for the method too.
  277. let method = Substring(callPath.method)
  278. if let handler = service.handle(method: method, context: context) {
  279. let nextState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
  280. reader: reader,
  281. writer: writer,
  282. contentType: contentType,
  283. acceptEncoding: acceptableRequestEncoding,
  284. responseEncoding: responseEncoding,
  285. normalizeHeaders: self.normalizeHeaders,
  286. configurationState: .configuring(headers)
  287. )
  288. return .init(
  289. state: .requestOpenResponseIdle(nextState),
  290. action: .configure(handler)
  291. )
  292. } else {
  293. return self.methodNotImplemented(path, contentType: contentType)
  294. }
  295. }
  296. /// The 'content-type' is not supported; close with status code 415.
  297. private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  298. // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  299. //
  300. // If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
  301. // with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
  302. // clients from interpreting a gRPC error response, which uses status 200 (OK), as
  303. // successful.
  304. let trailers = HPACKHeaders([(":status", "415")])
  305. return .init(
  306. state: .requestClosedResponseClosed,
  307. action: .rejectRPC(trailers)
  308. )
  309. }
  310. /// The RPC method is not implemented. Close with an appropriate status.
  311. private func methodNotImplemented(
  312. _ path: String,
  313. contentType: ContentType
  314. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  315. let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  316. for: GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented"),
  317. contentType: contentType,
  318. acceptableRequestEncoding: nil,
  319. userProvidedHeaders: nil,
  320. normalizeUserProvidedHeaders: self.normalizeHeaders
  321. )
  322. return .init(
  323. state: .requestClosedResponseClosed,
  324. action: .rejectRPC(trailers)
  325. )
  326. }
  327. /// The request encoding specified by the client is not supported. Close with an appropriate
  328. /// status.
  329. private func invalidRequestEncoding(
  330. status: GRPCStatus,
  331. acceptableRequestEncoding: String?,
  332. contentType: ContentType
  333. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
  334. let trailers = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  335. for: status,
  336. contentType: contentType,
  337. acceptableRequestEncoding: acceptableRequestEncoding,
  338. userProvidedHeaders: nil,
  339. normalizeUserProvidedHeaders: self.normalizeHeaders
  340. )
  341. return .init(
  342. state: .requestClosedResponseClosed,
  343. action: .rejectRPC(trailers)
  344. )
  345. }
  346. /// Makes a 'GRPCStatus' and response trailers suitable for returning to the client when the
  347. /// request message encoding is not supported.
  348. ///
  349. /// - Parameters:
  350. /// - encoding: The unsupported request message encoding sent by the client.
  351. /// - acceptable: The list if acceptable request message encoding the client may use.
  352. /// - Returns: The status and trailers to return to the client.
  353. private func makeStatusAndTrailersForUnsupportedEncoding(
  354. _ encoding: String,
  355. advertisedEncoding: [String]
  356. ) -> (GRPCStatus, acceptEncoding: String?) {
  357. let status: GRPCStatus
  358. let acceptEncoding: String?
  359. if advertisedEncoding.isEmpty {
  360. // No compression is supported; there's nothing to tell the client about.
  361. status = GRPCStatus(code: .unimplemented, message: "compression is not supported")
  362. acceptEncoding = nil
  363. } else {
  364. // Return a list of supported encodings which we advertise. (The list we advertise may be a
  365. // subset of the encodings we support.)
  366. acceptEncoding = advertisedEncoding.joined(separator: ",")
  367. status = GRPCStatus(
  368. code: .unimplemented,
  369. message: "\(encoding) compression is not supported, supported algorithms are " +
  370. "listed in '\(GRPCHeaderName.acceptEncoding)'"
  371. )
  372. }
  373. return (status, acceptEncoding)
  374. }
  375. /// Extract and validate the 'content-type' sent by the client.
  376. /// - Parameter headers: The headers to extract the 'content-type' from
  377. private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
  378. return headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
  379. }
  380. /// The result of validating the request encoding header.
  381. private enum RequestEncodingValidation {
  382. /// The encoding was valid.
  383. case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)
  384. /// The encoding was invalid, the RPC should be terminated with this status.
  385. case invalid(status: GRPCStatus, acceptEncoding: String?)
  386. }
  387. /// Extract and validate the request message encoding header.
  388. /// - Parameters:
  389. /// - headers: The headers to extract the message encoding header from.
  390. /// - Returns: `RequestEncodingValidation`, either a message reader suitable for decoding requests
  391. /// and an accept encoding response header if the request encoding was valid, or a pair of
  392. /// `GRPCStatus` and trailers to close the RPC with.
  393. private func extractRequestEncoding(from headers: HPACKHeaders) -> RequestEncodingValidation {
  394. let encodings = headers[canonicalForm: GRPCHeaderName.encoding]
  395. // Fail if there's more than one encoding header.
  396. if encodings.count > 1 {
  397. let status = GRPCStatus(
  398. code: .invalidArgument,
  399. message: "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(encodings.joined(separator: ", "))'"
  400. )
  401. return .invalid(status: status, acceptEncoding: nil)
  402. }
  403. let encodingHeader = encodings.first
  404. let result: RequestEncodingValidation
  405. let validator = MessageEncodingHeaderValidator(encoding: self.encoding)
  406. switch validator.validate(requestEncoding: encodingHeader) {
  407. case let .supported(algorithm, decompressionLimit, acceptEncoding):
  408. // Request message encoding is valid and supported.
  409. result = .valid(
  410. messageReader: LengthPrefixedMessageReader(
  411. compression: algorithm,
  412. decompressionLimit: decompressionLimit
  413. ),
  414. acceptEncoding: acceptEncoding.isEmpty ? nil : acceptEncoding.joined(separator: ",")
  415. )
  416. case .noCompression:
  417. // No message encoding header was present. This means no compression is being used.
  418. result = .valid(
  419. messageReader: LengthPrefixedMessageReader(),
  420. acceptEncoding: nil
  421. )
  422. case let .unsupported(encoding, acceptable):
  423. // Request encoding is not supported.
  424. let (status, acceptEncoding) = self.makeStatusAndTrailersForUnsupportedEncoding(
  425. encoding,
  426. advertisedEncoding: acceptable
  427. )
  428. result = .invalid(status: status, acceptEncoding: acceptEncoding)
  429. }
  430. return result
  431. }
  432. /// Extract a suitable message encoding for responses.
  433. /// - Parameters:
  434. /// - headers: The headers to extract the acceptable response message encoding from.
  435. /// - configuration: The encoding configuration for the server.
  436. /// - Returns: A message writer and the response encoding header to send back to the client.
  437. private func extractResponseEncoding(
  438. from headers: HPACKHeaders
  439. ) -> (LengthPrefixedMessageWriter, String?) {
  440. let writer: LengthPrefixedMessageWriter
  441. let responseEncoding: String?
  442. switch self.encoding {
  443. case let .enabled(configuration):
  444. // Extract the encodings acceptable to the client for response messages.
  445. let acceptableResponseEncoding = headers[canonicalForm: GRPCHeaderName.acceptEncoding]
  446. // Select the first algorithm that we support and have enabled. If we don't find one then we
  447. // won't compress response messages.
  448. let algorithm = acceptableResponseEncoding.lazy.compactMap { value in
  449. CompressionAlgorithm(rawValue: value)
  450. }.first {
  451. configuration.enabledAlgorithms.contains($0)
  452. }
  453. writer = LengthPrefixedMessageWriter(compression: algorithm)
  454. responseEncoding = algorithm?.name
  455. case .disabled:
  456. // The server doesn't have compression enabled.
  457. writer = LengthPrefixedMessageWriter(compression: .none)
  458. responseEncoding = nil
  459. }
  460. return (writer, responseEncoding)
  461. }
  462. }
  463. // MARK: - Receive Data
  464. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  465. mutating func receive(
  466. buffer: inout ByteBuffer,
  467. endStream: Bool
  468. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
  469. // Append the bytes to the reader.
  470. self.reader.append(buffer: &buffer)
  471. let state: HTTP2ToRawGRPCStateMachine.State
  472. let tryReading: Bool
  473. switch (self.configurationState.isConfigured, endStream) {
  474. case (true, true):
  475. /// Configured and end stream: read from the buffer, end will be sent as a result of draining
  476. /// the reader in the next state.
  477. state = .requestClosedResponseIdle(.init(from: self))
  478. tryReading = true
  479. case (true, false):
  480. /// Configured but not end stream, just read from the buffer.
  481. state = .requestOpenResponseIdle(self)
  482. tryReading = true
  483. case (false, true):
  484. // Not configured yet, but end of stream. Request stream is now closed but there's no point
  485. // reading yet.
  486. state = .requestClosedResponseIdle(.init(from: self))
  487. tryReading = false
  488. case (false, false):
  489. // Not configured yet, not end stream. No point reading a message yet since we don't have
  490. // anywhere to deliver it.
  491. state = .requestOpenResponseIdle(self)
  492. tryReading = false
  493. }
  494. return .init(state: state, tryReading: tryReading)
  495. }
  496. }
  497. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  498. mutating func receive(
  499. buffer: inout ByteBuffer,
  500. endStream: Bool
  501. ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
  502. self.reader.append(buffer: &buffer)
  503. let state: HTTP2ToRawGRPCStateMachine.State
  504. if endStream {
  505. // End stream, so move to the closed state. Any end of request stream events events will
  506. // happen as a result of reading from the closed state.
  507. state = .requestClosedResponseOpen(.init(from: self))
  508. } else {
  509. state = .requestOpenResponseOpen(self)
  510. }
  511. return .init(state: state, tryReading: true)
  512. }
  513. }
  514. // MARK: - Send Headers
  515. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  516. func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
  517. return HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
  518. contentType: self.contentType,
  519. responseEncoding: self.responseEncoding,
  520. acceptableRequestEncoding: self.acceptEncoding,
  521. userProvidedHeaders: userProvidedHeaders,
  522. normalizeUserProvidedHeaders: self.normalizeHeaders
  523. )
  524. }
  525. }
  526. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  527. func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
  528. return HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
  529. contentType: self.contentType,
  530. responseEncoding: self.responseEncoding,
  531. acceptableRequestEncoding: self.acceptEncoding,
  532. userProvidedHeaders: userProvidedHeaders,
  533. normalizeUserProvidedHeaders: self.normalizeHeaders
  534. )
  535. }
  536. }
  537. // MARK: - Send Data
  538. extension HTTP2ToRawGRPCStateMachine {
  539. static func writeGRPCFramedMessage(
  540. _ buffer: ByteBuffer,
  541. compress: Bool,
  542. allocator: ByteBufferAllocator,
  543. writer: LengthPrefixedMessageWriter
  544. ) -> Result<ByteBuffer, Error> {
  545. do {
  546. let prefixed = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
  547. return .success(prefixed)
  548. } catch {
  549. return .failure(error)
  550. }
  551. }
  552. }
  553. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  554. func send(
  555. buffer: ByteBuffer,
  556. allocator: ByteBufferAllocator,
  557. compress: Bool
  558. ) -> Result<ByteBuffer, Error> {
  559. return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
  560. buffer,
  561. compress: compress,
  562. allocator: allocator,
  563. writer: self.writer
  564. )
  565. }
  566. }
  567. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  568. func send(
  569. buffer: ByteBuffer,
  570. allocator: ByteBufferAllocator,
  571. compress: Bool
  572. ) -> Result<ByteBuffer, Error> {
  573. return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
  574. buffer,
  575. compress: compress,
  576. allocator: allocator,
  577. writer: self.writer
  578. )
  579. }
  580. }
  581. // MARK: - Send End
  582. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  583. func send(
  584. status: GRPCStatus,
  585. trailers userProvidedTrailers: HPACKHeaders
  586. ) -> HPACKHeaders {
  587. return HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  588. for: status,
  589. contentType: self.contentType,
  590. acceptableRequestEncoding: self.acceptEncoding,
  591. userProvidedHeaders: userProvidedTrailers,
  592. normalizeUserProvidedHeaders: self.normalizeHeaders
  593. )
  594. }
  595. }
  596. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  597. func send(
  598. status: GRPCStatus,
  599. trailers userProvidedTrailers: HPACKHeaders
  600. ) -> HPACKHeaders {
  601. return HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
  602. for: status,
  603. contentType: self.contentType,
  604. acceptableRequestEncoding: self.acceptEncoding,
  605. userProvidedHeaders: userProvidedTrailers,
  606. normalizeUserProvidedHeaders: self.normalizeHeaders
  607. )
  608. }
  609. }
  610. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  611. func send(
  612. status: GRPCStatus,
  613. trailers userProvidedTrailers: HPACKHeaders
  614. ) -> HPACKHeaders {
  615. return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
  616. for: status,
  617. userProvidedHeaders: userProvidedTrailers,
  618. normalizeUserProvidedHeaders: true
  619. )
  620. }
  621. }
  622. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  623. func send(
  624. status: GRPCStatus,
  625. trailers userProvidedTrailers: HPACKHeaders
  626. ) -> HPACKHeaders {
  627. return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
  628. for: status,
  629. userProvidedHeaders: userProvidedTrailers,
  630. normalizeUserProvidedHeaders: true
  631. )
  632. }
  633. }
  634. // MARK: - Pipeline Configured
  635. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  636. mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
  637. let headers = self.configurationState.configured()
  638. let action: HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction
  639. // If there are unprocessed bytes then we need to read messages as well.
  640. let hasUnprocessedBytes = self.reader.unprocessedBytes != 0
  641. if hasUnprocessedBytes {
  642. // If there are unprocessed bytes, we need to try to read after sending the metadata.
  643. action = .forwardHeadersAndRead(headers)
  644. } else {
  645. // No unprocessed bytes; the reader is empty. Just send the metadata.
  646. action = .forwardHeaders(headers)
  647. }
  648. return action
  649. }
  650. }
  651. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  652. mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
  653. let headers = self.configurationState.configured()
  654. // Since we're already closed, we need to forward the headers and start reading.
  655. return .forwardHeadersAndRead(headers)
  656. }
  657. }
  658. // MARK: - Read Next Request
  extension HTTP2ToRawGRPCStateMachine {
    /// Attempts to read a single message from `reader` and decides what the caller should do next.
    ///
    /// - Parameters:
    ///   - reader: The reader to pull the next message from.
    ///   - requestStreamClosed: Whether the request stream has already been closed.
    /// - Returns: The action to take: forward a message (optionally followed by another read or by
    ///   the end of the stream), forward only the end of the stream, do nothing, or
    ///   `.errorCaught` if the reader threw.
    static func read(
      from reader: inout LengthPrefixedMessageReader,
      requestStreamClosed: Bool
    ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
      do {
        if let message = try reader.nextMessage() {
          if reader.unprocessedBytes != 0 {
            // More bytes are buffered: forward this message and keep reading.
            return .forwardMessageThenReadNextMessage(message)
          } else if requestStreamClosed {
            // Closed with nothing left to read: this is the final message, so forward it together
            // with the end of the stream.
            return .forwardMessageAndEnd(message)
          } else {
            // Not closed: more bytes (or end stream) may still arrive, so only forward the
            // message.
            return .forwardMessage(message)
          }
        } else if requestStreamClosed {
          // No message was read and no more bytes can arrive: forward the end of the stream.
          return .forwardEnd
        } else {
          // No message was read but more bytes may still arrive: nothing to do for now.
          return .none
        }
      } catch {
        return .errorCaught(error)
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Tries to read the next request message from this state's reader, treating the request
    /// stream as still open.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
      let action = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: false
      )
      return action
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Tries to read the next request message from this state's reader, treating the request
    /// stream as still open.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
      let action = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: false
      )
      return action
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Tries to read the next request message from this state's reader, treating the request
    /// stream as already closed.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
      let action = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: true
      )
      return action
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
    /// Tries to read the next request message from this state's reader, treating the request
    /// stream as already closed.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
      let action = HTTP2ToRawGRPCStateMachine.read(
        from: &self.reader,
        requestStreamClosed: true
      )
      return action
    }
  }
  705. // MARK: - Top Level State Changes
  extension HTTP2ToRawGRPCStateMachine {
    /// Handles the request headers by forwarding every argument to the current state.
    ///
    /// - Returns: The action produced by the state for the received headers.
    mutating func receive(
      headers: HPACKHeaders,
      eventLoop: EventLoop,
      errorDelegate: ServerErrorDelegate?,
      remoteAddress: SocketAddress?,
      logger: Logger,
      allocator: ByteBufferAllocator,
      responseWriter: GRPCServerResponseWriter,
      closeFuture: EventLoopFuture<Void>
    ) -> ReceiveHeadersAction {
      self.withStateAvoidingCoWs {
        $0.receive(
          headers: headers,
          eventLoop: eventLoop,
          errorDelegate: errorDelegate,
          remoteAddress: remoteAddress,
          logger: logger,
          allocator: allocator,
          responseWriter: responseWriter,
          closeFuture: closeFuture
        )
      }
    }

    /// Handles a buffer received on the request stream.
    ///
    /// - Parameters:
    ///   - buffer: The received buffer.
    ///   - endStream: Whether end stream was set.
    /// - Returns: Whether the caller should try to read a message from the buffer.
    mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> Bool {
      self.withStateAvoidingCoWs {
        $0.receive(buffer: &buffer, endStream: endStream)
      }
    }

    /// Sends the response headers.
    ///
    /// - Parameter headers: The headers passed on to the current state.
    /// - Returns: The result produced by the current state.
    mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
      self.withStateAvoidingCoWs {
        $0.send(headers: headers)
      }
    }

    /// Sends a response buffer. The state is only read here, never modified.
    ///
    /// - Parameters:
    ///   - buffer: The response message bytes.
    ///   - allocator: The allocator passed on to the current state.
    ///   - compress: The compression flag passed on to the current state.
    /// - Returns: The result produced by the current state.
    func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool
    ) -> Result<ByteBuffer, Error> {
      self.state.send(buffer: buffer, allocator: allocator, compress: compress)
    }

    /// Sends the status and trailers.
    ///
    /// - Parameters:
    ///   - status: The status to send.
    ///   - trailers: The trailers to send alongside the status.
    /// - Returns: The result produced by the current state.
    mutating func send(
      status: GRPCStatus,
      trailers: HPACKHeaders
    ) -> Result<HPACKHeaders, Error> {
      self.withStateAvoidingCoWs {
        $0.send(status: status, trailers: trailers)
      }
    }

    /// Notifies the state machine that the pipeline has been configured with a service provider.
    mutating func pipelineConfigured() -> PipelineConfiguredAction {
      self.withStateAvoidingCoWs {
        $0.pipelineConfigured()
      }
    }

    /// Attempts to read a request message.
    mutating func readNextRequest() -> ReadNextMessageAction {
      self.withStateAvoidingCoWs {
        $0.readNextRequest()
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine.State {
    /// The pipeline has been configured.
    ///
    /// Only valid while the request stream is open or closed and the response stream is still
    /// idle; every other state traps.
    mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
      switch self {
      case .requestOpenResponseIdle(var openIdle):
        // Store the updated payload back once the action has been computed.
        defer { self = .requestOpenResponseIdle(openIdle) }
        return openIdle.pipelineConfigured()

      case .requestClosedResponseIdle(var closedIdle):
        defer { self = .requestClosedResponseIdle(closedIdle) }
        return closedIdle.pipelineConfigured()

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: pipeline configured before receiving request headers")

      case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
        preconditionFailure("Invalid state: response stream opened before pipeline was configured")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Receive the request headers.
    ///
    /// Headers may only arrive while both streams are idle. The idle state produces both the
    /// next state and the action to return; any other state traps.
    mutating func receive(
      headers: HPACKHeaders,
      eventLoop: EventLoop,
      errorDelegate: ServerErrorDelegate?,
      remoteAddress: SocketAddress?,
      logger: Logger,
      allocator: ByteBufferAllocator,
      responseWriter: GRPCServerResponseWriter,
      closeFuture: EventLoopFuture<Void>
    ) -> HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction {
      switch self {
      case let .requestIdleResponseIdle(idle):
        // The one and only state in which headers are acceptable.
        let transition = idle.receive(
          headers: headers,
          eventLoop: eventLoop,
          errorDelegate: errorDelegate,
          remoteAddress: remoteAddress,
          logger: logger,
          allocator: allocator,
          responseWriter: responseWriter,
          closeFuture: closeFuture
        )
        self = transition.state
        return transition.action

      case .requestOpenResponseIdle,
           .requestOpenResponseOpen,
           .requestClosedResponseIdle,
           .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        // Headers were already received (or the RPC is over): receiving them again is invalid.
        preconditionFailure("Invalid state")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Receive a buffer from the client.
    ///
    /// - Returns: Whether the caller should try to read a message; always `false` once both
    ///   streams are closed.
    mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> Bool {
      switch self {
      case .requestOpenResponseIdle(var openIdle):
        let transition = openIdle.receive(buffer: &buffer, endStream: endStream)
        self = transition.state
        return transition.tryReading

      case .requestOpenResponseOpen(var openOpen):
        let transition = openOpen.receive(buffer: &buffer, endStream: endStream)
        self = transition.state
        return transition.tryReading

      case .requestClosedResponseClosed:
        // Acceptable: we may have closed before the client's end of stream arrived.
        return false

      case .requestIdleResponseIdle:
        // Not allowed: the request headers must be received first.
        preconditionFailure("Invalid state")

      case .requestClosedResponseIdle, .requestClosedResponseOpen:
        preconditionFailure("Invalid state: the request stream is already closed")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Try to read the next request message.
    ///
    /// The read is delegated to whichever state currently holds a reader and the updated state is
    /// stored back afterwards. Once both streams are closed this returns `.none`.
    mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
      switch self {
      case .requestOpenResponseIdle(var openIdle):
        defer { self = .requestOpenResponseIdle(openIdle) }
        return openIdle.readNextRequest()

      case .requestOpenResponseOpen(var openOpen):
        defer { self = .requestOpenResponseOpen(openOpen) }
        return openOpen.readNextRequest()

      case .requestClosedResponseIdle(var closedIdle):
        defer { self = .requestClosedResponseIdle(closedIdle) }
        return closedIdle.readNextRequest()

      case .requestClosedResponseOpen(var closedOpen):
        defer { self = .requestClosedResponseOpen(closedOpen) }
        return closedOpen.readNextRequest()

      case .requestClosedResponseClosed:
        return .none

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Send the response headers, opening the response stream.
    ///
    /// - Returns: The headers produced by the idle state on success, or
    ///   `GRPCError.AlreadyComplete` when the response stream is no longer idle.
    mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
      switch self {
      case let .requestOpenResponseIdle(openIdle):
        let responseHeaders = openIdle.send(headers: headers)
        self = .requestOpenResponseOpen(.init(from: openIdle))
        return .success(responseHeaders)

      case let .requestClosedResponseIdle(closedIdle):
        let responseHeaders = closedIdle.send(headers: headers)
        self = .requestClosedResponseOpen(.init(from: closedIdle))
        return .success(responseHeaders)

      case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
        return .failure(GRPCError.AlreadyComplete())

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: the request stream isn't open")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Send a response buffer.
    ///
    /// - Returns: The result from the open response state; `GRPCError.InvalidState` when the
    ///   response headers have not been sent yet; `GRPCError.AlreadyComplete` when the response
    ///   stream is closed.
    func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool
    ) -> Result<ByteBuffer, Error> {
      switch self {
      case let .requestOpenResponseOpen(openOpen):
        return openOpen.send(buffer: buffer, allocator: allocator, compress: compress)

      case let .requestClosedResponseOpen(closedOpen):
        return closedOpen.send(buffer: buffer, allocator: allocator, compress: compress)

      case .requestOpenResponseIdle, .requestClosedResponseIdle:
        return .failure(
          GRPCError.InvalidState("Response headers must be sent before response message")
        )

      case .requestClosedResponseClosed:
        return .failure(GRPCError.AlreadyComplete())

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: the request stream is still closed")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Send the status and trailers, closing both streams.
    ///
    /// - Returns: The trailers built by the current state on success, or
    ///   `GRPCError.AlreadyComplete` when both streams are already closed.
    mutating func send(
      status: GRPCStatus,
      trailers: HPACKHeaders
    ) -> Result<HPACKHeaders, Error> {
      let responseTrailers: HPACKHeaders

      switch self {
      case let .requestOpenResponseIdle(openIdle):
        responseTrailers = openIdle.send(status: status, trailers: trailers)

      case let .requestClosedResponseIdle(closedIdle):
        responseTrailers = closedIdle.send(status: status, trailers: trailers)

      case let .requestOpenResponseOpen(openOpen):
        responseTrailers = openOpen.send(status: status, trailers: trailers)

      case let .requestClosedResponseOpen(closedOpen):
        responseTrailers = closedOpen.send(status: status, trailers: trailers)

      case .requestClosedResponseClosed:
        return .failure(GRPCError.AlreadyComplete())

      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: the request stream is still closed")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }

      // Every state which produced trailers ends up fully closed.
      self = .requestClosedResponseClosed
      return .success(responseTrailers)
    }
  }
  960. // MARK: - Helpers
  extension HTTP2ToRawGRPCStateMachine {
    /// Makes the response headers.
    ///
    /// - Parameters:
    ///   - contentType: The content type whose canonical value is sent as 'content-type'.
    ///   - responseEncoding: Sent as the encoding header when non-nil.
    ///   - acceptableRequestEncoding: Sent as the accept-encoding header when non-nil.
    ///   - userProvidedHeaders: Extra headers appended after the ones above.
    ///   - normalizeUserProvidedHeaders: Whether the user provided header names are lowercased.
    /// - Returns: The assembled headers, starting with a ':status' of "200".
    static func makeResponseHeaders(
      contentType: ContentType,
      responseEncoding: String?,
      acceptableRequestEncoding: String?,
      userProvidedHeaders: HPACKHeaders,
      normalizeUserProvidedHeaders: Bool
    ) -> HPACKHeaders {
      var responseHeaders = HPACKHeaders()
      // Room for four of our own headers: ':status' and 'content-type' are always sent,
      // 'grpc-encoding' and 'grpc-accept-encoding' may be sent too.
      responseHeaders.reserveCapacity(4 + userProvidedHeaders.count)

      responseHeaders.add(name: ":status", value: "200")
      responseHeaders.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)

      if let encoding = responseEncoding {
        responseHeaders.add(name: GRPCHeaderName.encoding, value: encoding)
      }

      if let acceptable = acceptableRequestEncoding {
        responseHeaders.add(name: GRPCHeaderName.acceptEncoding, value: acceptable)
      }

      // The user's headers go last, normalized when requested.
      responseHeaders.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
      return responseHeaders
    }

    /// Makes a single set of headers which carries both the response headers and the status.
    ///
    /// - Parameters:
    ///   - status: The status whose code (and message, if any) is encoded.
    ///   - contentType: The content type whose canonical value is sent as 'content-type'.
    ///   - acceptableRequestEncoding: Sent as the accept-encoding header when non-nil.
    ///   - userProvidedHeaders: Optional extra headers appended last.
    ///   - normalizeUserProvidedHeaders: Whether the user provided header names are lowercased.
    /// - Returns: The assembled headers, starting with a ':status' of "200".
    static func makeResponseTrailersOnly(
      for status: GRPCStatus,
      contentType: ContentType,
      acceptableRequestEncoding: String?,
      userProvidedHeaders: HPACKHeaders?,
      normalizeUserProvidedHeaders: Bool
    ) -> HPACKHeaders {
      var responseHeaders = HPACKHeaders()
      // Room for five of our own headers: ':status', 'content-type' and 'grpc-status' are always
      // sent, 'grpc-message' and 'grpc-accept-encoding' may be sent too.
      responseHeaders.reserveCapacity(5 + (userProvidedHeaders?.count ?? 0))

      // The required entries.
      responseHeaders.add(name: ":status", value: "200")
      responseHeaders.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
      responseHeaders.add(
        name: GRPCHeaderName.statusCode,
        value: String(describing: status.code.rawValue)
      )

      if let rawMessage = status.message,
         let marshalled = GRPCStatusMessageMarshaller.marshall(rawMessage) {
        responseHeaders.add(name: GRPCHeaderName.statusMessage, value: marshalled)
      }

      // This may be included if the requested encoding was not valid.
      if let acceptable = acceptableRequestEncoding {
        responseHeaders.add(name: GRPCHeaderName.acceptEncoding, value: acceptable)
      }

      if let extraHeaders = userProvidedHeaders {
        responseHeaders.add(contentsOf: extraHeaders, normalize: normalizeUserProvidedHeaders)
      }

      return responseHeaders
    }

    /// Makes the trailers for the given status.
    ///
    /// - Parameters:
    ///   - status: The status whose code (and message, if any) is encoded.
    ///   - userProvidedHeaders: Extra trailers appended after the status.
    ///   - normalizeUserProvidedHeaders: Whether the user provided trailer names are lowercased.
    /// - Returns: A shared, pre-built set of trailers when `status` equals `.ok` and no user
    ///   trailers were given; otherwise a newly built set.
    static func makeResponseTrailers(
      for status: GRPCStatus,
      userProvidedHeaders: HPACKHeaders,
      normalizeUserProvidedHeaders: Bool
    ) -> HPACKHeaders {
      // The common case (hopefully!) is an 'ok' status without any extra trailers. A pre-canned
      // value covers it and saves an allocation.
      let canUsePrecannedTrailers = status == .ok && userProvidedHeaders.isEmpty
      guard !canUsePrecannedTrailers else {
        return Self.gRPCStatusOkTrailers
      }

      var responseTrailers = HPACKHeaders()
      // Room for two of our own trailers: 'grpc-status' is always sent, 'grpc-message' may be.
      responseTrailers.reserveCapacity(2 + userProvidedHeaders.count)

      // The status code comes first...
      responseTrailers.add(
        name: GRPCHeaderName.statusCode,
        value: String(describing: status.code.rawValue)
      )

      // ...then the status message, when there is one...
      if let rawMessage = status.message,
         let marshalled = GRPCStatusMessageMarshaller.marshall(rawMessage) {
        responseTrailers.add(name: GRPCHeaderName.statusMessage, value: marshalled)
      }

      // ...and finally whatever the user provided.
      responseTrailers.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
      return responseTrailers
    }

    /// Pre-built trailers holding only the status code for `GRPCStatus.Code.ok`.
    private static let gRPCStatusOkTrailers: HPACKHeaders = [
      GRPCHeaderName.statusCode: String(describing: GRPCStatus.Code.ok.rawValue),
    ]
  }
  private extension HPACKHeaders {
    /// Appends every header in `other` to this collection.
    ///
    /// - Parameters:
    ///   - other: The headers to append.
    ///   - normalize: When `true` each header name is lowercased before being added; values and
    ///     indexability are passed through untouched.
    mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) {
      guard normalize else {
        // No normalization requested: append the headers exactly as provided.
        self.add(contentsOf: other)
        return
      }

      self.add(contentsOf: other.lazy.map { headerName, headerValue, indexing in
        (name: headerName.lowercased(), value: headerValue, indexable: indexing)
      })
    }
  }