HTTP2ToRawGRPCStateMachine.swift 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A state machine whose current position is one of the `State` cases.
  struct HTTP2ToRawGRPCStateMachine {
    /// The state the machine is currently in.
    private var state: State

    /// Runs `body` against the current state while `self.state` temporarily holds `._modifying`.
    ///
    /// The current state is swapped out into a local before `body` is called and swapped back in
    /// (together with any modification made by `body`) afterwards. While `body` runs the state's
    /// associated data is only referenced from that local; the intent is to avoid holding a second
    /// reference to heap allocated data and therefore to avoid a copy on write.
    ///
    /// - Parameter body: A closure which may inspect and modify the state.
    /// - Returns: Whatever `body` returns.
    @inlinable
    internal mutating func withStateAvoidingCoWs<Action>(_ body: (inout State) -> Action) -> Action {
      // After this swap `self.state` is `._modifying` and `detached` holds the real state.
      var detached: State = ._modifying
      swap(&self.state, &detached)

      let result = body(&detached)

      // Put the (possibly modified) state back; `detached` is left holding `._modifying`.
      swap(&self.state, &detached)
      return result
    }

    /// Creates a state machine in the `.requestIdleResponseIdle` state.
    ///
    /// - Parameters:
    ///   - services: The service providers, keyed by service name.
    ///   - encoding: The encoding configuration for this server.
    ///   - normalizeHeaders: Whether to normalize user-provided metadata. Defaults to `true`.
    internal init(
      services: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      normalizeHeaders: Bool = true
    ) {
      self.state = .requestIdleResponseIdle(
        RequestIdleResponseIdleState(
          services: services,
          encoding: encoding,
          normalizeHeaders: normalizeHeaders
        )
      )
    }
  }
  52. extension HTTP2ToRawGRPCStateMachine {
  /// The states the machine can be in, named after the request and response stream states.
  enum State {
    /// Both peers are idle: nothing has happened to the stream yet.
    case requestIdleResponseIdle(RequestIdleResponseIdleState)

    /// Valid request headers were received; nothing has been sent in response.
    case requestOpenResponseIdle(RequestOpenResponseIdleState)

    /// Valid request headers (and possibly requests) were received and response headers have
    /// been sent.
    case requestOpenResponseOpen(RequestOpenResponseOpenState)

    /// The request stream is closed; nothing has been sent in response.
    case requestClosedResponseIdle(RequestClosedResponseIdleState)

    /// The request stream is closed; response headers have been sent.
    case requestClosedResponseOpen(RequestClosedResponseOpenState)

    /// Both streams are closed. This state is terminal.
    case requestClosedResponseClosed

    /// Not a real state: a placeholder held while the state is being modified. See
    /// `withStateAvoidingCoWs`.
    case _modifying
  }
  /// Data held while both the request and response streams are idle.
  struct RequestIdleResponseIdleState {
    /// Service providers, keyed by the name of the service they provide.
    var services: [Substring: CallHandlerProvider]

    /// The server's message encoding configuration.
    var encoding: ServerMessageEncoding

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool
  }
  /// Data held once request headers have been accepted but before anything has been sent back.
  struct RequestOpenResponseIdleState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept encoding header value to send in the response headers, indicating the message
    /// encodings the server supports, if any.
    var acceptEncoding: String?

    /// The message encoding header value to send in the response headers, indicating the
    /// encoding which will be used for responses, if any.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Whether the pipeline is still being configured or has been configured.
    var configurationState: ConfigurationState
  }
  /// Data held once the request stream has closed but before anything has been sent back.
  struct RequestClosedResponseIdleState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept encoding header value to send in the response headers, indicating the message
    /// encodings the server supports, if any.
    var acceptEncoding: String?

    /// The message encoding header value to send in the response headers, indicating the
    /// encoding which will be used for responses, if any.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Whether the pipeline is still being configured or has been configured.
    var configurationState: ConfigurationState

    /// Creates the request-closed state by copying every field from the request-open state.
    init(from state: RequestOpenResponseIdleState) {
      // Message reader and writer.
      self.reader = state.reader
      self.writer = state.writer

      // Values needed to build the response headers or trailers.
      self.contentType = state.contentType
      self.acceptEncoding = state.acceptEncoding
      self.responseEncoding = state.responseEncoding
      self.normalizeHeaders = state.normalizeHeaders

      // Pipeline configuration progress.
      self.configurationState = state.configurationState
    }
  }
  /// Data held while the request stream is open and response headers have been sent.
  struct RequestOpenResponseOpenState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Creates the response-open state, keeping only the reader, the writer and the
    /// normalization flag of the response-idle state.
    init(from state: RequestOpenResponseIdleState) {
      self.normalizeHeaders = state.normalizeHeaders
      self.writer = state.writer
      self.reader = state.reader
    }
  }
  /// Data held while the request stream is closed and response headers have been sent.
  struct RequestClosedResponseOpenState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: LengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Creates the state from the request-open, response-open state (the request stream closed).
    init(from state: RequestOpenResponseOpenState) {
      self.normalizeHeaders = state.normalizeHeaders
      self.writer = state.writer
      self.reader = state.reader
    }

    /// Creates the state from the request-closed, response-idle state, keeping only the reader,
    /// the writer and the normalization flag.
    init(from state: RequestClosedResponseIdleState) {
      self.normalizeHeaders = state.normalizeHeaders
      self.writer = state.writer
      self.reader = state.reader
    }
  }
  /// Tracks whether the pipeline has been configured.
  enum ConfigurationState {
    /// The pipeline is being configured; the associated headers are held until it is done. Any
    /// message data will be buffered into an appropriate message reader.
    case configuring(HPACKHeaders)

    /// The pipeline is configured.
    case configured

    /// `true` when in the `.configured` state, `false` while `.configuring`.
    var isConfigured: Bool {
      if case .configured = self {
        return true
      }
      return false
    }

    /// Marks configuration as complete.
    ///
    /// - Precondition: The state must be `.configuring`; calling this when already `.configured`
    ///   is a programmer error and traps.
    /// - Returns: The headers which were held while configuring.
    mutating func configured() -> HPACKHeaders {
      guard case let .configuring(pendingHeaders) = self else {
        preconditionFailure("Invalid state: already configured")
      }
      self = .configured
      return pendingHeaders
    }
  }
  180. }
  extension HTTP2ToRawGRPCStateMachine {
    /// What to do once the pipeline has been configured.
    enum PipelineConfiguredAction {
      /// Forward the given headers.
      case forwardHeaders(HPACKHeaders)

      /// Forward the given headers, then try to read the next message.
      case forwardHeadersAndRead(HPACKHeaders)
    }

    /// What to do after receiving request headers.
    enum ReceiveHeadersAction {
      /// Configure the pipeline with the given call handler.
      case configureLegacy(GRPCCallHandler)

      /// Configure the RPC to use the given server handler.
      case configure(GRPCServerHandlerProtocol)

      /// Reject the RPC by writing out the given headers and setting end-stream.
      case rejectRPC(HPACKHeaders)
    }

    /// What to do after attempting to read the next request message.
    enum ReadNextMessageAction {
      /// Do nothing.
      case none

      /// Forward the buffer.
      case forwardMessage(ByteBuffer)

      /// Forward the buffer followed by an 'end' of stream request part.
      case forwardMessageAndEnd(ByteBuffer)

      /// Forward the buffer, then try to read the next message.
      case forwardMessageThenReadNextMessage(ByteBuffer)

      /// Forward the 'end' of stream request part.
      case forwardEnd

      /// Throw an error down the pipeline.
      case errorCaught(Error)
    }

    /// The outcome of receiving headers: the state to move to and the action to take.
    struct StateAndReceiveHeadersAction {
      /// The next state.
      var state: State

      /// The action to take.
      var action: ReceiveHeadersAction
    }

    /// The outcome of receiving data: the state to move to and whether to attempt a read.
    struct StateAndReceiveDataAction {
      /// The next state.
      var state: State

      /// Whether the caller should try reading the next message.
      var tryReading: Bool
    }
  }
  223. // MARK: Receive Headers
  224. // This is the only state in which we can receive headers.
  225. extension HTTP2ToRawGRPCStateMachine.RequestIdleResponseIdleState {
  /// Validates the request headers of a new RPC and decides how to proceed.
  ///
  /// Checks are made in this order: the content type, the request message encoding, the presence
  /// of ':path', whether the path names a known service, and finally whether that service
  /// provides a handler for the method. The first failing check rejects the RPC and moves to
  /// `.requestClosedResponseClosed`. Otherwise the next state is `.requestOpenResponseIdle` and
  /// the action configures the pipeline with the handler which was found.
  ///
  /// - Parameters:
  ///   - headers: The request headers.
  ///   - eventLoop: Passed to the call handler context.
  ///   - errorDelegate: Passed to the call handler context.
  ///   - remoteAddress: Passed to the call handler context.
  ///   - logger: Passed to the call handler context.
  ///   - allocator: Passed to the call handler context.
  ///   - responseWriter: Passed to the call handler context.
  /// - Returns: The next state and the action the caller should take.
  func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    // Without a valid content type there is nothing more to do: close.
    guard let contentType = self.extractContentType(from: headers) else {
      return self.unsupportedContentType()
    }

    // Work out how request messages are encoded; this gives us a reader for them and possibly a
    // list of acceptable request encodings to send back.
    let requestReader: LengthPrefixedMessageReader
    let acceptEncoding: String?

    switch self.extractRequestEncoding(from: headers) {
    case let .valid(validReader, acceptEncodingHeader):
      requestReader = validReader
      acceptEncoding = acceptEncodingHeader

    case let .invalid(status, acceptEncodingHeader):
      return self.invalidRequestEncoding(
        status: status,
        acceptableRequestEncoding: acceptEncodingHeader,
        contentType: contentType
      )
    }

    // Pick the encoding for response messages.
    let (messageWriter, responseEncoding) = self.extractResponseEncoding(from: headers)

    // The path identifies the service and method being called.
    guard let path = headers.first(name: ":path") else {
      return self.methodNotImplemented("", contentType: contentType)
    }

    guard let callPath = CallPath(requestURI: path),
      let service = self.services[Substring(callPath.service)] else {
      return self.methodNotImplemented(path, contentType: contentType)
    }

    // Everything the handler needs to be created with; some of it is exposed to service
    // providers.
    let context = CallHandlerContext(
      errorDelegate: errorDelegate,
      logger: logger,
      encoding: self.encoding,
      eventLoop: eventLoop,
      path: path,
      remoteAddress: remoteAddress,
      responseWriter: responseWriter,
      allocator: allocator
    )

    // The service exists; ask it for a handler for the method, trying 'handle(method:context:)'
    // before falling back to 'handleMethod(_:callHandlerContext:)'.
    let method = Substring(callPath.method)
    let action: HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction

    if let serverHandler = service.handle(method: method, context: context) {
      action = .configure(serverHandler)
    } else if let callHandler = service.handleMethod(method, callHandlerContext: context) {
      action = .configureLegacy(callHandler)
    } else {
      return self.methodNotImplemented(path, contentType: contentType)
    }

    // A handler was found: the request stream is now open and the pipeline needs configuring.
    let openState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
      reader: requestReader,
      writer: messageWriter,
      contentType: contentType,
      acceptEncoding: acceptEncoding,
      responseEncoding: responseEncoding,
      normalizeHeaders: self.normalizeHeaders,
      configurationState: .configuring(headers)
    )

    return .init(state: .requestOpenResponseIdle(openState), action: action)
  }
  /// Rejects the RPC because its 'content-type' is not supported.
  ///
  /// - Returns: The terminal `.requestClosedResponseClosed` state and a `.rejectRPC` action whose
  ///   headers contain only a ':status' of 415.
  private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
    //
    //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
    //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
    //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
    //   successful.
    let action = HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction.rejectRPC(
      HPACKHeaders([(":status", "415")])
    )
    return .init(state: .requestClosedResponseClosed, action: action)
  }
  /// Rejects the RPC because no handler is available for the requested path.
  ///
  /// - Parameters:
  ///   - path: The requested path, used in the status message.
  ///   - contentType: The content type of the RPC.
  /// - Returns: The terminal `.requestClosedResponseClosed` state and a `.rejectRPC` action with
  ///   trailers-only headers for an `.unimplemented` status.
  private func methodNotImplemented(
    _ path: String,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    let status = GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented")

    let rejection = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: nil,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    return .init(state: .requestClosedResponseClosed, action: .rejectRPC(rejection))
  }
  /// Rejects the RPC because the request message encoding sent by the client was not accepted.
  ///
  /// - Parameters:
  ///   - status: The status to close the RPC with.
  ///   - acceptableRequestEncoding: The accept encoding value to pass on when building the
  ///     trailers, if any.
  ///   - contentType: The content type of the RPC.
  /// - Returns: The terminal `.requestClosedResponseClosed` state and a `.rejectRPC` action with
  ///   trailers-only headers for `status`.
  private func invalidRequestEncoding(
    status: GRPCStatus,
    acceptableRequestEncoding: String?,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    let rejection = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: acceptableRequestEncoding,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )

    return .init(state: .requestClosedResponseClosed, action: .rejectRPC(rejection))
  }
  /// Makes a `GRPCStatus` and an accept encoding value suitable for returning to the client when
  /// the request message encoding is not supported.
  ///
  /// - Parameters:
  ///   - encoding: The unsupported request message encoding sent by the client.
  ///   - advertisedEncoding: The request message encodings advertised to the client.
  /// - Returns: An `.unimplemented` status and the advertised encodings joined with ",", or `nil`
  ///   in place of the latter when no encodings are advertised.
  private func makeStatusAndTrailersForUnsupportedEncoding(
    _ encoding: String,
    advertisedEncoding: [String]
  ) -> (GRPCStatus, acceptEncoding: String?) {
    guard !advertisedEncoding.isEmpty else {
      // No compression is supported; there's nothing to tell the client about.
      let status = GRPCStatus(code: .unimplemented, message: "compression is not supported")
      return (status, acceptEncoding: nil)
    }

    // Tell the client which encodings we advertise. (The list we advertise may be a subset of
    // the encodings we support.)
    let status = GRPCStatus(
      code: .unimplemented,
      message: "\(encoding) compression is not supported, supported algorithms are " +
        "listed in '\(GRPCHeaderName.acceptEncoding)'"
    )
    let advertised = advertisedEncoding.joined(separator: ",")
    return (status, acceptEncoding: advertised)
  }
  /// Extracts the 'content-type' sent by the client and converts it to a `ContentType`.
  ///
  /// - Parameter headers: The headers to extract the 'content-type' from.
  /// - Returns: The content type, or `nil` if the header is absent or could not be converted.
  private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
    let rawContentType = headers.first(name: GRPCHeaderName.contentType)
    return rawContentType.flatMap(ContentType.init)
  }
  /// The outcome of validating the request encoding header.
  private enum RequestEncodingValidation {
    /// The encoding was valid: requests can be read with `messageReader`, and `acceptEncoding`
    /// (if any) is the accept encoding value for the response.
    case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)

    /// The encoding was invalid: the RPC should be terminated with `status`.
    case invalid(status: GRPCStatus, acceptEncoding: String?)
  }
  /// Extracts and validates the request message encoding header.
  ///
  /// - Parameter headers: The headers to extract the message encoding header from.
  /// - Returns: `.valid` with a message reader suitable for decoding requests and an optional
  ///   accept encoding value, or `.invalid` with the status to close the RPC with and an
  ///   optional accept encoding value.
  private func extractRequestEncoding(from headers: HPACKHeaders) -> RequestEncodingValidation {
    let encodings = headers[canonicalForm: GRPCHeaderName.encoding]

    // At most one encoding may be specified.
    guard encodings.count <= 1 else {
      let tooManyEncodings = GRPCStatus(
        code: .invalidArgument,
        message: "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(encodings.joined(separator: ", "))'"
      )
      return .invalid(status: tooManyEncodings, acceptEncoding: nil)
    }

    let validator = MessageEncodingHeaderValidator(encoding: self.encoding)

    switch validator.validate(requestEncoding: encodings.first) {
    case let .supported(algorithm, decompressionLimit, advertised):
      // The request message encoding is valid and supported.
      let reader = LengthPrefixedMessageReader(
        compression: algorithm,
        decompressionLimit: decompressionLimit
      )
      let acceptEncoding = advertised.isEmpty ? nil : advertised.joined(separator: ",")
      return .valid(messageReader: reader, acceptEncoding: acceptEncoding)

    case .noCompression:
      // There was no message encoding header, so no compression is being used.
      return .valid(messageReader: LengthPrefixedMessageReader(), acceptEncoding: nil)

    case let .unsupported(encoding, acceptable):
      // The request encoding is not supported.
      let (status, acceptEncoding) = self.makeStatusAndTrailersForUnsupportedEncoding(
        encoding,
        advertisedEncoding: acceptable
      )
      return .invalid(status: status, acceptEncoding: acceptEncoding)
    }
  }
  /// Picks a message encoding for responses based on what the client accepts and what the
  /// server's encoding configuration has enabled.
  ///
  /// - Parameter headers: The headers to extract the acceptable response message encodings from.
  /// - Returns: A message writer and the response encoding header value to send back to the
  ///   client, which is `nil` when no algorithm was selected.
  private func extractResponseEncoding(
    from headers: HPACKHeaders
  ) -> (LengthPrefixedMessageWriter, String?) {
    guard case let .enabled(configuration) = self.encoding else {
      // The server doesn't have compression enabled.
      return (LengthPrefixedMessageWriter(compression: .none), nil)
    }

    // The encodings the client will accept for response messages.
    let clientAccepts = headers[canonicalForm: GRPCHeaderName.acceptEncoding]

    // Use the first algorithm we recognise and have enabled. If there isn't one then responses
    // won't be compressed.
    let recognised = clientAccepts.lazy.compactMap { value in
      CompressionAlgorithm(rawValue: value)
    }
    let algorithm = recognised.first(where: { candidate in
      configuration.enabledAlgorithms.contains(candidate)
    })

    return (LengthPrefixedMessageWriter(compression: algorithm), algorithm?.name)
  }
  469. }
  470. // MARK: - Receive Data
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Buffers received request bytes and works out the next state.
    ///
    /// - Parameters:
    ///   - buffer: The received bytes; they are appended to the message reader.
    ///   - endStream: Whether the request stream has ended.
    /// - Returns: `.requestClosedResponseIdle` if `endStream` is set, otherwise
    ///   `.requestOpenResponseIdle`; reading should be attempted only if the pipeline has been
    ///   configured.
    mutating func receive(
      buffer: inout ByteBuffer,
      endStream: Bool
    ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
      // Hand the bytes to the reader before snapshotting `self` into the next state.
      self.reader.append(buffer: &buffer)

      // End stream closes the request stream whether or not we're configured. When we are
      // configured 'end' is sent as a result of draining the reader in the next state.
      let nextState: HTTP2ToRawGRPCStateMachine.State
      if endStream {
        nextState = .requestClosedResponseIdle(.init(from: self))
      } else {
        nextState = .requestOpenResponseIdle(self)
      }

      // Until the pipeline is configured there is nowhere to deliver a message, so there's no
      // point reading one yet.
      let canDeliverMessages = self.configurationState.isConfigured

      return .init(state: nextState, tryReading: canDeliverMessages)
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Buffers received request bytes and works out the next state.
    ///
    /// - Parameters:
    ///   - buffer: The received bytes; they are appended to the message reader.
    ///   - endStream: Whether the request stream has ended.
    /// - Returns: `.requestClosedResponseOpen` if `endStream` is set, otherwise
    ///   `.requestOpenResponseOpen`; reading should always be attempted.
    mutating func receive(
      buffer: inout ByteBuffer,
      endStream: Bool
    ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
      self.reader.append(buffer: &buffer)

      guard endStream else {
        // The request stream is still open: stay where we are.
        return .init(state: .requestOpenResponseOpen(self), tryReading: true)
      }

      // End stream, so move to the closed state. Any end of request stream events will happen
      // as a result of reading from the closed state.
      return .init(state: .requestClosedResponseOpen(.init(from: self)), tryReading: true)
    }
  }
  521. // MARK: - Send Headers
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Makes the response headers to send, combining the values held by this state with the
    /// user-provided headers.
    ///
    /// - Parameter userProvidedHeaders: Headers provided by the user.
    /// - Returns: The response headers built by `makeResponseHeaders`.
    func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
      let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
        contentType: self.contentType,
        responseEncoding: self.responseEncoding,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedHeaders,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      return responseHeaders
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Makes the response headers to send, combining the values held by this state with the
    /// user-provided headers.
    ///
    /// - Parameter userProvidedHeaders: Headers provided by the user.
    /// - Returns: The response headers built by `makeResponseHeaders`.
    func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
      let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
        contentType: self.contentType,
        responseEncoding: self.responseEncoding,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedHeaders,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      return responseHeaders
    }
  }
  544. // MARK: - Send Data
  extension HTTP2ToRawGRPCStateMachine {
    /// Writes `buffer` using the given length prefixed message writer, capturing any thrown
    /// error in the result.
    ///
    /// - Parameters:
    ///   - buffer: The message to write.
    ///   - compress: Passed to the writer as its `compressed` argument.
    ///   - allocator: The allocator passed to the writer.
    ///   - writer: The writer to write the message with.
    /// - Returns: `.success` with the buffer returned by the writer, or `.failure` with the error
    ///   it threw.
    static func writeGRPCFramedMessage(
      _ buffer: ByteBuffer,
      compress: Bool,
      allocator: ByteBufferAllocator,
      writer: LengthPrefixedMessageWriter
    ) -> Result<ByteBuffer, Error> {
      return Result {
        try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Frames a response message using this state's writer.
    ///
    /// - Parameters:
    ///   - buffer: The message to frame.
    ///   - allocator: The allocator passed to the writer.
    ///   - compress: Passed through to `writeGRPCFramedMessage`.
    /// - Returns: The result of `writeGRPCFramedMessage`.
    func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool
    ) -> Result<ByteBuffer, Error> {
      let framed = HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
        buffer,
        compress: compress,
        allocator: allocator,
        writer: self.writer
      )
      return framed
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
    /// Frames a response message using this state's writer.
    ///
    /// - Parameters:
    ///   - buffer: The message to frame.
    ///   - allocator: The allocator passed to the writer.
    ///   - compress: Passed through to `writeGRPCFramedMessage`.
    /// - Returns: The result of `writeGRPCFramedMessage`.
    func send(
      buffer: ByteBuffer,
      allocator: ByteBufferAllocator,
      compress: Bool
    ) -> Result<ByteBuffer, Error> {
      let framed = HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
        buffer,
        compress: compress,
        allocator: allocator,
        writer: self.writer
      )
      return framed
    }
  }
  588. // MARK: - Send End
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Makes the headers which end the RPC when no response headers have been sent yet.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers provided by the user.
    /// - Returns: The headers built by `makeResponseTrailersOnly`.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders
    ) -> HPACKHeaders {
      let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
        for: status,
        contentType: self.contentType,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedTrailers,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      return trailersOnly
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Makes the headers which end the RPC when no response headers have been sent yet.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers provided by the user.
    /// - Returns: The headers built by `makeResponseTrailersOnly`.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders
    ) -> HPACKHeaders {
      let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
        for: status,
        contentType: self.contentType,
        acceptableRequestEncoding: self.acceptEncoding,
        userProvidedHeaders: userProvidedTrailers,
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
      return trailersOnly
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
    /// Makes the trailers which end the RPC after response headers have already been sent.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers provided by the user.
    /// - Returns: The trailers built by `makeResponseTrailers`.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders
    ) -> HPACKHeaders {
      return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
        for: status,
        userProvidedHeaders: userProvidedTrailers,
        // Honour the configured normalization setting, as the response-idle states do, rather
        // than always normalizing.
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
    /// Makes the trailers which end the RPC after response headers have already been sent.
    ///
    /// - Parameters:
    ///   - status: The status to end the RPC with.
    ///   - userProvidedTrailers: Trailers provided by the user.
    /// - Returns: The trailers built by `makeResponseTrailers`.
    func send(
      status: GRPCStatus,
      trailers userProvidedTrailers: HPACKHeaders
    ) -> HPACKHeaders {
      return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
        for: status,
        userProvidedHeaders: userProvidedTrailers,
        // Honour the configured normalization setting, as the response-idle states do, rather
        // than always normalizing.
        normalizeUserProvidedHeaders: self.normalizeHeaders
      )
    }
  }
  641. // MARK: - Pipeline Configured
  extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
    /// Marks the pipeline as configured and decides whether a read should follow.
    ///
    /// - Precondition: The pipeline must not already be configured.
    /// - Returns: `.forwardHeadersAndRead` if the reader holds unprocessed bytes, otherwise
    ///   `.forwardHeaders`; both carry the headers held during configuration.
    mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
      let pendingHeaders = self.configurationState.configured()

      guard self.reader.unprocessedBytes != 0 else {
        // No unprocessed bytes; the reader is empty. Just send the metadata.
        return .forwardHeaders(pendingHeaders)
      }

      // There are unprocessed bytes, so try to read after sending the metadata.
      return .forwardHeadersAndRead(pendingHeaders)
    }
  }
  extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
    /// Marks the pipeline as configured.
    ///
    /// - Precondition: The pipeline must not already be configured.
    /// - Returns: `.forwardHeadersAndRead` carrying the headers held during configuration.
    mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
      // The request stream is already closed, so the headers must be forwarded and reading must
      // start.
      let pendingHeaders = self.configurationState.configured()
      return .forwardHeadersAndRead(pendingHeaders)
    }
  }
  665. // MARK: - Read Next Request
  666. extension HTTP2ToRawGRPCStateMachine {
  /// Attempts to read one message from `reader` and decides what the caller should do with it.
  ///
  /// - Parameters:
  ///   - reader: The reader to pull the next message from.
  ///   - requestStreamClosed: Whether the request stream has already been closed.
  /// - Returns: The action to take; `.errorCaught` wraps any error thrown while reading.
  static func read(
    from reader: inout LengthPrefixedMessageReader,
    requestStreamClosed: Bool
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    do {
      if let message = try reader.nextMessage() {
        if reader.unprocessedBytes != 0 {
          // More bytes are buffered: forward this message and keep reading.
          return .forwardMessageThenReadNextMessage(message)
        } else if requestStreamClosed {
          // Closed with nothing left to read: this is the last message, so end the stream too.
          return .forwardMessageAndEnd(message)
        } else {
          // Not closed: more bytes (or end stream) may still arrive, so only forward the message.
          return .forwardMessage(message)
        }
      } else if requestStreamClosed {
        // No message and no chance of receiving more bytes: forward the end of stream.
        return .forwardEnd
      } else {
        // No message yet, but more bytes could still arrive; nothing to do for now.
        return .none
      }
    } catch {
      return .errorCaught(error)
    }
  }
  691. }
  692. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Tries to read the next request message, treating the request stream as still open.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    return HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: false
    )
  }
  696. }
  697. extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Tries to read the next request message, treating the request stream as still open.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    return HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: false
    )
  }
  701. }
  702. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Tries to read the next request message, treating the request stream as already closed.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    return HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: true
    )
  }
  706. }
  707. extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Tries to read the next request message, treating the request stream as already closed.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    return HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: true
    )
  }
  711. }
  712. // MARK: - Top Level State Changes
  713. extension HTTP2ToRawGRPCStateMachine {
  /// Receive request headers.
  ///
  /// Every argument is handed unchanged to the current state, whose resulting action is returned.
  mutating func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter
  ) -> ReceiveHeadersAction {
    return self.withStateAvoidingCoWs {
      $0.receive(
        headers: headers,
        eventLoop: eventLoop,
        errorDelegate: errorDelegate,
        remoteAddress: remoteAddress,
        logger: logger,
        allocator: allocator,
        responseWriter: responseWriter
      )
    }
  }
  /// Receive a request buffer.
  ///
  /// - Parameters:
  ///   - buffer: The buffer that was received.
  ///   - endStream: Whether end stream was set.
  /// - Returns: `true` if the caller should try to read a message from the buffer.
  mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> Bool {
    return self.withStateAvoidingCoWs {
      $0.receive(buffer: &buffer, endStream: endStream)
    }
  }
  /// Send response headers.
  ///
  /// - Parameter headers: The headers passed on to the current state.
  /// - Returns: The result produced by the current state.
  mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
    return self.withStateAvoidingCoWs {
      $0.send(headers: headers)
    }
  }
  /// Send a response buffer.
  ///
  /// This does not mutate the state machine; the arguments are passed straight to the current
  /// state and its result is returned.
  func send(
    buffer: ByteBuffer,
    allocator: ByteBufferAllocator,
    compress: Bool
  ) -> Result<ByteBuffer, Error> {
    return self.state.send(
      buffer: buffer,
      allocator: allocator,
      compress: compress
    )
  }
  /// Send status and trailers.
  ///
  /// - Parameters:
  ///   - status: The status passed on to the current state.
  ///   - trailers: The trailers passed on to the current state.
  /// - Returns: The result produced by the current state.
  mutating func send(
    status: GRPCStatus,
    trailers: HPACKHeaders
  ) -> Result<HPACKHeaders, Error> {
    return self.withStateAvoidingCoWs {
      $0.send(status: status, trailers: trailers)
    }
  }
  /// The pipeline has been configured with a service provider.
  ///
  /// - Returns: The action chosen by the current state.
  mutating func pipelineConfigured() -> PipelineConfiguredAction {
    return self.withStateAvoidingCoWs {
      $0.pipelineConfigured()
    }
  }
  /// Try to read a request message.
  ///
  /// - Returns: The action chosen by the current state.
  mutating func readNextRequest() -> ReadNextMessageAction {
    return self.withStateAvoidingCoWs {
      $0.readNextRequest()
    }
  }
  781. }
  782. extension HTTP2ToRawGRPCStateMachine.State {
  /// Dispatches "pipeline configured" to the associated state.
  ///
  /// Only the two states with an idle response stream and received request headers handle this;
  /// every other state traps.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
    switch self {
    case var .requestOpenResponseIdle(idle):
      // Store the updated associated value back once the action has been computed.
      defer { self = .requestOpenResponseIdle(idle) }
      return idle.pipelineConfigured()

    case var .requestClosedResponseIdle(idle):
      defer { self = .requestClosedResponseIdle(idle) }
      return idle.pipelineConfigured()

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: pipeline configured before receiving request headers")

    case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
      preconditionFailure("Invalid state: response stream opened before pipeline was configured")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  /// Handles request headers.
  ///
  /// Headers are only accepted in `.requestIdleResponseIdle`; that state yields both the next
  /// state and the action to return. Any other state traps.
  mutating func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter
  ) -> HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction {
    switch self {
    case ._modifying:
      preconditionFailure("Left in modifying state")

    // Headers have already been received in all of these states.
    case .requestOpenResponseIdle,
         .requestOpenResponseOpen,
         .requestClosedResponseIdle,
         .requestClosedResponseOpen,
         .requestClosedResponseClosed:
      preconditionFailure("Invalid state")

    // The one state where receiving headers is valid.
    case let .requestIdleResponseIdle(idle):
      let transition = idle.receive(
        headers: headers,
        eventLoop: eventLoop,
        errorDelegate: errorDelegate,
        remoteAddress: remoteAddress,
        logger: logger,
        allocator: allocator,
        responseWriter: responseWriter
      )
      self = transition.state
      return transition.action
    }
  }
  /// Receive a buffer from the client.
  ///
  /// - Parameters:
  ///   - buffer: The received bytes, passed on to the associated state.
  ///   - endStream: Whether end stream was set.
  /// - Returns: The `tryReading` flag reported by the associated state, or `false` when the
  ///   stream is already fully closed.
  mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> Bool {
    switch self {
    case var .requestOpenResponseIdle(open):
      let transition = open.receive(buffer: &buffer, endStream: endStream)
      self = transition.state
      return transition.tryReading

    case var .requestOpenResponseOpen(open):
      let transition = open.receive(buffer: &buffer, endStream: endStream)
      self = transition.state
      return transition.tryReading

    case .requestClosedResponseClosed:
      // Acceptable: we could have closed before receiving end.
      return false

    case .requestIdleResponseIdle:
      // Not allowed: the request headers must arrive first.
      preconditionFailure("Invalid state")

    case .requestClosedResponseIdle, .requestClosedResponseOpen:
      preconditionFailure("Invalid state: the request stream is already closed")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  /// Asks the associated state to read the next request message.
  ///
  /// - Returns: The action from the associated state, or `.none` once both streams are closed.
  ///   Traps if request headers have not been received yet.
  mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    switch self {
    case var .requestOpenResponseIdle(current):
      // In each readable state the updated associated value is stored back after reading.
      defer { self = .requestOpenResponseIdle(current) }
      return current.readNextRequest()

    case var .requestOpenResponseOpen(current):
      defer { self = .requestOpenResponseOpen(current) }
      return current.readNextRequest()

    case var .requestClosedResponseIdle(current):
      defer { self = .requestClosedResponseIdle(current) }
      return current.readNextRequest()

    case var .requestClosedResponseOpen(current):
      defer { self = .requestClosedResponseOpen(current) }
      return current.readNextRequest()

    case .requestClosedResponseClosed:
      return .none

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  /// Sends response headers, moving the response stream from idle to open.
  ///
  /// - Parameter headers: The headers passed to the associated state.
  /// - Returns: The headers built by the associated state on success, or
  ///   `GRPCError.AlreadyComplete` if the response stream is no longer idle.
  mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
    switch self {
    case let .requestOpenResponseIdle(idle):
      let responseHeaders = idle.send(headers: headers)
      self = .requestOpenResponseOpen(.init(from: idle))
      return .success(responseHeaders)

    case let .requestClosedResponseIdle(idle):
      let responseHeaders = idle.send(headers: headers)
      self = .requestClosedResponseOpen(.init(from: idle))
      return .success(responseHeaders)

    case .requestOpenResponseOpen, .requestClosedResponseOpen, .requestClosedResponseClosed:
      return .failure(GRPCError.AlreadyComplete())

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream isn't open")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  /// Sends a response buffer via the associated state.
  ///
  /// - Returns: The associated state's result while the response stream is open; a
  ///   `GRPCError.InvalidState` failure if response headers have not been sent; a
  ///   `GRPCError.AlreadyComplete` failure once the response stream is closed.
  func send(
    buffer: ByteBuffer,
    allocator: ByteBufferAllocator,
    compress: Bool
  ) -> Result<ByteBuffer, Error> {
    switch self {
    case let .requestOpenResponseOpen(open):
      return open.send(buffer: buffer, allocator: allocator, compress: compress)

    case let .requestClosedResponseOpen(open):
      return open.send(buffer: buffer, allocator: allocator, compress: compress)

    case .requestOpenResponseIdle, .requestClosedResponseIdle:
      return .failure(
        GRPCError.InvalidState("Response headers must be sent before response message")
      )

    case .requestClosedResponseClosed:
      return .failure(GRPCError.AlreadyComplete())

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  /// Sends the status and trailers, moving to `.requestClosedResponseClosed`.
  ///
  /// - Parameters:
  ///   - status: The status passed to the associated state.
  ///   - trailers: The trailers passed to the associated state.
  /// - Returns: The trailers built by the associated state, or `GRPCError.AlreadyComplete` if
  ///   the response stream is already closed.
  mutating func send(
    status: GRPCStatus,
    trailers: HPACKHeaders
  ) -> Result<HPACKHeaders, Error> {
    switch self {
    // All four live states transition to fully closed; only the state building the trailers
    // differs.
    case let .requestOpenResponseIdle(previous):
      self = .requestClosedResponseClosed
      return .success(previous.send(status: status, trailers: trailers))

    case let .requestClosedResponseIdle(previous):
      self = .requestClosedResponseClosed
      return .success(previous.send(status: status, trailers: trailers))

    case let .requestOpenResponseOpen(previous):
      self = .requestClosedResponseClosed
      return .success(previous.send(status: status, trailers: trailers))

    case let .requestClosedResponseOpen(previous):
      self = .requestClosedResponseClosed
      return .success(previous.send(status: status, trailers: trailers))

    case .requestClosedResponseClosed:
      return .failure(GRPCError.AlreadyComplete())

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  962. }
  963. // MARK: - Helpers
  964. extension HTTP2ToRawGRPCStateMachine {
  /// Builds the initial response headers.
  ///
  /// - Parameters:
  ///   - contentType: Supplies the `content-type` value via its `canonicalValue`.
  ///   - responseEncoding: Added as the encoding header when non-nil.
  ///   - acceptableRequestEncoding: Added as the accept-encoding header when non-nil.
  ///   - userProvidedHeaders: Appended after the headers above.
  ///   - normalizeUserProvidedHeaders: Whether user-provided names are lowercased when appended.
  /// - Returns: The assembled headers, beginning with `:status: 200`.
  static func makeResponseHeaders(
    contentType: ContentType,
    responseEncoding: String?,
    acceptableRequestEncoding: String?,
    userProvidedHeaders: HPACKHeaders,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var responseHeaders = HPACKHeaders()

    // Room for ':status' and 'content-type' (always sent) plus 'grpc-encoding' and
    // 'grpc-accept-encoding' (sent when available), on top of whatever the user provided.
    responseHeaders.reserveCapacity(userProvidedHeaders.count + 4)

    responseHeaders.add(name: ":status", value: "200")
    responseHeaders.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)

    if let encoding = responseEncoding {
      responseHeaders.add(name: GRPCHeaderName.encoding, value: encoding)
    }

    if let accepted = acceptableRequestEncoding {
      responseHeaders.add(name: GRPCHeaderName.acceptEncoding, value: accepted)
    }

    // User-provided headers go last, normalized if requested.
    responseHeaders.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
    return responseHeaders
  }
  /// Builds a single header block carrying both the response headers and the final status.
  ///
  /// - Parameters:
  ///   - status: Supplies the status code and, when it can be marshalled, the status message.
  ///   - contentType: Supplies the `content-type` value via its `canonicalValue`.
  ///   - acceptableRequestEncoding: Added as the accept-encoding header when non-nil.
  ///   - userProvidedHeaders: Appended last when non-nil.
  ///   - normalizeUserProvidedHeaders: Whether user-provided names are lowercased when appended.
  /// - Returns: The assembled headers, beginning with `:status: 200`.
  static func makeResponseTrailersOnly(
    for status: GRPCStatus,
    contentType: ContentType,
    acceptableRequestEncoding: String?,
    userProvidedHeaders: HPACKHeaders?,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var trailersOnly = HPACKHeaders()

    // Room for ':status', 'content-type' and 'grpc-status' (always sent) plus 'grpc-message' and
    // 'grpc-accept-encoding' (sent when available), on top of whatever the user provided.
    trailersOnly.reserveCapacity((userProvidedHeaders?.count ?? 0) + 5)

    // Required fields.
    trailersOnly.add(name: ":status", value: "200")
    trailersOnly.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
    trailersOnly.add(
      name: GRPCHeaderName.statusCode,
      value: String(describing: status.code.rawValue)
    )

    if let marshalled = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
      trailersOnly.add(name: GRPCHeaderName.statusMessage, value: marshalled)
    }

    // This may be included if the requested encoding was not valid.
    if let accepted = acceptableRequestEncoding {
      trailersOnly.add(name: GRPCHeaderName.acceptEncoding, value: accepted)
    }

    if let extra = userProvidedHeaders {
      trailersOnly.add(contentsOf: extra, normalize: normalizeUserProvidedHeaders)
    }

    return trailersOnly
  }
  /// Builds the trailers sent at the end of a response stream.
  ///
  /// - Parameters:
  ///   - status: Supplies the status code and, when it can be marshalled, the status message.
  ///   - userProvidedHeaders: Appended after the status fields.
  ///   - normalizeUserProvidedHeaders: Whether user-provided names are lowercased when appended.
  /// - Returns: The assembled trailers.
  static func makeResponseTrailers(
    for status: GRPCStatus,
    userProvidedHeaders: HPACKHeaders,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var result = HPACKHeaders()

    // Room for 'grpc-status' (always sent) and 'grpc-message' (sent when available), on top of
    // whatever the user provided.
    result.reserveCapacity(userProvidedHeaders.count + 2)

    // Status code.
    result.add(
      name: GRPCHeaderName.statusCode,
      value: String(describing: status.code.rawValue)
    )

    // Status message, if there is one.
    if let marshalled = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
      result.add(name: GRPCHeaderName.statusMessage, value: marshalled)
    }

    // User-provided trailers.
    result.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders)
    return result
  }
  1036. }
  1037. private extension HPACKHeaders {
  /// Appends every entry of `other`, optionally lowercasing each header name first.
  ///
  /// - Parameters:
  ///   - other: The headers to append.
  ///   - normalize: When `true`, names are lowercased; values and indexability are kept as is.
  mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) {
    guard normalize else {
      // No normalization requested: append the entries untouched.
      self.add(contentsOf: other)
      return
    }

    self.add(contentsOf: other.lazy.map { name, value, indexable in
      (name: name.lowercased(), value: value, indexable: indexable)
    })
  }
  1047. }