GRPCStreamStateMachine.swift 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. enum Scheme: String {
  21. case http
  22. case https
  23. }
  24. enum GRPCStreamStateMachineConfiguration {
  25. case client(ClientConfiguration)
  26. case server(ServerConfiguration)
  27. struct ClientConfiguration {
  28. var methodDescriptor: MethodDescriptor
  29. var scheme: Scheme
  30. var outboundEncoding: CompressionAlgorithm
  31. var acceptedEncodings: [CompressionAlgorithm]
  32. }
  33. struct ServerConfiguration {
  34. var scheme: Scheme
  35. var acceptedEncodings: [CompressionAlgorithm]
  36. }
  37. }
  38. private enum GRPCStreamStateMachineState {
  39. case clientIdleServerIdle(ClientIdleServerIdleState)
  40. case clientOpenServerIdle(ClientOpenServerIdleState)
  41. case clientOpenServerOpen(ClientOpenServerOpenState)
  42. case clientOpenServerClosed(ClientOpenServerClosedState)
  43. case clientClosedServerIdle(ClientClosedServerIdleState)
  44. case clientClosedServerOpen(ClientClosedServerOpenState)
  45. case clientClosedServerClosed(ClientClosedServerClosedState)
  46. struct ClientIdleServerIdleState {
  47. let maximumPayloadSize: Int
  48. }
  49. struct ClientOpenServerIdleState {
  50. let maximumPayloadSize: Int
  51. var framer: GRPCMessageFramer
  52. var compressor: Zlib.Compressor?
  53. var outboundCompression: CompressionAlgorithm?
  54. // The deframer must be optional because the client will not have one configured
  55. // until the server opens and sends a grpc-encoding header.
  56. // It will be present for the server though, because even though it's idle,
  57. // it can still receive compressed messages from the client.
  58. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  59. var decompressor: Zlib.Decompressor?
  60. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  61. init(
  62. previousState: ClientIdleServerIdleState,
  63. compressor: Zlib.Compressor?,
  64. framer: GRPCMessageFramer,
  65. decompressor: Zlib.Decompressor?,
  66. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  67. ) {
  68. self.maximumPayloadSize = previousState.maximumPayloadSize
  69. self.compressor = compressor
  70. self.framer = framer
  71. self.decompressor = decompressor
  72. self.deframer = deframer
  73. self.inboundMessageBuffer = .init()
  74. }
  75. }
  76. struct ClientOpenServerOpenState {
  77. var framer: GRPCMessageFramer
  78. var compressor: Zlib.Compressor?
  79. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>
  80. var decompressor: Zlib.Decompressor?
  81. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  82. init(
  83. previousState: ClientOpenServerIdleState,
  84. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
  85. decompressor: Zlib.Decompressor?
  86. ) {
  87. self.framer = previousState.framer
  88. self.compressor = previousState.compressor
  89. self.deframer = deframer
  90. self.decompressor = decompressor
  91. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  92. }
  93. }
  94. struct ClientOpenServerClosedState {
  95. var framer: GRPCMessageFramer?
  96. var compressor: Zlib.Compressor?
  97. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  98. var decompressor: Zlib.Decompressor?
  99. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  100. // This transition should only happen on the server-side when, upon receiving
  101. // initial client metadata, some of the headers are invalid and we must reject
  102. // the RPC.
  103. // We will mark the client as open (because it sent initial metadata albeit
  104. // invalid) but we'll close the server, meaning all future messages sent from
  105. // the client will be ignored. Because of this, we won't need to frame or
  106. // deframe any messages, as we won't be reading or writing any messages.
  107. init(previousState: ClientIdleServerIdleState) {
  108. self.framer = nil
  109. self.compressor = nil
  110. self.deframer = nil
  111. self.decompressor = nil
  112. self.inboundMessageBuffer = .init()
  113. }
  114. init(previousState: ClientOpenServerOpenState) {
  115. self.framer = previousState.framer
  116. self.compressor = previousState.compressor
  117. self.deframer = previousState.deframer
  118. self.decompressor = previousState.decompressor
  119. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  120. }
  121. init(previousState: ClientOpenServerIdleState) {
  122. self.framer = previousState.framer
  123. self.compressor = previousState.compressor
  124. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  125. // The server went directly from idle to closed - this means it sent a
  126. // trailers-only response:
  127. // - if we're the client, the previous state was a nil deframer, but that
  128. // is okay because we don't need a deframer as the server won't be sending
  129. // any messages;
  130. // - if we're the server, we'll keep whatever deframer we had.
  131. self.deframer = previousState.deframer
  132. self.decompressor = previousState.decompressor
  133. }
  134. }
  135. struct ClientClosedServerIdleState {
  136. let maximumPayloadSize: Int
  137. var framer: GRPCMessageFramer
  138. var compressor: Zlib.Compressor?
  139. var outboundCompression: CompressionAlgorithm?
  140. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  141. var decompressor: Zlib.Decompressor?
  142. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  143. /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
  144. /// initial metadata). We don't need to know a decompression algorithm, since we won't receive
  145. /// any more messages from the client anyways, as it's closed.
  146. init(
  147. previousState: ClientIdleServerIdleState,
  148. compressionAlgorithm: CompressionAlgorithm
  149. ) {
  150. self.maximumPayloadSize = previousState.maximumPayloadSize
  151. if let zlibMethod = Zlib.Method(encoding: compressionAlgorithm) {
  152. self.compressor = Zlib.Compressor(method: zlibMethod)
  153. }
  154. self.framer = GRPCMessageFramer()
  155. self.outboundCompression = compressionAlgorithm
  156. // We don't need a deframer since we won't receive any messages from the
  157. // client: it's closed.
  158. self.deframer = nil
  159. self.inboundMessageBuffer = .init()
  160. }
  161. init(previousState: ClientOpenServerIdleState) {
  162. self.maximumPayloadSize = previousState.maximumPayloadSize
  163. self.framer = previousState.framer
  164. self.compressor = previousState.compressor
  165. self.outboundCompression = previousState.outboundCompression
  166. self.deframer = previousState.deframer
  167. self.decompressor = previousState.decompressor
  168. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  169. }
  170. }
  171. struct ClientClosedServerOpenState {
  172. var framer: GRPCMessageFramer
  173. var compressor: Zlib.Compressor?
  174. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  175. var decompressor: Zlib.Decompressor?
  176. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  177. init(previousState: ClientOpenServerOpenState) {
  178. self.framer = previousState.framer
  179. self.compressor = previousState.compressor
  180. self.deframer = previousState.deframer
  181. self.decompressor = previousState.decompressor
  182. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  183. }
  184. /// This should be called from the server path, as the deframer will already be configured in this scenario.
  185. init(previousState: ClientClosedServerIdleState) {
  186. self.framer = previousState.framer
  187. self.compressor = previousState.compressor
  188. // In the case of the server, we don't need to deframe/decompress any more
  189. // messages, since the client's closed.
  190. self.deframer = nil
  191. self.decompressor = nil
  192. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  193. }
  194. /// This should only be called from the client path, as the deframer has not yet been set up.
  195. init(
  196. previousState: ClientClosedServerIdleState,
  197. decompressionAlgorithm: CompressionAlgorithm
  198. ) {
  199. self.framer = previousState.framer
  200. self.compressor = previousState.compressor
  201. // In the case of the client, it will only be able to set up the deframer
  202. // after it receives the chosen encoding from the server.
  203. if let zlibMethod = Zlib.Method(encoding: decompressionAlgorithm) {
  204. self.decompressor = Zlib.Decompressor(method: zlibMethod)
  205. }
  206. let decoder = GRPCMessageDeframer(
  207. maximumPayloadSize: previousState.maximumPayloadSize,
  208. decompressor: self.decompressor
  209. )
  210. self.deframer = NIOSingleStepByteToMessageProcessor(decoder)
  211. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  212. }
  213. }
  214. struct ClientClosedServerClosedState {
  215. // We still need the framer and compressor in case the server has closed
  216. // but its buffer is not yet empty and still needs to send messages out to
  217. // the client.
  218. var framer: GRPCMessageFramer?
  219. var compressor: Zlib.Compressor?
  220. // These are already deframed, so we don't need the deframer anymore.
  221. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  222. // This transition should only happen on the server-side when, upon receiving
  223. // initial client metadata, some of the headers are invalid and we must reject
  224. // the RPC.
  225. // We will mark the client as closed (because it set the EOS flag, even if
  226. // the initial metadata was invalid) and we'll close the server too.
  227. // Because of this, we won't need to frame any messages, as we
  228. // won't be writing any messages.
  229. init(previousState: ClientIdleServerIdleState) {
  230. self.framer = nil
  231. self.compressor = nil
  232. self.inboundMessageBuffer = .init()
  233. }
  234. init(previousState: ClientClosedServerOpenState) {
  235. self.framer = previousState.framer
  236. self.compressor = previousState.compressor
  237. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  238. }
  239. init(previousState: ClientClosedServerIdleState) {
  240. self.framer = previousState.framer
  241. self.compressor = previousState.compressor
  242. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  243. }
  244. init(previousState: ClientOpenServerIdleState) {
  245. self.framer = previousState.framer
  246. self.compressor = previousState.compressor
  247. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  248. }
  249. init(previousState: ClientOpenServerClosedState) {
  250. self.framer = previousState.framer
  251. self.compressor = previousState.compressor
  252. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  253. }
  254. }
  255. }
  256. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  257. struct GRPCStreamStateMachine {
  258. private var state: GRPCStreamStateMachineState
  259. private var configuration: GRPCStreamStateMachineConfiguration
  260. private var skipAssertions: Bool
  261. init(
  262. configuration: GRPCStreamStateMachineConfiguration,
  263. maximumPayloadSize: Int,
  264. skipAssertions: Bool = false
  265. ) {
  266. self.state = .clientIdleServerIdle(.init(maximumPayloadSize: maximumPayloadSize))
  267. self.configuration = configuration
  268. self.skipAssertions = skipAssertions
  269. }
  270. mutating func send(metadata: Metadata) throws -> HPACKHeaders {
  271. switch self.configuration {
  272. case .client(let clientConfiguration):
  273. return try self.clientSend(metadata: metadata, configuration: clientConfiguration)
  274. case .server(let serverConfiguration):
  275. return try self.serverSend(metadata: metadata, configuration: serverConfiguration)
  276. }
  277. }
  278. mutating func send(message: [UInt8], endStream: Bool) throws {
  279. switch self.configuration {
  280. case .client:
  281. try self.clientSend(message: message, endStream: endStream)
  282. case .server:
  283. if endStream {
  284. try self.invalidState(
  285. "Can't end response stream by sending a message - send(status:metadata:) must be called"
  286. )
  287. }
  288. try self.serverSend(message: message)
  289. }
  290. }
  291. mutating func send(
  292. status: Status,
  293. metadata: Metadata
  294. ) throws -> HPACKHeaders {
  295. switch self.configuration {
  296. case .client:
  297. try self.invalidState(
  298. "Client cannot send status and trailer."
  299. )
  300. case .server:
  301. return try self.serverSend(
  302. status: status,
  303. customMetadata: metadata
  304. )
  305. }
  306. }
  307. enum OnMetadataReceived: Equatable {
  308. case receivedMetadata(Metadata)
  309. // Client-specific actions
  310. case receivedStatusAndMetadata(status: Status, metadata: Metadata)
  311. case doNothing
  312. // Server-specific actions
  313. case rejectRPC(trailers: HPACKHeaders)
  314. }
  315. mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
  316. switch self.configuration {
  317. case .client:
  318. return try self.clientReceive(headers: headers, endStream: endStream)
  319. case .server(let serverConfiguration):
  320. return try self.serverReceive(
  321. headers: headers,
  322. endStream: endStream,
  323. configuration: serverConfiguration
  324. )
  325. }
  326. }
  327. mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
  328. switch self.configuration {
  329. case .client:
  330. try self.clientReceive(buffer: buffer, endStream: endStream)
  331. case .server:
  332. try self.serverReceive(buffer: buffer, endStream: endStream)
  333. }
  334. }
  335. /// The result of requesting the next outbound message.
  336. enum OnNextOutboundMessage: Equatable {
  337. /// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done
  338. /// writing messages (i.e. we are now closed).
  339. case noMoreMessages
  340. /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
  341. case awaitMoreMessages
  342. /// A message is ready to be sent.
  343. case sendMessage(ByteBuffer)
  344. }
  345. mutating func nextOutboundMessage() throws -> OnNextOutboundMessage {
  346. switch self.configuration {
  347. case .client:
  348. return try self.clientNextOutboundMessage()
  349. case .server:
  350. return try self.serverNextOutboundMessage()
  351. }
  352. }
  353. /// The result of requesting the next inbound message.
  354. enum OnNextInboundMessage: Equatable {
  355. /// The sender is done writing messages and there are no more messages to be received.
  356. case noMoreMessages
  357. /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
  358. case awaitMoreMessages
  359. /// A message has been received.
  360. case receiveMessage([UInt8])
  361. }
  362. mutating func nextInboundMessage() -> OnNextInboundMessage {
  363. switch self.configuration {
  364. case .client:
  365. return self.clientNextInboundMessage()
  366. case .server:
  367. return self.serverNextInboundMessage()
  368. }
  369. }
  370. mutating func tearDown() {
  371. switch self.state {
  372. case .clientIdleServerIdle:
  373. ()
  374. case .clientOpenServerIdle(let state):
  375. state.compressor?.end()
  376. state.decompressor?.end()
  377. case .clientOpenServerOpen(let state):
  378. state.compressor?.end()
  379. state.decompressor?.end()
  380. case .clientOpenServerClosed(let state):
  381. state.compressor?.end()
  382. state.decompressor?.end()
  383. case .clientClosedServerIdle(let state):
  384. state.compressor?.end()
  385. state.decompressor?.end()
  386. case .clientClosedServerOpen(let state):
  387. state.compressor?.end()
  388. state.decompressor?.end()
  389. case .clientClosedServerClosed(let state):
  390. state.compressor?.end()
  391. }
  392. }
  393. }
  394. // - MARK: Client
  395. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  396. extension GRPCStreamStateMachine {
  397. private func makeClientHeaders(
  398. methodDescriptor: MethodDescriptor,
  399. scheme: Scheme,
  400. outboundEncoding: CompressionAlgorithm?,
  401. acceptedEncodings: [CompressionAlgorithm],
  402. customMetadata: Metadata
  403. ) -> HPACKHeaders {
  404. var headers = HPACKHeaders()
  405. headers.reserveCapacity(7 + customMetadata.count)
  406. // Add required headers.
  407. // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  408. // The order is important here: reserved HTTP2 headers (those starting with `:`)
  409. // must come before all other headers.
  410. headers.add("POST", forKey: .method)
  411. headers.add(scheme.rawValue, forKey: .scheme)
  412. headers.add(methodDescriptor.fullyQualifiedMethod, forKey: .path)
  413. // Add required gRPC headers.
  414. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  415. headers.add("trailers", forKey: .te) // Used to detect incompatible proxies
  416. if let encoding = outboundEncoding, encoding != .identity {
  417. headers.add(encoding.name, forKey: .encoding)
  418. }
  419. for acceptedEncoding in acceptedEncodings {
  420. headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
  421. }
  422. for metadataPair in customMetadata {
  423. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  424. }
  425. return headers
  426. }
  427. private mutating func clientSend(
  428. metadata: Metadata,
  429. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  430. ) throws -> HPACKHeaders {
  431. // Client sends metadata only when opening the stream.
  432. switch self.state {
  433. case .clientIdleServerIdle(let state):
  434. let outboundEncoding = configuration.outboundEncoding
  435. let compressor = Zlib.Method(encoding: outboundEncoding)
  436. .flatMap { Zlib.Compressor(method: $0) }
  437. self.state = .clientOpenServerIdle(
  438. .init(
  439. previousState: state,
  440. compressor: compressor,
  441. framer: GRPCMessageFramer(),
  442. decompressor: nil,
  443. deframer: nil
  444. )
  445. )
  446. return self.makeClientHeaders(
  447. methodDescriptor: configuration.methodDescriptor,
  448. scheme: configuration.scheme,
  449. outboundEncoding: configuration.outboundEncoding,
  450. acceptedEncodings: configuration.acceptedEncodings,
  451. customMetadata: metadata
  452. )
  453. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  454. try self.invalidState(
  455. "Client is already open: shouldn't be sending metadata."
  456. )
  457. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  458. try self.invalidState(
  459. "Client is closed: can't send metadata."
  460. )
  461. }
  462. }
  463. private mutating func clientSend(message: [UInt8], endStream: Bool) throws {
  464. // Client sends message.
  465. switch self.state {
  466. case .clientIdleServerIdle:
  467. try self.invalidState("Client not yet open.")
  468. case .clientOpenServerIdle(var state):
  469. state.framer.append(message)
  470. if endStream {
  471. self.state = .clientClosedServerIdle(.init(previousState: state))
  472. } else {
  473. self.state = .clientOpenServerIdle(state)
  474. }
  475. case .clientOpenServerOpen(var state):
  476. state.framer.append(message)
  477. if endStream {
  478. self.state = .clientClosedServerOpen(.init(previousState: state))
  479. } else {
  480. self.state = .clientOpenServerOpen(state)
  481. }
  482. case .clientOpenServerClosed(let state):
  483. // The server has closed, so it makes no sense to send the rest of the request.
  484. // However, do close if endStream is set.
  485. if endStream {
  486. self.state = .clientClosedServerClosed(.init(previousState: state))
  487. }
  488. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  489. try self.invalidState(
  490. "Client is closed, cannot send a message."
  491. )
  492. }
  493. }
  494. /// Returns the client's next request to the server.
  495. /// - Returns: The request to be made to the server.
  496. private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage {
  497. switch self.state {
  498. case .clientIdleServerIdle:
  499. try self.invalidState("Client is not open yet.")
  500. case .clientOpenServerIdle(var state):
  501. let request = try state.framer.next(compressor: state.compressor)
  502. self.state = .clientOpenServerIdle(state)
  503. return request.map { .sendMessage($0) } ?? .awaitMoreMessages
  504. case .clientOpenServerOpen(var state):
  505. let request = try state.framer.next(compressor: state.compressor)
  506. self.state = .clientOpenServerOpen(state)
  507. return request.map { .sendMessage($0) } ?? .awaitMoreMessages
  508. case .clientClosedServerIdle(var state):
  509. let request = try state.framer.next(compressor: state.compressor)
  510. self.state = .clientClosedServerIdle(state)
  511. if let request {
  512. return .sendMessage(request)
  513. } else {
  514. return .noMoreMessages
  515. }
  516. case .clientClosedServerOpen(var state):
  517. let request = try state.framer.next(compressor: state.compressor)
  518. self.state = .clientClosedServerOpen(state)
  519. if let request {
  520. return .sendMessage(request)
  521. } else {
  522. return .noMoreMessages
  523. }
  524. case .clientOpenServerClosed, .clientClosedServerClosed:
  525. // No point in sending any more requests if the server is closed.
  526. return .noMoreMessages
  527. }
  528. }
  529. private enum ServerHeadersValidationResult {
  530. case valid
  531. case invalid(OnMetadataReceived)
  532. }
  533. private mutating func clientValidateHeadersReceivedFromServer(
  534. _ metadata: HPACKHeaders
  535. ) -> ServerHeadersValidationResult {
  536. var httpStatus: String? {
  537. metadata.firstString(forKey: .status)
  538. }
  539. var grpcStatus: Status.Code? {
  540. metadata.firstString(forKey: .grpcStatus)
  541. .flatMap { Int($0) }
  542. .flatMap { Status.Code(rawValue: $0) }
  543. }
  544. guard httpStatus == "200" || grpcStatus != nil else {
  545. let httpStatusCode =
  546. httpStatus
  547. .flatMap { Int($0) }
  548. .map { HTTPResponseStatus(statusCode: $0) }
  549. guard let httpStatusCode else {
  550. return .invalid(
  551. .receivedStatusAndMetadata(
  552. status: .init(code: .unknown, message: "HTTP Status Code is missing."),
  553. metadata: Metadata(headers: metadata)
  554. )
  555. )
  556. }
  557. if (100 ... 199).contains(httpStatusCode.code) {
  558. // For 1xx status codes, the entire header should be skipped and a
  559. // subsequent header should be read.
  560. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  561. return .invalid(.doNothing)
  562. }
  563. // Forward the mapped status code.
  564. return .invalid(
  565. .receivedStatusAndMetadata(
  566. status: .init(
  567. code: Status.Code(httpStatusCode: httpStatusCode),
  568. message: "Unexpected non-200 HTTP Status Code."
  569. ),
  570. metadata: Metadata(headers: metadata)
  571. )
  572. )
  573. }
  574. let contentTypeHeader = metadata.first(name: GRPCHTTP2Keys.contentType.rawValue)
  575. guard contentTypeHeader.flatMap(ContentType.init) != nil else {
  576. return .invalid(
  577. .receivedStatusAndMetadata(
  578. status: .init(
  579. code: .internalError,
  580. message: "Missing \(GRPCHTTP2Keys.contentType) header"
  581. ),
  582. metadata: Metadata(headers: metadata)
  583. )
  584. )
  585. }
  586. return .valid
  587. }
  588. private enum ProcessInboundEncodingResult {
  589. case error(OnMetadataReceived)
  590. case success(CompressionAlgorithm)
  591. }
  592. private func processInboundEncoding(_ metadata: HPACKHeaders) -> ProcessInboundEncodingResult {
  593. let inboundEncoding: CompressionAlgorithm
  594. if let serverEncoding = metadata.first(name: GRPCHTTP2Keys.encoding.rawValue) {
  595. guard let parsedEncoding = CompressionAlgorithm(rawValue: serverEncoding) else {
  596. return .error(
  597. .receivedStatusAndMetadata(
  598. status: .init(
  599. code: .internalError,
  600. message:
  601. "The server picked a compression algorithm ('\(serverEncoding)') the client does not know about."
  602. ),
  603. metadata: Metadata(headers: metadata)
  604. )
  605. )
  606. }
  607. inboundEncoding = parsedEncoding
  608. } else {
  609. inboundEncoding = .identity
  610. }
  611. return .success(inboundEncoding)
  612. }
  613. private func validateAndReturnStatusAndMetadata(
  614. _ metadata: HPACKHeaders
  615. ) throws -> OnMetadataReceived {
  616. let rawStatusCode = metadata.firstString(forKey: .grpcStatus)
  617. guard let rawStatusCode,
  618. let intStatusCode = Int(rawStatusCode),
  619. let statusCode = Status.Code(rawValue: intStatusCode)
  620. else {
  621. let message =
  622. "Non-initial metadata must be a trailer containing a valid grpc-status"
  623. + (rawStatusCode.flatMap { "but was \($0)" } ?? "")
  624. throw RPCError(code: .unknown, message: message)
  625. }
  626. let statusMessage =
  627. metadata.firstString(forKey: .grpcStatusMessage)
  628. .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""
  629. var convertedMetadata = Metadata(headers: metadata)
  630. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
  631. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
  632. return .receivedStatusAndMetadata(
  633. status: Status(code: statusCode, message: statusMessage),
  634. metadata: convertedMetadata
  635. )
  636. }
  637. private mutating func clientReceive(
  638. headers: HPACKHeaders,
  639. endStream: Bool
  640. ) throws -> OnMetadataReceived {
  641. switch self.state {
  642. case .clientOpenServerIdle(let state):
  643. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  644. case (.invalid(let action), true):
  645. // The headers are invalid, but the server signalled that it was
  646. // closing the stream, so close both client and server.
  647. self.state = .clientClosedServerClosed(.init(previousState: state))
  648. return action
  649. case (.invalid(let action), false):
  650. self.state = .clientClosedServerIdle(.init(previousState: state))
  651. return action
  652. case (.valid, true):
  653. // This is a trailers-only response: close server.
  654. self.state = .clientOpenServerClosed(.init(previousState: state))
  655. return try self.validateAndReturnStatusAndMetadata(headers)
  656. case (.valid, false):
  657. switch self.processInboundEncoding(headers) {
  658. case .error(let failure):
  659. return failure
  660. case .success(let inboundEncoding):
  661. let decompressor = Zlib.Method(encoding: inboundEncoding)
  662. .flatMap { Zlib.Decompressor(method: $0) }
  663. let deframer = GRPCMessageDeframer(
  664. maximumPayloadSize: state.maximumPayloadSize,
  665. decompressor: decompressor
  666. )
  667. self.state = .clientOpenServerOpen(
  668. .init(
  669. previousState: state,
  670. deframer: NIOSingleStepByteToMessageProcessor(deframer),
  671. decompressor: decompressor
  672. )
  673. )
  674. return .receivedMetadata(Metadata(headers: headers))
  675. }
  676. }
  677. case .clientOpenServerOpen(let state):
  678. // This state is valid even if endStream is not set: server can send
  679. // trailing metadata without END_STREAM set, and follow it with an
  680. // empty message frame where it is set.
  681. // However, we must make sure that grpc-status is set, otherwise this
  682. // is an invalid state.
  683. if endStream {
  684. self.state = .clientOpenServerClosed(.init(previousState: state))
  685. }
  686. return try self.validateAndReturnStatusAndMetadata(headers)
  687. case .clientClosedServerIdle(let state):
  688. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  689. case (.invalid(let action), true):
  690. // The headers are invalid, but the server signalled that it was
  691. // closing the stream, so close the server side too.
  692. self.state = .clientClosedServerClosed(.init(previousState: state))
  693. return action
  694. case (.invalid(let action), false):
  695. // Client is already closed, so we don't need to update our state.
  696. return action
  697. case (.valid, true):
  698. // This is a trailers-only response: close server.
  699. self.state = .clientClosedServerClosed(.init(previousState: state))
  700. return try self.validateAndReturnStatusAndMetadata(headers)
  701. case (.valid, false):
  702. switch self.processInboundEncoding(headers) {
  703. case .error(let failure):
  704. return failure
  705. case .success(let inboundEncoding):
  706. self.state = .clientClosedServerOpen(
  707. .init(
  708. previousState: state,
  709. decompressionAlgorithm: inboundEncoding
  710. )
  711. )
  712. return .receivedMetadata(Metadata(headers: headers))
  713. }
  714. }
  715. case .clientClosedServerOpen(let state):
  716. // This state is valid even if endStream is not set: server can send
  717. // trailing metadata without END_STREAM set, and follow it with an
  718. // empty message frame where it is set.
  719. // However, we must make sure that grpc-status is set, otherwise this
  720. // is an invalid state.
  721. if endStream {
  722. self.state = .clientClosedServerClosed(.init(previousState: state))
  723. }
  724. return try self.validateAndReturnStatusAndMetadata(headers)
  725. case .clientClosedServerClosed:
  726. // We could end up here if we received a grpc-status header in a previous
  727. // frame (which would have already close the server) and then we receive
  728. // an empty frame with EOS set.
  729. // We wouldn't want to throw in that scenario, so we just ignore it.
  730. // Note that we don't want to ignore it if EOS is not set here though, as
  731. // then it would be an invalid payload.
  732. if !endStream || headers.count > 0 {
  733. try self.invalidState(
  734. "Server is closed, nothing could have been sent."
  735. )
  736. }
  737. return .doNothing
  738. case .clientIdleServerIdle:
  739. try self.invalidState(
  740. "Server cannot have sent metadata if the client is idle."
  741. )
  742. case .clientOpenServerClosed:
  743. try self.invalidState(
  744. "Server is closed, nothing could have been sent."
  745. )
  746. }
  747. }
  748. private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
  749. // This is a message received by the client, from the server.
  750. switch self.state {
  751. case .clientIdleServerIdle:
  752. try self.invalidState(
  753. "Cannot have received anything from server if client is not yet open."
  754. )
  755. case .clientOpenServerIdle, .clientClosedServerIdle:
  756. try self.invalidState(
  757. "Server cannot have sent a message before sending the initial metadata."
  758. )
  759. case .clientOpenServerOpen(var state):
  760. try state.deframer.process(buffer: buffer) { deframedMessage in
  761. state.inboundMessageBuffer.append(deframedMessage)
  762. }
  763. if endStream {
  764. self.state = .clientOpenServerClosed(.init(previousState: state))
  765. } else {
  766. self.state = .clientOpenServerOpen(state)
  767. }
  768. case .clientClosedServerOpen(var state):
  769. // The client may have sent the end stream and thus it's closed,
  770. // but the server may still be responding.
  771. // The client must have a deframer set up, so force-unwrap is okay.
  772. try state.deframer!.process(buffer: buffer) { deframedMessage in
  773. state.inboundMessageBuffer.append(deframedMessage)
  774. }
  775. if endStream {
  776. self.state = .clientClosedServerClosed(.init(previousState: state))
  777. } else {
  778. self.state = .clientClosedServerOpen(state)
  779. }
  780. case .clientOpenServerClosed, .clientClosedServerClosed:
  781. try self.invalidState(
  782. "Cannot have received anything from a closed server."
  783. )
  784. }
  785. }
  786. private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
  787. switch self.state {
  788. case .clientOpenServerOpen(var state):
  789. let message = state.inboundMessageBuffer.pop()
  790. self.state = .clientOpenServerOpen(state)
  791. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  792. case .clientOpenServerClosed(var state):
  793. let message = state.inboundMessageBuffer.pop()
  794. self.state = .clientOpenServerClosed(state)
  795. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  796. case .clientClosedServerOpen(var state):
  797. let message = state.inboundMessageBuffer.pop()
  798. self.state = .clientClosedServerOpen(state)
  799. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  800. case .clientClosedServerClosed(var state):
  801. let message = state.inboundMessageBuffer.pop()
  802. self.state = .clientClosedServerClosed(state)
  803. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  804. case .clientIdleServerIdle,
  805. .clientOpenServerIdle,
  806. .clientClosedServerIdle:
  807. return .awaitMoreMessages
  808. }
  809. }
  810. private func invalidState(_ message: String, line: UInt = #line) throws -> Never {
  811. if !self.skipAssertions {
  812. assertionFailure(message, line: line)
  813. }
  814. throw RPCError(code: .internalError, message: message)
  815. }
  816. }
  817. // - MARK: Server
  818. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  819. extension GRPCStreamStateMachine {
  820. private func makeResponseHeaders(
  821. outboundEncoding: CompressionAlgorithm?,
  822. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
  823. customMetadata: Metadata
  824. ) -> HPACKHeaders {
  825. // Response headers always contain :status (HTTP Status 200) and content-type.
  826. // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata.
  827. var headers = HPACKHeaders()
  828. headers.reserveCapacity(4 + customMetadata.count)
  829. headers.add("200", forKey: .status)
  830. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  831. if let outboundEncoding, outboundEncoding != .identity {
  832. headers.add(outboundEncoding.name, forKey: .encoding)
  833. }
  834. for acceptedEncoding in configuration.acceptedEncodings {
  835. headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
  836. }
  837. for metadataPair in customMetadata {
  838. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  839. }
  840. return headers
  841. }
  842. private mutating func serverSend(
  843. metadata: Metadata,
  844. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  845. ) throws -> HPACKHeaders {
  846. // Server sends initial metadata
  847. switch self.state {
  848. case .clientOpenServerIdle(let state):
  849. self.state = .clientOpenServerOpen(
  850. .init(
  851. previousState: state,
  852. // In the case of the server, it will already have a deframer set up,
  853. // because it already knows what encoding the client is using:
  854. // it's okay to force-unwrap.
  855. deframer: state.deframer!,
  856. decompressor: state.decompressor
  857. )
  858. )
  859. return self.makeResponseHeaders(
  860. outboundEncoding: state.outboundCompression,
  861. configuration: configuration,
  862. customMetadata: metadata
  863. )
  864. case .clientClosedServerIdle(let state):
  865. self.state = .clientClosedServerOpen(.init(previousState: state))
  866. return self.makeResponseHeaders(
  867. outboundEncoding: state.outboundCompression,
  868. configuration: configuration,
  869. customMetadata: metadata
  870. )
  871. case .clientIdleServerIdle:
  872. try self.invalidState(
  873. "Client cannot be idle if server is sending initial metadata: it must have opened."
  874. )
  875. case .clientOpenServerClosed, .clientClosedServerClosed:
  876. try self.invalidState(
  877. "Server cannot send metadata if closed."
  878. )
  879. case .clientOpenServerOpen, .clientClosedServerOpen:
  880. try self.invalidState(
  881. "Server has already sent initial metadata."
  882. )
  883. }
  884. }
  885. private mutating func serverSend(message: [UInt8]) throws {
  886. switch self.state {
  887. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  888. try self.invalidState(
  889. "Server must have sent initial metadata before sending a message."
  890. )
  891. case .clientOpenServerOpen(var state):
  892. state.framer.append(message)
  893. self.state = .clientOpenServerOpen(state)
  894. case .clientClosedServerOpen(var state):
  895. state.framer.append(message)
  896. self.state = .clientClosedServerOpen(state)
  897. case .clientOpenServerClosed, .clientClosedServerClosed:
  898. try self.invalidState(
  899. "Server can't send a message if it's closed."
  900. )
  901. }
  902. }
  903. private func makeTrailers(
  904. status: Status,
  905. customMetadata: Metadata?,
  906. trailersOnly: Bool
  907. ) -> HPACKHeaders {
  908. // Trailers always contain the grpc-status header, and optionally,
  909. // grpc-status-message, and custom metadata.
  910. // If it's a trailers-only response, they will also contain :status and
  911. // content-type.
  912. var headers = HPACKHeaders()
  913. let customMetadataCount = customMetadata?.count ?? 0
  914. if trailersOnly {
  915. // Reserve 4 for capacity: 3 for the required headers, and 1 for the
  916. // optional status message.
  917. headers.reserveCapacity(4 + customMetadataCount)
  918. headers.add("200", forKey: .status)
  919. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  920. } else {
  921. // Reserve 2 for capacity: one for the required grpc-status, and
  922. // one for the optional message.
  923. headers.reserveCapacity(2 + customMetadataCount)
  924. }
  925. headers.add(String(status.code.rawValue), forKey: .grpcStatus)
  926. if !status.message.isEmpty {
  927. if let percentEncodedMessage = GRPCStatusMessageMarshaller.marshall(status.message) {
  928. headers.add(percentEncodedMessage, forKey: .grpcStatusMessage)
  929. }
  930. }
  931. if let customMetadata {
  932. for metadataPair in customMetadata {
  933. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  934. }
  935. }
  936. return headers
  937. }
  938. private mutating func serverSend(
  939. status: Status,
  940. customMetadata: Metadata
  941. ) throws -> HPACKHeaders {
  942. // Close the server.
  943. switch self.state {
  944. case .clientOpenServerOpen(let state):
  945. self.state = .clientOpenServerClosed(.init(previousState: state))
  946. return self.makeTrailers(
  947. status: status,
  948. customMetadata: customMetadata,
  949. trailersOnly: false
  950. )
  951. case .clientClosedServerOpen(let state):
  952. self.state = .clientClosedServerClosed(.init(previousState: state))
  953. return self.makeTrailers(
  954. status: status,
  955. customMetadata: customMetadata,
  956. trailersOnly: false
  957. )
  958. case .clientOpenServerIdle(let state):
  959. self.state = .clientOpenServerClosed(.init(previousState: state))
  960. return self.makeTrailers(
  961. status: status,
  962. customMetadata: customMetadata,
  963. trailersOnly: true
  964. )
  965. case .clientClosedServerIdle(let state):
  966. self.state = .clientClosedServerClosed(.init(previousState: state))
  967. return self.makeTrailers(
  968. status: status,
  969. customMetadata: customMetadata,
  970. trailersOnly: true
  971. )
  972. case .clientIdleServerIdle:
  973. try self.invalidState(
  974. "Server can't send status if client is idle."
  975. )
  976. case .clientOpenServerClosed, .clientClosedServerClosed:
  977. try self.invalidState(
  978. "Server can't send anything if closed."
  979. )
  980. }
  981. }
  982. mutating private func closeServerAndBuildRejectRPCAction(
  983. currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
  984. endStream: Bool,
  985. rejectWithStatus status: Status
  986. ) -> OnMetadataReceived {
  987. if endStream {
  988. self.state = .clientClosedServerClosed(.init(previousState: currentState))
  989. } else {
  990. self.state = .clientOpenServerClosed(.init(previousState: currentState))
  991. }
  992. let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
  993. return .rejectRPC(trailers: trailers)
  994. }
  995. private mutating func serverReceive(
  996. headers: HPACKHeaders,
  997. endStream: Bool,
  998. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  999. ) throws -> OnMetadataReceived {
  1000. switch self.state {
  1001. case .clientIdleServerIdle(let state):
  1002. let contentType = headers.firstString(forKey: .contentType)
  1003. .flatMap { ContentType(value: $0) }
  1004. if contentType == nil {
  1005. self.state = .clientOpenServerClosed(.init(previousState: state))
  1006. // Respond with HTTP-level Unsupported Media Type status code.
  1007. var trailers = HPACKHeaders()
  1008. trailers.add("415", forKey: .status)
  1009. return .rejectRPC(trailers: trailers)
  1010. }
  1011. let path = headers.firstString(forKey: .path)
  1012. .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
  1013. if path == nil {
  1014. return self.closeServerAndBuildRejectRPCAction(
  1015. currentState: state,
  1016. endStream: endStream,
  1017. rejectWithStatus: Status(
  1018. code: .unimplemented,
  1019. message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
  1020. )
  1021. )
  1022. }
  1023. let scheme = headers.firstString(forKey: .scheme)
  1024. .flatMap { Scheme(rawValue: $0) }
  1025. if scheme == nil {
  1026. return self.closeServerAndBuildRejectRPCAction(
  1027. currentState: state,
  1028. endStream: endStream,
  1029. rejectWithStatus: Status(
  1030. code: .invalidArgument,
  1031. message: ":scheme header must be present and one of \"http\" or \"https\"."
  1032. )
  1033. )
  1034. }
  1035. guard let method = headers.firstString(forKey: .method), method == "POST" else {
  1036. return self.closeServerAndBuildRejectRPCAction(
  1037. currentState: state,
  1038. endStream: endStream,
  1039. rejectWithStatus: Status(
  1040. code: .invalidArgument,
  1041. message: ":method header is expected to be present and have a value of \"POST\"."
  1042. )
  1043. )
  1044. }
  1045. guard let te = headers.firstString(forKey: .te), te == "trailers" else {
  1046. return self.closeServerAndBuildRejectRPCAction(
  1047. currentState: state,
  1048. endStream: endStream,
  1049. rejectWithStatus: Status(
  1050. code: .invalidArgument,
  1051. message: "\"te\" header is expected to be present and have a value of \"trailers\"."
  1052. )
  1053. )
  1054. }
  1055. func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool {
  1056. clientEncoding == .identity || configuration.acceptedEncodings.contains(clientEncoding)
  1057. }
  1058. // Firstly, find out if we support the client's chosen encoding, and reject
  1059. // the RPC if we don't.
  1060. let inboundEncoding: CompressionAlgorithm
  1061. let encodingValues = headers.values(
  1062. forHeader: GRPCHTTP2Keys.encoding.rawValue,
  1063. canonicalForm: true
  1064. )
  1065. var encodingValuesIterator = encodingValues.makeIterator()
  1066. if let rawEncoding = encodingValuesIterator.next() {
  1067. guard encodingValuesIterator.next() == nil else {
  1068. let status = Status(
  1069. code: .internalError,
  1070. message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value."
  1071. )
  1072. let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
  1073. return .rejectRPC(trailers: trailers)
  1074. }
  1075. guard let clientEncoding = CompressionAlgorithm(rawValue: String(rawEncoding)),
  1076. isIdentityOrCompatibleEncoding(clientEncoding)
  1077. else {
  1078. let statusMessage: String
  1079. let customMetadata: Metadata?
  1080. if configuration.acceptedEncodings.isEmpty {
  1081. statusMessage = "Compression is not supported"
  1082. customMetadata = nil
  1083. } else {
  1084. statusMessage = """
  1085. \(rawEncoding) compression is not supported; \
  1086. supported algorithms are listed in grpc-accept-encoding
  1087. """
  1088. customMetadata = {
  1089. var trailers = Metadata()
  1090. trailers.reserveCapacity(configuration.acceptedEncodings.count)
  1091. for acceptedEncoding in configuration.acceptedEncodings {
  1092. trailers.addString(
  1093. acceptedEncoding.name,
  1094. forKey: GRPCHTTP2Keys.acceptEncoding.rawValue
  1095. )
  1096. }
  1097. return trailers
  1098. }()
  1099. }
  1100. let trailers = self.makeTrailers(
  1101. status: Status(code: .unimplemented, message: statusMessage),
  1102. customMetadata: customMetadata,
  1103. trailersOnly: true
  1104. )
  1105. return .rejectRPC(trailers: trailers)
  1106. }
  1107. // Server supports client's encoding.
  1108. inboundEncoding = clientEncoding
  1109. } else {
  1110. inboundEncoding = .identity
  1111. }
  1112. // Secondly, find a compatible encoding the server can use to compress outbound messages,
  1113. // based on the encodings the client has advertised.
  1114. var outboundEncoding: CompressionAlgorithm = .identity
  1115. let clientAdvertisedEncodings = headers.values(
  1116. forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue,
  1117. canonicalForm: true
  1118. )
  1119. // Find the preferred encoding and use it to compress responses.
  1120. // If it's identity, just skip it altogether, since we won't be
  1121. // compressing.
  1122. for clientAdvertisedEncoding in clientAdvertisedEncodings {
  1123. if let algorithm = CompressionAlgorithm(rawValue: String(clientAdvertisedEncoding)),
  1124. isIdentityOrCompatibleEncoding(algorithm)
  1125. {
  1126. outboundEncoding = algorithm
  1127. break
  1128. }
  1129. }
  1130. if endStream {
  1131. self.state = .clientClosedServerIdle(
  1132. .init(
  1133. previousState: state,
  1134. compressionAlgorithm: outboundEncoding
  1135. )
  1136. )
  1137. } else {
  1138. let compressor = Zlib.Method(encoding: outboundEncoding)
  1139. .flatMap { Zlib.Compressor(method: $0) }
  1140. let decompressor = Zlib.Method(encoding: inboundEncoding)
  1141. .flatMap { Zlib.Decompressor(method: $0) }
  1142. let deframer = GRPCMessageDeframer(
  1143. maximumPayloadSize: state.maximumPayloadSize,
  1144. decompressor: decompressor
  1145. )
  1146. self.state = .clientOpenServerIdle(
  1147. .init(
  1148. previousState: state,
  1149. compressor: compressor,
  1150. framer: GRPCMessageFramer(),
  1151. decompressor: decompressor,
  1152. deframer: NIOSingleStepByteToMessageProcessor(deframer)
  1153. )
  1154. )
  1155. }
  1156. return .receivedMetadata(Metadata(headers: headers))
  1157. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  1158. try self.invalidState(
  1159. "Client shouldn't have sent metadata twice."
  1160. )
  1161. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1162. try self.invalidState(
  1163. "Client can't have sent metadata if closed."
  1164. )
  1165. }
  1166. }
  1167. private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
  1168. switch self.state {
  1169. case .clientIdleServerIdle:
  1170. try self.invalidState(
  1171. "Can't have received a message if client is idle."
  1172. )
  1173. case .clientOpenServerIdle(var state):
  1174. // Deframer must be present on the server side, as we know the decompression
  1175. // algorithm from the moment the client opens.
  1176. try state.deframer!.process(buffer: buffer) { deframedMessage in
  1177. state.inboundMessageBuffer.append(deframedMessage)
  1178. }
  1179. if endStream {
  1180. self.state = .clientClosedServerIdle(.init(previousState: state))
  1181. } else {
  1182. self.state = .clientOpenServerIdle(state)
  1183. }
  1184. case .clientOpenServerOpen(var state):
  1185. try state.deframer.process(buffer: buffer) { deframedMessage in
  1186. state.inboundMessageBuffer.append(deframedMessage)
  1187. }
  1188. if endStream {
  1189. self.state = .clientClosedServerOpen(.init(previousState: state))
  1190. } else {
  1191. self.state = .clientOpenServerOpen(state)
  1192. }
  1193. case .clientOpenServerClosed(let state):
  1194. // Client is not done sending request, but server has already closed.
  1195. // Ignore the rest of the request: do nothing, unless endStream is set,
  1196. // in which case close the client.
  1197. if endStream {
  1198. self.state = .clientClosedServerClosed(.init(previousState: state))
  1199. }
  1200. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1201. try self.invalidState(
  1202. "Client can't send a message if closed."
  1203. )
  1204. }
  1205. }
  1206. private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage {
  1207. switch self.state {
  1208. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  1209. try self.invalidState("Server is not open yet.")
  1210. case .clientOpenServerOpen(var state):
  1211. let response = try state.framer.next(compressor: state.compressor)
  1212. self.state = .clientOpenServerOpen(state)
  1213. return response.map { .sendMessage($0) } ?? .awaitMoreMessages
  1214. case .clientClosedServerOpen(var state):
  1215. let response = try state.framer.next(compressor: state.compressor)
  1216. self.state = .clientClosedServerOpen(state)
  1217. return response.map { .sendMessage($0) } ?? .awaitMoreMessages
  1218. case .clientOpenServerClosed(var state):
  1219. let response = try state.framer?.next(compressor: state.compressor)
  1220. self.state = .clientOpenServerClosed(state)
  1221. if let response {
  1222. return .sendMessage(response)
  1223. } else {
  1224. return .noMoreMessages
  1225. }
  1226. case .clientClosedServerClosed(var state):
  1227. let response = try state.framer?.next(compressor: state.compressor)
  1228. self.state = .clientClosedServerClosed(state)
  1229. if let response {
  1230. return .sendMessage(response)
  1231. } else {
  1232. return .noMoreMessages
  1233. }
  1234. }
  1235. }
  1236. private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
  1237. switch self.state {
  1238. case .clientOpenServerIdle(var state):
  1239. let request = state.inboundMessageBuffer.pop()
  1240. self.state = .clientOpenServerIdle(state)
  1241. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1242. case .clientOpenServerOpen(var state):
  1243. let request = state.inboundMessageBuffer.pop()
  1244. self.state = .clientOpenServerOpen(state)
  1245. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1246. case .clientOpenServerClosed(var state):
  1247. let request = state.inboundMessageBuffer.pop()
  1248. self.state = .clientOpenServerClosed(state)
  1249. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1250. case .clientClosedServerOpen(var state):
  1251. let request = state.inboundMessageBuffer.pop()
  1252. self.state = .clientClosedServerOpen(state)
  1253. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1254. case .clientClosedServerClosed(var state):
  1255. let request = state.inboundMessageBuffer.pop()
  1256. self.state = .clientClosedServerClosed(state)
  1257. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1258. case .clientClosedServerIdle:
  1259. return .noMoreMessages
  1260. case .clientIdleServerIdle:
  1261. return .awaitMoreMessages
  1262. }
  1263. }
  1264. }
  1265. extension MethodDescriptor {
  1266. init?(fullyQualifiedMethod: String) {
  1267. let split = fullyQualifiedMethod.split(separator: "/")
  1268. guard split.count == 2 else {
  1269. return nil
  1270. }
  1271. self.init(service: String(split[0]), method: String(split[1]))
  1272. }
  1273. }
  1274. internal enum GRPCHTTP2Keys: String {
  1275. case path = ":path"
  1276. case contentType = "content-type"
  1277. case encoding = "grpc-encoding"
  1278. case acceptEncoding = "grpc-accept-encoding"
  1279. case scheme = ":scheme"
  1280. case method = ":method"
  1281. case te = "te"
  1282. case status = ":status"
  1283. case grpcStatus = "grpc-status"
  1284. case grpcStatusMessage = "grpc-status-message"
  1285. }
  1286. extension HPACKHeaders {
  1287. internal func firstString(forKey key: GRPCHTTP2Keys) -> String? {
  1288. self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map {
  1289. String($0)
  1290. }
  1291. }
  1292. internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
  1293. self.add(name: key.rawValue, value: value)
  1294. }
  1295. }
  1296. extension Zlib.Method {
  1297. init?(encoding: CompressionAlgorithm) {
  1298. switch encoding {
  1299. case .identity:
  1300. return nil
  1301. case .deflate:
  1302. self = .deflate
  1303. case .gzip:
  1304. self = .gzip
  1305. default:
  1306. return nil
  1307. }
  1308. }
  1309. }
  1310. extension Metadata {
  1311. init(headers: HPACKHeaders) {
  1312. var metadata = Metadata()
  1313. metadata.reserveCapacity(headers.count)
  1314. for header in headers {
  1315. if header.name.hasSuffix("-bin") {
  1316. do {
  1317. let decodedBinary = try header.value.base64Decoded()
  1318. metadata.addBinary(decodedBinary, forKey: header.name)
  1319. } catch {
  1320. metadata.addString(header.value, forKey: header.name)
  1321. }
  1322. } else {
  1323. metadata.addString(header.value, forKey: header.name)
  1324. }
  1325. }
  1326. self = metadata
  1327. }
  1328. }
  1329. extension Status.Code {
  1330. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  1331. init(httpStatusCode: HTTPResponseStatus) {
  1332. switch httpStatusCode {
  1333. case .badRequest:
  1334. self = .internalError
  1335. case .unauthorized:
  1336. self = .unauthenticated
  1337. case .forbidden:
  1338. self = .permissionDenied
  1339. case .notFound:
  1340. self = .unimplemented
  1341. case .tooManyRequests, .badGateway, .serviceUnavailable, .gatewayTimeout:
  1342. self = .unavailable
  1343. default:
  1344. self = .unknown
  1345. }
  1346. }
  1347. }