GRPCStreamStateMachine.swift 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. enum Scheme: String {
  21. case http
  22. case https
  23. }
  24. enum GRPCStreamStateMachineConfiguration {
  25. case client(ClientConfiguration)
  26. case server(ServerConfiguration)
  27. struct ClientConfiguration {
  28. var methodDescriptor: MethodDescriptor
  29. var scheme: Scheme
  30. var outboundEncoding: CompressionAlgorithm
  31. var acceptedEncodings: [CompressionAlgorithm]
  32. }
  33. struct ServerConfiguration {
  34. var scheme: Scheme
  35. var acceptedEncodings: [CompressionAlgorithm]
  36. }
  37. }
  38. private enum GRPCStreamStateMachineState {
  39. case clientIdleServerIdle(ClientIdleServerIdleState)
  40. case clientOpenServerIdle(ClientOpenServerIdleState)
  41. case clientOpenServerOpen(ClientOpenServerOpenState)
  42. case clientOpenServerClosed(ClientOpenServerClosedState)
  43. case clientClosedServerIdle(ClientClosedServerIdleState)
  44. case clientClosedServerOpen(ClientClosedServerOpenState)
  45. case clientClosedServerClosed(ClientClosedServerClosedState)
  46. struct ClientIdleServerIdleState {
  47. let maximumPayloadSize: Int
  48. }
  49. struct ClientOpenServerIdleState {
  50. let maximumPayloadSize: Int
  51. var framer: GRPCMessageFramer
  52. var compressor: Zlib.Compressor?
  53. var outboundCompression: CompressionAlgorithm?
  54. // The deframer must be optional because the client will not have one configured
  55. // until the server opens and sends a grpc-encoding header.
  56. // It will be present for the server though, because even though it's idle,
  57. // it can still receive compressed messages from the client.
  58. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  59. var decompressor: Zlib.Decompressor?
  60. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  61. init(
  62. previousState: ClientIdleServerIdleState,
  63. compressor: Zlib.Compressor?,
  64. framer: GRPCMessageFramer,
  65. decompressor: Zlib.Decompressor?,
  66. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  67. ) {
  68. self.maximumPayloadSize = previousState.maximumPayloadSize
  69. self.compressor = compressor
  70. self.framer = framer
  71. self.decompressor = decompressor
  72. self.deframer = deframer
  73. self.inboundMessageBuffer = .init()
  74. }
  75. }
  76. struct ClientOpenServerOpenState {
  77. var framer: GRPCMessageFramer
  78. var compressor: Zlib.Compressor?
  79. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>
  80. var decompressor: Zlib.Decompressor?
  81. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  82. init(
  83. previousState: ClientOpenServerIdleState,
  84. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
  85. decompressor: Zlib.Decompressor?
  86. ) {
  87. self.framer = previousState.framer
  88. self.compressor = previousState.compressor
  89. self.deframer = deframer
  90. self.decompressor = decompressor
  91. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  92. }
  93. }
  94. struct ClientOpenServerClosedState {
  95. var framer: GRPCMessageFramer?
  96. var compressor: Zlib.Compressor?
  97. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  98. var decompressor: Zlib.Decompressor?
  99. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  100. // This transition should only happen on the server-side when, upon receiving
  101. // initial client metadata, some of the headers are invalid and we must reject
  102. // the RPC.
  103. // We will mark the client as open (because it sent initial metadata albeit
  104. // invalid) but we'll close the server, meaning all future messages sent from
  105. // the client will be ignored. Because of this, we won't need to frame or
  106. // deframe any messages, as we won't be reading or writing any messages.
  107. init(previousState: ClientIdleServerIdleState) {
  108. self.framer = nil
  109. self.compressor = nil
  110. self.deframer = nil
  111. self.decompressor = nil
  112. self.inboundMessageBuffer = .init()
  113. }
  114. init(previousState: ClientOpenServerOpenState) {
  115. self.framer = previousState.framer
  116. self.compressor = previousState.compressor
  117. self.deframer = previousState.deframer
  118. self.decompressor = previousState.decompressor
  119. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  120. }
  121. init(previousState: ClientOpenServerIdleState) {
  122. self.framer = previousState.framer
  123. self.compressor = previousState.compressor
  124. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  125. // The server went directly from idle to closed - this means it sent a
  126. // trailers-only response:
  127. // - if we're the client, the previous state was a nil deframer, but that
  128. // is okay because we don't need a deframer as the server won't be sending
  129. // any messages;
  130. // - if we're the server, we'll keep whatever deframer we had.
  131. self.deframer = previousState.deframer
  132. self.decompressor = previousState.decompressor
  133. }
  134. }
  135. struct ClientClosedServerIdleState {
  136. let maximumPayloadSize: Int
  137. var framer: GRPCMessageFramer
  138. var compressor: Zlib.Compressor?
  139. var outboundCompression: CompressionAlgorithm?
  140. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  141. var decompressor: Zlib.Decompressor?
  142. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  143. /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
  144. /// initial metadata). We don't need to know a decompression algorithm, since we won't receive
  145. /// any more messages from the client anyways, as it's closed.
  146. init(
  147. previousState: ClientIdleServerIdleState,
  148. compressionAlgorithm: CompressionAlgorithm
  149. ) {
  150. self.maximumPayloadSize = previousState.maximumPayloadSize
  151. if let zlibMethod = Zlib.Method(encoding: compressionAlgorithm) {
  152. self.compressor = Zlib.Compressor(method: zlibMethod)
  153. }
  154. self.framer = GRPCMessageFramer()
  155. self.outboundCompression = compressionAlgorithm
  156. // We don't need a deframer since we won't receive any messages from the
  157. // client: it's closed.
  158. self.deframer = nil
  159. self.inboundMessageBuffer = .init()
  160. }
  161. init(previousState: ClientOpenServerIdleState) {
  162. self.maximumPayloadSize = previousState.maximumPayloadSize
  163. self.framer = previousState.framer
  164. self.compressor = previousState.compressor
  165. self.outboundCompression = previousState.outboundCompression
  166. self.deframer = previousState.deframer
  167. self.decompressor = previousState.decompressor
  168. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  169. }
  170. }
  171. struct ClientClosedServerOpenState {
  172. var framer: GRPCMessageFramer
  173. var compressor: Zlib.Compressor?
  174. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  175. var decompressor: Zlib.Decompressor?
  176. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  177. init(previousState: ClientOpenServerOpenState) {
  178. self.framer = previousState.framer
  179. self.compressor = previousState.compressor
  180. self.deframer = previousState.deframer
  181. self.decompressor = previousState.decompressor
  182. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  183. }
  184. /// This should be called from the server path, as the deframer will already be configured in this scenario.
  185. init(previousState: ClientClosedServerIdleState) {
  186. self.framer = previousState.framer
  187. self.compressor = previousState.compressor
  188. // In the case of the server, we don't need to deframe/decompress any more
  189. // messages, since the client's closed.
  190. self.deframer = nil
  191. self.decompressor = nil
  192. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  193. }
  194. /// This should only be called from the client path, as the deframer has not yet been set up.
  195. init(
  196. previousState: ClientClosedServerIdleState,
  197. decompressionAlgorithm: CompressionAlgorithm
  198. ) {
  199. self.framer = previousState.framer
  200. self.compressor = previousState.compressor
  201. // In the case of the client, it will only be able to set up the deframer
  202. // after it receives the chosen encoding from the server.
  203. if let zlibMethod = Zlib.Method(encoding: decompressionAlgorithm) {
  204. self.decompressor = Zlib.Decompressor(method: zlibMethod)
  205. }
  206. let decoder = GRPCMessageDeframer(
  207. maximumPayloadSize: previousState.maximumPayloadSize,
  208. decompressor: self.decompressor
  209. )
  210. self.deframer = NIOSingleStepByteToMessageProcessor(decoder)
  211. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  212. }
  213. }
  214. struct ClientClosedServerClosedState {
  215. // We still need the framer and compressor in case the server has closed
  216. // but its buffer is not yet empty and still needs to send messages out to
  217. // the client.
  218. var framer: GRPCMessageFramer?
  219. var compressor: Zlib.Compressor?
  220. // These are already deframed, so we don't need the deframer anymore.
  221. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  222. // This transition should only happen on the server-side when, upon receiving
  223. // initial client metadata, some of the headers are invalid and we must reject
  224. // the RPC.
  225. // We will mark the client as closed (because it set the EOS flag, even if
  226. // the initial metadata was invalid) and we'll close the server too.
  227. // Because of this, we won't need to frame any messages, as we
  228. // won't be writing any messages.
  229. init(previousState: ClientIdleServerIdleState) {
  230. self.framer = nil
  231. self.compressor = nil
  232. self.inboundMessageBuffer = .init()
  233. }
  234. init(previousState: ClientClosedServerOpenState) {
  235. self.framer = previousState.framer
  236. self.compressor = previousState.compressor
  237. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  238. }
  239. init(previousState: ClientClosedServerIdleState) {
  240. self.framer = previousState.framer
  241. self.compressor = previousState.compressor
  242. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  243. }
  244. init(previousState: ClientOpenServerIdleState) {
  245. self.framer = previousState.framer
  246. self.compressor = previousState.compressor
  247. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  248. }
  249. init(previousState: ClientOpenServerClosedState) {
  250. self.framer = previousState.framer
  251. self.compressor = previousState.compressor
  252. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  253. }
  254. }
  255. }
  256. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  257. struct GRPCStreamStateMachine {
  258. private var state: GRPCStreamStateMachineState
  259. private var configuration: GRPCStreamStateMachineConfiguration
  260. private var skipAssertions: Bool
  261. init(
  262. configuration: GRPCStreamStateMachineConfiguration,
  263. maximumPayloadSize: Int,
  264. skipAssertions: Bool = false
  265. ) {
  266. self.state = .clientIdleServerIdle(.init(maximumPayloadSize: maximumPayloadSize))
  267. self.configuration = configuration
  268. self.skipAssertions = skipAssertions
  269. }
  270. mutating func send(metadata: Metadata) throws -> HPACKHeaders {
  271. switch self.configuration {
  272. case .client(let clientConfiguration):
  273. return try self.clientSend(metadata: metadata, configuration: clientConfiguration)
  274. case .server(let serverConfiguration):
  275. return try self.serverSend(metadata: metadata, configuration: serverConfiguration)
  276. }
  277. }
  278. mutating func send(message: [UInt8]) throws {
  279. switch self.configuration {
  280. case .client:
  281. try self.clientSend(message: message)
  282. case .server:
  283. try self.serverSend(message: message)
  284. }
  285. }
  286. mutating func closeOutbound() throws {
  287. switch self.configuration {
  288. case .client:
  289. try self.clientCloseOutbound()
  290. case .server:
  291. try self.invalidState("Server cannot call close: it must send status and trailers.")
  292. }
  293. }
  294. mutating func send(
  295. status: Status,
  296. metadata: Metadata
  297. ) throws -> HPACKHeaders {
  298. switch self.configuration {
  299. case .client:
  300. try self.invalidState(
  301. "Client cannot send status and trailer."
  302. )
  303. case .server:
  304. return try self.serverSend(
  305. status: status,
  306. customMetadata: metadata
  307. )
  308. }
  309. }
  310. enum OnMetadataReceived: Equatable {
  311. case receivedMetadata(Metadata)
  312. // Client-specific actions
  313. case receivedStatusAndMetadata(status: Status, metadata: Metadata)
  314. case doNothing
  315. // Server-specific actions
  316. case rejectRPC(trailers: HPACKHeaders)
  317. }
  318. mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
  319. switch self.configuration {
  320. case .client:
  321. return try self.clientReceive(headers: headers, endStream: endStream)
  322. case .server(let serverConfiguration):
  323. return try self.serverReceive(
  324. headers: headers,
  325. endStream: endStream,
  326. configuration: serverConfiguration
  327. )
  328. }
  329. }
  330. mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
  331. switch self.configuration {
  332. case .client:
  333. try self.clientReceive(buffer: buffer, endStream: endStream)
  334. case .server:
  335. try self.serverReceive(buffer: buffer, endStream: endStream)
  336. }
  337. }
  338. /// The result of requesting the next outbound message.
  339. enum OnNextOutboundMessage: Equatable {
  340. /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done
  341. /// writing messages (i.e. we are now closed).
  342. case noMoreMessages
  343. /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
  344. case awaitMoreMessages
  345. /// A message is ready to be sent.
  346. case sendMessage(ByteBuffer)
  347. }
  348. mutating func nextOutboundMessage() throws -> OnNextOutboundMessage {
  349. switch self.configuration {
  350. case .client:
  351. return try self.clientNextOutboundMessage()
  352. case .server:
  353. return try self.serverNextOutboundMessage()
  354. }
  355. }
  356. /// The result of requesting the next inbound message.
  357. enum OnNextInboundMessage: Equatable {
  358. /// The sender is done writing messages and there are no more messages to be received.
  359. case noMoreMessages
  360. /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
  361. case awaitMoreMessages
  362. /// A message has been received.
  363. case receiveMessage([UInt8])
  364. }
  365. mutating func nextInboundMessage() -> OnNextInboundMessage {
  366. switch self.configuration {
  367. case .client:
  368. return self.clientNextInboundMessage()
  369. case .server:
  370. return self.serverNextInboundMessage()
  371. }
  372. }
  373. mutating func tearDown() {
  374. switch self.state {
  375. case .clientIdleServerIdle:
  376. ()
  377. case .clientOpenServerIdle(let state):
  378. state.compressor?.end()
  379. state.decompressor?.end()
  380. case .clientOpenServerOpen(let state):
  381. state.compressor?.end()
  382. state.decompressor?.end()
  383. case .clientOpenServerClosed(let state):
  384. state.compressor?.end()
  385. state.decompressor?.end()
  386. case .clientClosedServerIdle(let state):
  387. state.compressor?.end()
  388. state.decompressor?.end()
  389. case .clientClosedServerOpen(let state):
  390. state.compressor?.end()
  391. state.decompressor?.end()
  392. case .clientClosedServerClosed(let state):
  393. state.compressor?.end()
  394. }
  395. }
  396. }
  397. // - MARK: Client
  398. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  399. extension GRPCStreamStateMachine {
  400. private func makeClientHeaders(
  401. methodDescriptor: MethodDescriptor,
  402. scheme: Scheme,
  403. outboundEncoding: CompressionAlgorithm?,
  404. acceptedEncodings: [CompressionAlgorithm],
  405. customMetadata: Metadata
  406. ) -> HPACKHeaders {
  407. var headers = HPACKHeaders()
  408. headers.reserveCapacity(7 + customMetadata.count)
  409. // Add required headers.
  410. // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  411. // The order is important here: reserved HTTP2 headers (those starting with `:`)
  412. // must come before all other headers.
  413. headers.add("POST", forKey: .method)
  414. headers.add(scheme.rawValue, forKey: .scheme)
  415. headers.add(methodDescriptor.fullyQualifiedMethod, forKey: .path)
  416. // Add required gRPC headers.
  417. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  418. headers.add("trailers", forKey: .te) // Used to detect incompatible proxies
  419. if let encoding = outboundEncoding, encoding != .identity {
  420. headers.add(encoding.name, forKey: .encoding)
  421. }
  422. for acceptedEncoding in acceptedEncodings {
  423. headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
  424. }
  425. for metadataPair in customMetadata {
  426. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  427. }
  428. return headers
  429. }
  430. private mutating func clientSend(
  431. metadata: Metadata,
  432. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  433. ) throws -> HPACKHeaders {
  434. // Client sends metadata only when opening the stream.
  435. switch self.state {
  436. case .clientIdleServerIdle(let state):
  437. let outboundEncoding = configuration.outboundEncoding
  438. let compressor = Zlib.Method(encoding: outboundEncoding)
  439. .flatMap { Zlib.Compressor(method: $0) }
  440. self.state = .clientOpenServerIdle(
  441. .init(
  442. previousState: state,
  443. compressor: compressor,
  444. framer: GRPCMessageFramer(),
  445. decompressor: nil,
  446. deframer: nil
  447. )
  448. )
  449. return self.makeClientHeaders(
  450. methodDescriptor: configuration.methodDescriptor,
  451. scheme: configuration.scheme,
  452. outboundEncoding: configuration.outboundEncoding,
  453. acceptedEncodings: configuration.acceptedEncodings,
  454. customMetadata: metadata
  455. )
  456. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  457. try self.invalidState(
  458. "Client is already open: shouldn't be sending metadata."
  459. )
  460. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  461. try self.invalidState(
  462. "Client is closed: can't send metadata."
  463. )
  464. }
  465. }
  466. private mutating func clientSend(message: [UInt8]) throws {
  467. switch self.state {
  468. case .clientIdleServerIdle:
  469. try self.invalidState("Client not yet open.")
  470. case .clientOpenServerIdle(var state):
  471. state.framer.append(message)
  472. self.state = .clientOpenServerIdle(state)
  473. case .clientOpenServerOpen(var state):
  474. state.framer.append(message)
  475. self.state = .clientOpenServerOpen(state)
  476. case .clientOpenServerClosed:
  477. // The server has closed, so it makes no sense to send the rest of the request.
  478. ()
  479. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  480. try self.invalidState(
  481. "Client is closed, cannot send a message."
  482. )
  483. }
  484. }
  485. private mutating func clientCloseOutbound() throws {
  486. switch self.state {
  487. case .clientIdleServerIdle:
  488. try self.invalidState("Client not yet open.")
  489. case .clientOpenServerIdle(let state):
  490. self.state = .clientClosedServerIdle(.init(previousState: state))
  491. case .clientOpenServerOpen(let state):
  492. self.state = .clientClosedServerOpen(.init(previousState: state))
  493. case .clientOpenServerClosed(let state):
  494. self.state = .clientClosedServerClosed(.init(previousState: state))
  495. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  496. try self.invalidState(
  497. "Client is closed, cannot send a message."
  498. )
  499. }
  500. }
  501. /// Returns the client's next request to the server.
  502. /// - Returns: The request to be made to the server.
  503. private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage {
  504. switch self.state {
  505. case .clientIdleServerIdle:
  506. try self.invalidState("Client is not open yet.")
  507. case .clientOpenServerIdle(var state):
  508. let request = try state.framer.next(compressor: state.compressor)
  509. self.state = .clientOpenServerIdle(state)
  510. return request.map { .sendMessage($0) } ?? .awaitMoreMessages
  511. case .clientOpenServerOpen(var state):
  512. let request = try state.framer.next(compressor: state.compressor)
  513. self.state = .clientOpenServerOpen(state)
  514. return request.map { .sendMessage($0) } ?? .awaitMoreMessages
  515. case .clientClosedServerIdle(var state):
  516. let request = try state.framer.next(compressor: state.compressor)
  517. self.state = .clientClosedServerIdle(state)
  518. if let request {
  519. return .sendMessage(request)
  520. } else {
  521. return .noMoreMessages
  522. }
  523. case .clientClosedServerOpen(var state):
  524. let request = try state.framer.next(compressor: state.compressor)
  525. self.state = .clientClosedServerOpen(state)
  526. if let request {
  527. return .sendMessage(request)
  528. } else {
  529. return .noMoreMessages
  530. }
  531. case .clientOpenServerClosed, .clientClosedServerClosed:
  532. // No point in sending any more requests if the server is closed.
  533. return .noMoreMessages
  534. }
  535. }
  536. private enum ServerHeadersValidationResult {
  537. case valid
  538. case invalid(OnMetadataReceived)
  539. }
  540. private mutating func clientValidateHeadersReceivedFromServer(
  541. _ metadata: HPACKHeaders
  542. ) -> ServerHeadersValidationResult {
  543. var httpStatus: String? {
  544. metadata.firstString(forKey: .status)
  545. }
  546. var grpcStatus: Status.Code? {
  547. metadata.firstString(forKey: .grpcStatus)
  548. .flatMap { Int($0) }
  549. .flatMap { Status.Code(rawValue: $0) }
  550. }
  551. guard httpStatus == "200" || grpcStatus != nil else {
  552. let httpStatusCode =
  553. httpStatus
  554. .flatMap { Int($0) }
  555. .map { HTTPResponseStatus(statusCode: $0) }
  556. guard let httpStatusCode else {
  557. return .invalid(
  558. .receivedStatusAndMetadata(
  559. status: .init(code: .unknown, message: "HTTP Status Code is missing."),
  560. metadata: Metadata(headers: metadata)
  561. )
  562. )
  563. }
  564. if (100 ... 199).contains(httpStatusCode.code) {
  565. // For 1xx status codes, the entire header should be skipped and a
  566. // subsequent header should be read.
  567. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  568. return .invalid(.doNothing)
  569. }
  570. // Forward the mapped status code.
  571. return .invalid(
  572. .receivedStatusAndMetadata(
  573. status: .init(
  574. code: Status.Code(httpStatusCode: httpStatusCode),
  575. message: "Unexpected non-200 HTTP Status Code."
  576. ),
  577. metadata: Metadata(headers: metadata)
  578. )
  579. )
  580. }
  581. let contentTypeHeader = metadata.first(name: GRPCHTTP2Keys.contentType.rawValue)
  582. guard contentTypeHeader.flatMap(ContentType.init) != nil else {
  583. return .invalid(
  584. .receivedStatusAndMetadata(
  585. status: .init(
  586. code: .internalError,
  587. message: "Missing \(GRPCHTTP2Keys.contentType) header"
  588. ),
  589. metadata: Metadata(headers: metadata)
  590. )
  591. )
  592. }
  593. return .valid
  594. }
  595. private enum ProcessInboundEncodingResult {
  596. case error(OnMetadataReceived)
  597. case success(CompressionAlgorithm)
  598. }
  599. private func processInboundEncoding(_ metadata: HPACKHeaders) -> ProcessInboundEncodingResult {
  600. let inboundEncoding: CompressionAlgorithm
  601. if let serverEncoding = metadata.first(name: GRPCHTTP2Keys.encoding.rawValue) {
  602. guard let parsedEncoding = CompressionAlgorithm(rawValue: serverEncoding) else {
  603. return .error(
  604. .receivedStatusAndMetadata(
  605. status: .init(
  606. code: .internalError,
  607. message:
  608. "The server picked a compression algorithm ('\(serverEncoding)') the client does not know about."
  609. ),
  610. metadata: Metadata(headers: metadata)
  611. )
  612. )
  613. }
  614. inboundEncoding = parsedEncoding
  615. } else {
  616. inboundEncoding = .identity
  617. }
  618. return .success(inboundEncoding)
  619. }
  620. private func validateAndReturnStatusAndMetadata(
  621. _ metadata: HPACKHeaders
  622. ) throws -> OnMetadataReceived {
  623. let rawStatusCode = metadata.firstString(forKey: .grpcStatus)
  624. guard let rawStatusCode,
  625. let intStatusCode = Int(rawStatusCode),
  626. let statusCode = Status.Code(rawValue: intStatusCode)
  627. else {
  628. let message =
  629. "Non-initial metadata must be a trailer containing a valid grpc-status"
  630. + (rawStatusCode.flatMap { "but was \($0)" } ?? "")
  631. throw RPCError(code: .unknown, message: message)
  632. }
  633. let statusMessage =
  634. metadata.firstString(forKey: .grpcStatusMessage)
  635. .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""
  636. var convertedMetadata = Metadata(headers: metadata)
  637. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
  638. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
  639. return .receivedStatusAndMetadata(
  640. status: Status(code: statusCode, message: statusMessage),
  641. metadata: convertedMetadata
  642. )
  643. }
  644. private mutating func clientReceive(
  645. headers: HPACKHeaders,
  646. endStream: Bool
  647. ) throws -> OnMetadataReceived {
  648. switch self.state {
  649. case .clientOpenServerIdle(let state):
  650. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  651. case (.invalid(let action), true):
  652. // The headers are invalid, but the server signalled that it was
  653. // closing the stream, so close both client and server.
  654. self.state = .clientClosedServerClosed(.init(previousState: state))
  655. return action
  656. case (.invalid(let action), false):
  657. self.state = .clientClosedServerIdle(.init(previousState: state))
  658. return action
  659. case (.valid, true):
  660. // This is a trailers-only response: close server.
  661. self.state = .clientOpenServerClosed(.init(previousState: state))
  662. return try self.validateAndReturnStatusAndMetadata(headers)
  663. case (.valid, false):
  664. switch self.processInboundEncoding(headers) {
  665. case .error(let failure):
  666. return failure
  667. case .success(let inboundEncoding):
  668. let decompressor = Zlib.Method(encoding: inboundEncoding)
  669. .flatMap { Zlib.Decompressor(method: $0) }
  670. let deframer = GRPCMessageDeframer(
  671. maximumPayloadSize: state.maximumPayloadSize,
  672. decompressor: decompressor
  673. )
  674. self.state = .clientOpenServerOpen(
  675. .init(
  676. previousState: state,
  677. deframer: NIOSingleStepByteToMessageProcessor(deframer),
  678. decompressor: decompressor
  679. )
  680. )
  681. return .receivedMetadata(Metadata(headers: headers))
  682. }
  683. }
  684. case .clientOpenServerOpen(let state):
  685. // This state is valid even if endStream is not set: server can send
  686. // trailing metadata without END_STREAM set, and follow it with an
  687. // empty message frame where it is set.
  688. // However, we must make sure that grpc-status is set, otherwise this
  689. // is an invalid state.
  690. if endStream {
  691. self.state = .clientOpenServerClosed(.init(previousState: state))
  692. }
  693. return try self.validateAndReturnStatusAndMetadata(headers)
  694. case .clientClosedServerIdle(let state):
  695. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  696. case (.invalid(let action), true):
  697. // The headers are invalid, but the server signalled that it was
  698. // closing the stream, so close the server side too.
  699. self.state = .clientClosedServerClosed(.init(previousState: state))
  700. return action
  701. case (.invalid(let action), false):
  702. // Client is already closed, so we don't need to update our state.
  703. return action
  704. case (.valid, true):
  705. // This is a trailers-only response: close server.
  706. self.state = .clientClosedServerClosed(.init(previousState: state))
  707. return try self.validateAndReturnStatusAndMetadata(headers)
  708. case (.valid, false):
  709. switch self.processInboundEncoding(headers) {
  710. case .error(let failure):
  711. return failure
  712. case .success(let inboundEncoding):
  713. self.state = .clientClosedServerOpen(
  714. .init(
  715. previousState: state,
  716. decompressionAlgorithm: inboundEncoding
  717. )
  718. )
  719. return .receivedMetadata(Metadata(headers: headers))
  720. }
  721. }
  722. case .clientClosedServerOpen(let state):
  723. // This state is valid even if endStream is not set: server can send
  724. // trailing metadata without END_STREAM set, and follow it with an
  725. // empty message frame where it is set.
  726. // However, we must make sure that grpc-status is set, otherwise this
  727. // is an invalid state.
  728. if endStream {
  729. self.state = .clientClosedServerClosed(.init(previousState: state))
  730. }
  731. return try self.validateAndReturnStatusAndMetadata(headers)
  732. case .clientClosedServerClosed:
  733. // We could end up here if we received a grpc-status header in a previous
  734. // frame (which would have already close the server) and then we receive
  735. // an empty frame with EOS set.
  736. // We wouldn't want to throw in that scenario, so we just ignore it.
  737. // Note that we don't want to ignore it if EOS is not set here though, as
  738. // then it would be an invalid payload.
  739. if !endStream || headers.count > 0 {
  740. try self.invalidState(
  741. "Server is closed, nothing could have been sent."
  742. )
  743. }
  744. return .doNothing
  745. case .clientIdleServerIdle:
  746. try self.invalidState(
  747. "Server cannot have sent metadata if the client is idle."
  748. )
  749. case .clientOpenServerClosed:
  750. try self.invalidState(
  751. "Server is closed, nothing could have been sent."
  752. )
  753. }
  754. }
  755. private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
  756. // This is a message received by the client, from the server.
  757. switch self.state {
  758. case .clientIdleServerIdle:
  759. try self.invalidState(
  760. "Cannot have received anything from server if client is not yet open."
  761. )
  762. case .clientOpenServerIdle, .clientClosedServerIdle:
  763. try self.invalidState(
  764. "Server cannot have sent a message before sending the initial metadata."
  765. )
  766. case .clientOpenServerOpen(var state):
  767. try state.deframer.process(buffer: buffer) { deframedMessage in
  768. state.inboundMessageBuffer.append(deframedMessage)
  769. }
  770. if endStream {
  771. self.state = .clientOpenServerClosed(.init(previousState: state))
  772. } else {
  773. self.state = .clientOpenServerOpen(state)
  774. }
  775. case .clientClosedServerOpen(var state):
  776. // The client may have sent the end stream and thus it's closed,
  777. // but the server may still be responding.
  778. // The client must have a deframer set up, so force-unwrap is okay.
  779. try state.deframer!.process(buffer: buffer) { deframedMessage in
  780. state.inboundMessageBuffer.append(deframedMessage)
  781. }
  782. if endStream {
  783. self.state = .clientClosedServerClosed(.init(previousState: state))
  784. } else {
  785. self.state = .clientClosedServerOpen(state)
  786. }
  787. case .clientOpenServerClosed, .clientClosedServerClosed:
  788. try self.invalidState(
  789. "Cannot have received anything from a closed server."
  790. )
  791. }
  792. }
  793. private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
  794. switch self.state {
  795. case .clientOpenServerOpen(var state):
  796. let message = state.inboundMessageBuffer.pop()
  797. self.state = .clientOpenServerOpen(state)
  798. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  799. case .clientOpenServerClosed(var state):
  800. let message = state.inboundMessageBuffer.pop()
  801. self.state = .clientOpenServerClosed(state)
  802. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  803. case .clientClosedServerOpen(var state):
  804. let message = state.inboundMessageBuffer.pop()
  805. self.state = .clientClosedServerOpen(state)
  806. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  807. case .clientClosedServerClosed(var state):
  808. let message = state.inboundMessageBuffer.pop()
  809. self.state = .clientClosedServerClosed(state)
  810. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  811. case .clientIdleServerIdle,
  812. .clientOpenServerIdle,
  813. .clientClosedServerIdle:
  814. return .awaitMoreMessages
  815. }
  816. }
  817. private func invalidState(_ message: String, line: UInt = #line) throws -> Never {
  818. if !self.skipAssertions {
  819. assertionFailure(message, line: line)
  820. }
  821. throw RPCError(code: .internalError, message: message)
  822. }
  823. }
  824. // - MARK: Server
  825. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  826. extension GRPCStreamStateMachine {
  827. private func makeResponseHeaders(
  828. outboundEncoding: CompressionAlgorithm?,
  829. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
  830. customMetadata: Metadata
  831. ) -> HPACKHeaders {
  832. // Response headers always contain :status (HTTP Status 200) and content-type.
  833. // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata.
  834. var headers = HPACKHeaders()
  835. headers.reserveCapacity(4 + customMetadata.count)
  836. headers.add("200", forKey: .status)
  837. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  838. if let outboundEncoding, outboundEncoding != .identity {
  839. headers.add(outboundEncoding.name, forKey: .encoding)
  840. }
  841. for acceptedEncoding in configuration.acceptedEncodings {
  842. headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
  843. }
  844. for metadataPair in customMetadata {
  845. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  846. }
  847. return headers
  848. }
  849. private mutating func serverSend(
  850. metadata: Metadata,
  851. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  852. ) throws -> HPACKHeaders {
  853. // Server sends initial metadata
  854. switch self.state {
  855. case .clientOpenServerIdle(let state):
  856. self.state = .clientOpenServerOpen(
  857. .init(
  858. previousState: state,
  859. // In the case of the server, it will already have a deframer set up,
  860. // because it already knows what encoding the client is using:
  861. // it's okay to force-unwrap.
  862. deframer: state.deframer!,
  863. decompressor: state.decompressor
  864. )
  865. )
  866. return self.makeResponseHeaders(
  867. outboundEncoding: state.outboundCompression,
  868. configuration: configuration,
  869. customMetadata: metadata
  870. )
  871. case .clientClosedServerIdle(let state):
  872. self.state = .clientClosedServerOpen(.init(previousState: state))
  873. return self.makeResponseHeaders(
  874. outboundEncoding: state.outboundCompression,
  875. configuration: configuration,
  876. customMetadata: metadata
  877. )
  878. case .clientIdleServerIdle:
  879. try self.invalidState(
  880. "Client cannot be idle if server is sending initial metadata: it must have opened."
  881. )
  882. case .clientOpenServerClosed, .clientClosedServerClosed:
  883. try self.invalidState(
  884. "Server cannot send metadata if closed."
  885. )
  886. case .clientOpenServerOpen, .clientClosedServerOpen:
  887. try self.invalidState(
  888. "Server has already sent initial metadata."
  889. )
  890. }
  891. }
  892. private mutating func serverSend(message: [UInt8]) throws {
  893. switch self.state {
  894. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  895. try self.invalidState(
  896. "Server must have sent initial metadata before sending a message."
  897. )
  898. case .clientOpenServerOpen(var state):
  899. state.framer.append(message)
  900. self.state = .clientOpenServerOpen(state)
  901. case .clientClosedServerOpen(var state):
  902. state.framer.append(message)
  903. self.state = .clientClosedServerOpen(state)
  904. case .clientOpenServerClosed, .clientClosedServerClosed:
  905. try self.invalidState(
  906. "Server can't send a message if it's closed."
  907. )
  908. }
  909. }
  910. private func makeTrailers(
  911. status: Status,
  912. customMetadata: Metadata?,
  913. trailersOnly: Bool
  914. ) -> HPACKHeaders {
  915. // Trailers always contain the grpc-status header, and optionally,
  916. // grpc-status-message, and custom metadata.
  917. // If it's a trailers-only response, they will also contain :status and
  918. // content-type.
  919. var headers = HPACKHeaders()
  920. let customMetadataCount = customMetadata?.count ?? 0
  921. if trailersOnly {
  922. // Reserve 4 for capacity: 3 for the required headers, and 1 for the
  923. // optional status message.
  924. headers.reserveCapacity(4 + customMetadataCount)
  925. headers.add("200", forKey: .status)
  926. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  927. } else {
  928. // Reserve 2 for capacity: one for the required grpc-status, and
  929. // one for the optional message.
  930. headers.reserveCapacity(2 + customMetadataCount)
  931. }
  932. headers.add(String(status.code.rawValue), forKey: .grpcStatus)
  933. if !status.message.isEmpty {
  934. if let percentEncodedMessage = GRPCStatusMessageMarshaller.marshall(status.message) {
  935. headers.add(percentEncodedMessage, forKey: .grpcStatusMessage)
  936. }
  937. }
  938. if let customMetadata {
  939. for metadataPair in customMetadata {
  940. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  941. }
  942. }
  943. return headers
  944. }
  945. private mutating func serverSend(
  946. status: Status,
  947. customMetadata: Metadata
  948. ) throws -> HPACKHeaders {
  949. // Close the server.
  950. switch self.state {
  951. case .clientOpenServerOpen(let state):
  952. self.state = .clientOpenServerClosed(.init(previousState: state))
  953. return self.makeTrailers(
  954. status: status,
  955. customMetadata: customMetadata,
  956. trailersOnly: false
  957. )
  958. case .clientClosedServerOpen(let state):
  959. self.state = .clientClosedServerClosed(.init(previousState: state))
  960. return self.makeTrailers(
  961. status: status,
  962. customMetadata: customMetadata,
  963. trailersOnly: false
  964. )
  965. case .clientOpenServerIdle(let state):
  966. self.state = .clientOpenServerClosed(.init(previousState: state))
  967. return self.makeTrailers(
  968. status: status,
  969. customMetadata: customMetadata,
  970. trailersOnly: true
  971. )
  972. case .clientClosedServerIdle(let state):
  973. self.state = .clientClosedServerClosed(.init(previousState: state))
  974. return self.makeTrailers(
  975. status: status,
  976. customMetadata: customMetadata,
  977. trailersOnly: true
  978. )
  979. case .clientIdleServerIdle:
  980. try self.invalidState(
  981. "Server can't send status if client is idle."
  982. )
  983. case .clientOpenServerClosed, .clientClosedServerClosed:
  984. try self.invalidState(
  985. "Server can't send anything if closed."
  986. )
  987. }
  988. }
  989. mutating private func closeServerAndBuildRejectRPCAction(
  990. currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
  991. endStream: Bool,
  992. rejectWithStatus status: Status
  993. ) -> OnMetadataReceived {
  994. if endStream {
  995. self.state = .clientClosedServerClosed(.init(previousState: currentState))
  996. } else {
  997. self.state = .clientOpenServerClosed(.init(previousState: currentState))
  998. }
  999. let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
  1000. return .rejectRPC(trailers: trailers)
  1001. }
  1002. private mutating func serverReceive(
  1003. headers: HPACKHeaders,
  1004. endStream: Bool,
  1005. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  1006. ) throws -> OnMetadataReceived {
  1007. switch self.state {
  1008. case .clientIdleServerIdle(let state):
  1009. let contentType = headers.firstString(forKey: .contentType)
  1010. .flatMap { ContentType(value: $0) }
  1011. if contentType == nil {
  1012. self.state = .clientOpenServerClosed(.init(previousState: state))
  1013. // Respond with HTTP-level Unsupported Media Type status code.
  1014. var trailers = HPACKHeaders()
  1015. trailers.add("415", forKey: .status)
  1016. return .rejectRPC(trailers: trailers)
  1017. }
  1018. let path = headers.firstString(forKey: .path)
  1019. .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
  1020. if path == nil {
  1021. return self.closeServerAndBuildRejectRPCAction(
  1022. currentState: state,
  1023. endStream: endStream,
  1024. rejectWithStatus: Status(
  1025. code: .unimplemented,
  1026. message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
  1027. )
  1028. )
  1029. }
  1030. let scheme = headers.firstString(forKey: .scheme)
  1031. .flatMap { Scheme(rawValue: $0) }
  1032. if scheme == nil {
  1033. return self.closeServerAndBuildRejectRPCAction(
  1034. currentState: state,
  1035. endStream: endStream,
  1036. rejectWithStatus: Status(
  1037. code: .invalidArgument,
  1038. message: ":scheme header must be present and one of \"http\" or \"https\"."
  1039. )
  1040. )
  1041. }
  1042. guard let method = headers.firstString(forKey: .method), method == "POST" else {
  1043. return self.closeServerAndBuildRejectRPCAction(
  1044. currentState: state,
  1045. endStream: endStream,
  1046. rejectWithStatus: Status(
  1047. code: .invalidArgument,
  1048. message: ":method header is expected to be present and have a value of \"POST\"."
  1049. )
  1050. )
  1051. }
  1052. guard let te = headers.firstString(forKey: .te), te == "trailers" else {
  1053. return self.closeServerAndBuildRejectRPCAction(
  1054. currentState: state,
  1055. endStream: endStream,
  1056. rejectWithStatus: Status(
  1057. code: .invalidArgument,
  1058. message: "\"te\" header is expected to be present and have a value of \"trailers\"."
  1059. )
  1060. )
  1061. }
  1062. func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool {
  1063. clientEncoding == .identity || configuration.acceptedEncodings.contains(clientEncoding)
  1064. }
  1065. // Firstly, find out if we support the client's chosen encoding, and reject
  1066. // the RPC if we don't.
  1067. let inboundEncoding: CompressionAlgorithm
  1068. let encodingValues = headers.values(
  1069. forHeader: GRPCHTTP2Keys.encoding.rawValue,
  1070. canonicalForm: true
  1071. )
  1072. var encodingValuesIterator = encodingValues.makeIterator()
  1073. if let rawEncoding = encodingValuesIterator.next() {
  1074. guard encodingValuesIterator.next() == nil else {
  1075. let status = Status(
  1076. code: .internalError,
  1077. message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value."
  1078. )
  1079. let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
  1080. return .rejectRPC(trailers: trailers)
  1081. }
  1082. guard let clientEncoding = CompressionAlgorithm(rawValue: String(rawEncoding)),
  1083. isIdentityOrCompatibleEncoding(clientEncoding)
  1084. else {
  1085. let statusMessage: String
  1086. let customMetadata: Metadata?
  1087. if configuration.acceptedEncodings.isEmpty {
  1088. statusMessage = "Compression is not supported"
  1089. customMetadata = nil
  1090. } else {
  1091. statusMessage = """
  1092. \(rawEncoding) compression is not supported; \
  1093. supported algorithms are listed in grpc-accept-encoding
  1094. """
  1095. customMetadata = {
  1096. var trailers = Metadata()
  1097. trailers.reserveCapacity(configuration.acceptedEncodings.count)
  1098. for acceptedEncoding in configuration.acceptedEncodings {
  1099. trailers.addString(
  1100. acceptedEncoding.name,
  1101. forKey: GRPCHTTP2Keys.acceptEncoding.rawValue
  1102. )
  1103. }
  1104. return trailers
  1105. }()
  1106. }
  1107. let trailers = self.makeTrailers(
  1108. status: Status(code: .unimplemented, message: statusMessage),
  1109. customMetadata: customMetadata,
  1110. trailersOnly: true
  1111. )
  1112. return .rejectRPC(trailers: trailers)
  1113. }
  1114. // Server supports client's encoding.
  1115. inboundEncoding = clientEncoding
  1116. } else {
  1117. inboundEncoding = .identity
  1118. }
  1119. // Secondly, find a compatible encoding the server can use to compress outbound messages,
  1120. // based on the encodings the client has advertised.
  1121. var outboundEncoding: CompressionAlgorithm = .identity
  1122. let clientAdvertisedEncodings = headers.values(
  1123. forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue,
  1124. canonicalForm: true
  1125. )
  1126. // Find the preferred encoding and use it to compress responses.
  1127. // If it's identity, just skip it altogether, since we won't be
  1128. // compressing.
  1129. for clientAdvertisedEncoding in clientAdvertisedEncodings {
  1130. if let algorithm = CompressionAlgorithm(rawValue: String(clientAdvertisedEncoding)),
  1131. isIdentityOrCompatibleEncoding(algorithm)
  1132. {
  1133. outboundEncoding = algorithm
  1134. break
  1135. }
  1136. }
  1137. if endStream {
  1138. self.state = .clientClosedServerIdle(
  1139. .init(
  1140. previousState: state,
  1141. compressionAlgorithm: outboundEncoding
  1142. )
  1143. )
  1144. } else {
  1145. let compressor = Zlib.Method(encoding: outboundEncoding)
  1146. .flatMap { Zlib.Compressor(method: $0) }
  1147. let decompressor = Zlib.Method(encoding: inboundEncoding)
  1148. .flatMap { Zlib.Decompressor(method: $0) }
  1149. let deframer = GRPCMessageDeframer(
  1150. maximumPayloadSize: state.maximumPayloadSize,
  1151. decompressor: decompressor
  1152. )
  1153. self.state = .clientOpenServerIdle(
  1154. .init(
  1155. previousState: state,
  1156. compressor: compressor,
  1157. framer: GRPCMessageFramer(),
  1158. decompressor: decompressor,
  1159. deframer: NIOSingleStepByteToMessageProcessor(deframer)
  1160. )
  1161. )
  1162. }
  1163. return .receivedMetadata(Metadata(headers: headers))
  1164. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  1165. try self.invalidState(
  1166. "Client shouldn't have sent metadata twice."
  1167. )
  1168. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1169. try self.invalidState(
  1170. "Client can't have sent metadata if closed."
  1171. )
  1172. }
  1173. }
  1174. private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
  1175. switch self.state {
  1176. case .clientIdleServerIdle:
  1177. try self.invalidState(
  1178. "Can't have received a message if client is idle."
  1179. )
  1180. case .clientOpenServerIdle(var state):
  1181. // Deframer must be present on the server side, as we know the decompression
  1182. // algorithm from the moment the client opens.
  1183. try state.deframer!.process(buffer: buffer) { deframedMessage in
  1184. state.inboundMessageBuffer.append(deframedMessage)
  1185. }
  1186. if endStream {
  1187. self.state = .clientClosedServerIdle(.init(previousState: state))
  1188. } else {
  1189. self.state = .clientOpenServerIdle(state)
  1190. }
  1191. case .clientOpenServerOpen(var state):
  1192. try state.deframer.process(buffer: buffer) { deframedMessage in
  1193. state.inboundMessageBuffer.append(deframedMessage)
  1194. }
  1195. if endStream {
  1196. self.state = .clientClosedServerOpen(.init(previousState: state))
  1197. } else {
  1198. self.state = .clientOpenServerOpen(state)
  1199. }
  1200. case .clientOpenServerClosed(let state):
  1201. // Client is not done sending request, but server has already closed.
  1202. // Ignore the rest of the request: do nothing, unless endStream is set,
  1203. // in which case close the client.
  1204. if endStream {
  1205. self.state = .clientClosedServerClosed(.init(previousState: state))
  1206. }
  1207. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1208. try self.invalidState(
  1209. "Client can't send a message if closed."
  1210. )
  1211. }
  1212. }
  1213. private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage {
  1214. switch self.state {
  1215. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  1216. try self.invalidState("Server is not open yet.")
  1217. case .clientOpenServerOpen(var state):
  1218. let response = try state.framer.next(compressor: state.compressor)
  1219. self.state = .clientOpenServerOpen(state)
  1220. return response.map { .sendMessage($0) } ?? .awaitMoreMessages
  1221. case .clientClosedServerOpen(var state):
  1222. let response = try state.framer.next(compressor: state.compressor)
  1223. self.state = .clientClosedServerOpen(state)
  1224. return response.map { .sendMessage($0) } ?? .awaitMoreMessages
  1225. case .clientOpenServerClosed(var state):
  1226. let response = try state.framer?.next(compressor: state.compressor)
  1227. self.state = .clientOpenServerClosed(state)
  1228. if let response {
  1229. return .sendMessage(response)
  1230. } else {
  1231. return .noMoreMessages
  1232. }
  1233. case .clientClosedServerClosed(var state):
  1234. let response = try state.framer?.next(compressor: state.compressor)
  1235. self.state = .clientClosedServerClosed(state)
  1236. if let response {
  1237. return .sendMessage(response)
  1238. } else {
  1239. return .noMoreMessages
  1240. }
  1241. }
  1242. }
  1243. private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
  1244. switch self.state {
  1245. case .clientOpenServerIdle(var state):
  1246. let request = state.inboundMessageBuffer.pop()
  1247. self.state = .clientOpenServerIdle(state)
  1248. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1249. case .clientOpenServerOpen(var state):
  1250. let request = state.inboundMessageBuffer.pop()
  1251. self.state = .clientOpenServerOpen(state)
  1252. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1253. case .clientOpenServerClosed(var state):
  1254. let request = state.inboundMessageBuffer.pop()
  1255. self.state = .clientOpenServerClosed(state)
  1256. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1257. case .clientClosedServerOpen(var state):
  1258. let request = state.inboundMessageBuffer.pop()
  1259. self.state = .clientClosedServerOpen(state)
  1260. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1261. case .clientClosedServerClosed(var state):
  1262. let request = state.inboundMessageBuffer.pop()
  1263. self.state = .clientClosedServerClosed(state)
  1264. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1265. case .clientClosedServerIdle:
  1266. return .noMoreMessages
  1267. case .clientIdleServerIdle:
  1268. return .awaitMoreMessages
  1269. }
  1270. }
  1271. }
  1272. extension MethodDescriptor {
  1273. init?(fullyQualifiedMethod: String) {
  1274. let split = fullyQualifiedMethod.split(separator: "/")
  1275. guard split.count == 2 else {
  1276. return nil
  1277. }
  1278. self.init(service: String(split[0]), method: String(split[1]))
  1279. }
  1280. }
  1281. internal enum GRPCHTTP2Keys: String {
  1282. case path = ":path"
  1283. case contentType = "content-type"
  1284. case encoding = "grpc-encoding"
  1285. case acceptEncoding = "grpc-accept-encoding"
  1286. case scheme = ":scheme"
  1287. case method = ":method"
  1288. case te = "te"
  1289. case status = ":status"
  1290. case grpcStatus = "grpc-status"
  1291. case grpcStatusMessage = "grpc-status-message"
  1292. }
  1293. extension HPACKHeaders {
  1294. internal func firstString(forKey key: GRPCHTTP2Keys) -> String? {
  1295. self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map {
  1296. String($0)
  1297. }
  1298. }
  1299. internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
  1300. self.add(name: key.rawValue, value: value)
  1301. }
  1302. }
  1303. extension Zlib.Method {
  1304. init?(encoding: CompressionAlgorithm) {
  1305. switch encoding {
  1306. case .identity:
  1307. return nil
  1308. case .deflate:
  1309. self = .deflate
  1310. case .gzip:
  1311. self = .gzip
  1312. default:
  1313. return nil
  1314. }
  1315. }
  1316. }
  1317. extension Metadata {
  1318. init(headers: HPACKHeaders) {
  1319. var metadata = Metadata()
  1320. metadata.reserveCapacity(headers.count)
  1321. for header in headers {
  1322. if header.name.hasSuffix("-bin") {
  1323. do {
  1324. let decodedBinary = try header.value.base64Decoded()
  1325. metadata.addBinary(decodedBinary, forKey: header.name)
  1326. } catch {
  1327. metadata.addString(header.value, forKey: header.name)
  1328. }
  1329. } else {
  1330. metadata.addString(header.value, forKey: header.name)
  1331. }
  1332. }
  1333. self = metadata
  1334. }
  1335. }
  1336. extension Status.Code {
  1337. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  1338. init(httpStatusCode: HTTPResponseStatus) {
  1339. switch httpStatusCode {
  1340. case .badRequest:
  1341. self = .internalError
  1342. case .unauthorized:
  1343. self = .unauthenticated
  1344. case .forbidden:
  1345. self = .permissionDenied
  1346. case .notFound:
  1347. self = .unimplemented
  1348. case .tooManyRequests, .badGateway, .serviceUnavailable, .gatewayTimeout:
  1349. self = .unavailable
  1350. default:
  1351. self = .unknown
  1352. }
  1353. }
  1354. }