GRPCStreamStateMachine.swift 53 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. enum Scheme: String {
  21. case http
  22. case https
  23. }
  24. enum GRPCStreamStateMachineConfiguration {
  25. case client(ClientConfiguration)
  26. case server(ServerConfiguration)
  27. struct ClientConfiguration {
  28. var methodDescriptor: MethodDescriptor
  29. var scheme: Scheme
  30. var outboundEncoding: CompressionAlgorithm
  31. var acceptedEncodings: [CompressionAlgorithm]
  32. }
  33. struct ServerConfiguration {
  34. var scheme: Scheme
  35. var acceptedEncodings: [CompressionAlgorithm]
  36. }
  37. }
  38. private enum GRPCStreamStateMachineState {
  39. case clientIdleServerIdle(ClientIdleServerIdleState)
  40. case clientOpenServerIdle(ClientOpenServerIdleState)
  41. case clientOpenServerOpen(ClientOpenServerOpenState)
  42. case clientOpenServerClosed(ClientOpenServerClosedState)
  43. case clientClosedServerIdle(ClientClosedServerIdleState)
  44. case clientClosedServerOpen(ClientClosedServerOpenState)
  45. case clientClosedServerClosed(ClientClosedServerClosedState)
  46. struct ClientIdleServerIdleState {
  47. let maximumPayloadSize: Int
  48. }
  49. struct ClientOpenServerIdleState {
  50. let maximumPayloadSize: Int
  51. var framer: GRPCMessageFramer
  52. var compressor: Zlib.Compressor?
  53. var outboundCompression: CompressionAlgorithm?
  54. // The deframer must be optional because the client will not have one configured
  55. // until the server opens and sends a grpc-encoding header.
  56. // It will be present for the server though, because even though it's idle,
  57. // it can still receive compressed messages from the client.
  58. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  59. var decompressor: Zlib.Decompressor?
  60. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  61. init(
  62. previousState: ClientIdleServerIdleState,
  63. compressor: Zlib.Compressor?,
  64. framer: GRPCMessageFramer,
  65. decompressor: Zlib.Decompressor?,
  66. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  67. ) {
  68. self.maximumPayloadSize = previousState.maximumPayloadSize
  69. self.compressor = compressor
  70. self.framer = framer
  71. self.decompressor = decompressor
  72. self.deframer = deframer
  73. self.inboundMessageBuffer = .init()
  74. }
  75. }
  76. struct ClientOpenServerOpenState {
  77. var framer: GRPCMessageFramer
  78. var compressor: Zlib.Compressor?
  79. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>
  80. var decompressor: Zlib.Decompressor?
  81. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  82. init(
  83. previousState: ClientOpenServerIdleState,
  84. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
  85. decompressor: Zlib.Decompressor?
  86. ) {
  87. self.framer = previousState.framer
  88. self.compressor = previousState.compressor
  89. self.deframer = deframer
  90. self.decompressor = decompressor
  91. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  92. }
  93. }
  94. struct ClientOpenServerClosedState {
  95. var framer: GRPCMessageFramer?
  96. var compressor: Zlib.Compressor?
  97. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  98. var decompressor: Zlib.Decompressor?
  99. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  100. // This transition should only happen on the server-side when, upon receiving
  101. // initial client metadata, some of the headers are invalid and we must reject
  102. // the RPC.
  103. // We will mark the client as open (because it sent initial metadata albeit
  104. // invalid) but we'll close the server, meaning all future messages sent from
  105. // the client will be ignored. Because of this, we won't need to frame or
  106. // deframe any messages, as we won't be reading or writing any messages.
  107. init(previousState: ClientIdleServerIdleState) {
  108. self.framer = nil
  109. self.compressor = nil
  110. self.deframer = nil
  111. self.decompressor = nil
  112. self.inboundMessageBuffer = .init()
  113. }
  114. init(previousState: ClientOpenServerOpenState) {
  115. self.framer = previousState.framer
  116. self.compressor = previousState.compressor
  117. self.deframer = previousState.deframer
  118. self.decompressor = previousState.decompressor
  119. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  120. }
  121. init(previousState: ClientOpenServerIdleState) {
  122. self.framer = previousState.framer
  123. self.compressor = previousState.compressor
  124. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  125. // The server went directly from idle to closed - this means it sent a
  126. // trailers-only response:
  127. // - if we're the client, the previous state was a nil deframer, but that
  128. // is okay because we don't need a deframer as the server won't be sending
  129. // any messages;
  130. // - if we're the server, we'll keep whatever deframer we had.
  131. self.deframer = previousState.deframer
  132. self.decompressor = previousState.decompressor
  133. }
  134. }
  135. struct ClientClosedServerIdleState {
  136. let maximumPayloadSize: Int
  137. var framer: GRPCMessageFramer
  138. var compressor: Zlib.Compressor?
  139. var outboundCompression: CompressionAlgorithm?
  140. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  141. var decompressor: Zlib.Decompressor?
  142. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  143. /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
  144. /// initial metadata). We don't need to know a decompression algorithm, since we won't receive
  145. /// any more messages from the client anyways, as it's closed.
  146. init(
  147. previousState: ClientIdleServerIdleState,
  148. compressionAlgorithm: CompressionAlgorithm
  149. ) {
  150. self.maximumPayloadSize = previousState.maximumPayloadSize
  151. if let zlibMethod = Zlib.Method(encoding: compressionAlgorithm) {
  152. self.compressor = Zlib.Compressor(method: zlibMethod)
  153. }
  154. self.framer = GRPCMessageFramer()
  155. self.outboundCompression = compressionAlgorithm
  156. // We don't need a deframer since we won't receive any messages from the
  157. // client: it's closed.
  158. self.deframer = nil
  159. self.inboundMessageBuffer = .init()
  160. }
  161. init(previousState: ClientOpenServerIdleState) {
  162. self.maximumPayloadSize = previousState.maximumPayloadSize
  163. self.framer = previousState.framer
  164. self.compressor = previousState.compressor
  165. self.outboundCompression = previousState.outboundCompression
  166. self.deframer = previousState.deframer
  167. self.decompressor = previousState.decompressor
  168. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  169. }
  170. }
  171. struct ClientClosedServerOpenState {
  172. var framer: GRPCMessageFramer
  173. var compressor: Zlib.Compressor?
  174. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  175. var decompressor: Zlib.Decompressor?
  176. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  177. init(previousState: ClientOpenServerOpenState) {
  178. self.framer = previousState.framer
  179. self.compressor = previousState.compressor
  180. self.deframer = previousState.deframer
  181. self.decompressor = previousState.decompressor
  182. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  183. }
  184. /// This should be called from the server path, as the deframer will already be configured in this scenario.
  185. init(previousState: ClientClosedServerIdleState) {
  186. self.framer = previousState.framer
  187. self.compressor = previousState.compressor
  188. // In the case of the server, we don't need to deframe/decompress any more
  189. // messages, since the client's closed.
  190. self.deframer = nil
  191. self.decompressor = nil
  192. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  193. }
  194. /// This should only be called from the client path, as the deframer has not yet been set up.
  195. init(
  196. previousState: ClientClosedServerIdleState,
  197. decompressionAlgorithm: CompressionAlgorithm
  198. ) {
  199. self.framer = previousState.framer
  200. self.compressor = previousState.compressor
  201. // In the case of the client, it will only be able to set up the deframer
  202. // after it receives the chosen encoding from the server.
  203. if let zlibMethod = Zlib.Method(encoding: decompressionAlgorithm) {
  204. self.decompressor = Zlib.Decompressor(method: zlibMethod)
  205. }
  206. let decoder = GRPCMessageDeframer(
  207. maximumPayloadSize: previousState.maximumPayloadSize,
  208. decompressor: self.decompressor
  209. )
  210. self.deframer = NIOSingleStepByteToMessageProcessor(decoder)
  211. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  212. }
  213. }
  214. struct ClientClosedServerClosedState {
  215. // We still need the framer and compressor in case the server has closed
  216. // but its buffer is not yet empty and still needs to send messages out to
  217. // the client.
  218. var framer: GRPCMessageFramer?
  219. var compressor: Zlib.Compressor?
  220. // These are already deframed, so we don't need the deframer anymore.
  221. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  222. // This transition should only happen on the server-side when, upon receiving
  223. // initial client metadata, some of the headers are invalid and we must reject
  224. // the RPC.
  225. // We will mark the client as closed (because it set the EOS flag, even if
  226. // the initial metadata was invalid) and we'll close the server too.
  227. // Because of this, we won't need to frame any messages, as we
  228. // won't be writing any messages.
  229. init(previousState: ClientIdleServerIdleState) {
  230. self.framer = nil
  231. self.compressor = nil
  232. self.inboundMessageBuffer = .init()
  233. }
  234. init(previousState: ClientClosedServerOpenState) {
  235. self.framer = previousState.framer
  236. self.compressor = previousState.compressor
  237. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  238. }
  239. init(previousState: ClientClosedServerIdleState) {
  240. self.framer = previousState.framer
  241. self.compressor = previousState.compressor
  242. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  243. }
  244. init(previousState: ClientOpenServerIdleState) {
  245. self.framer = previousState.framer
  246. self.compressor = previousState.compressor
  247. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  248. }
  249. init(previousState: ClientOpenServerClosedState) {
  250. self.framer = previousState.framer
  251. self.compressor = previousState.compressor
  252. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  253. }
  254. }
  255. }
  256. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  257. struct GRPCStreamStateMachine {
  258. private var state: GRPCStreamStateMachineState
  259. private var configuration: GRPCStreamStateMachineConfiguration
  260. private var skipAssertions: Bool
  261. init(
  262. configuration: GRPCStreamStateMachineConfiguration,
  263. maximumPayloadSize: Int,
  264. skipAssertions: Bool = false
  265. ) {
  266. self.state = .clientIdleServerIdle(.init(maximumPayloadSize: maximumPayloadSize))
  267. self.configuration = configuration
  268. self.skipAssertions = skipAssertions
  269. }
  270. mutating func send(metadata: Metadata) throws -> HPACKHeaders {
  271. switch self.configuration {
  272. case .client(let clientConfiguration):
  273. return try self.clientSend(metadata: metadata, configuration: clientConfiguration)
  274. case .server(let serverConfiguration):
  275. return try self.serverSend(metadata: metadata, configuration: serverConfiguration)
  276. }
  277. }
  278. mutating func send(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
  279. switch self.configuration {
  280. case .client:
  281. try self.clientSend(message: message, promise: promise)
  282. case .server:
  283. try self.serverSend(message: message, promise: promise)
  284. }
  285. }
  286. mutating func closeOutbound() throws {
  287. switch self.configuration {
  288. case .client:
  289. try self.clientCloseOutbound()
  290. case .server:
  291. try self.invalidState("Server cannot call close: it must send status and trailers.")
  292. }
  293. }
  294. mutating func send(
  295. status: Status,
  296. metadata: Metadata
  297. ) throws -> HPACKHeaders {
  298. switch self.configuration {
  299. case .client:
  300. try self.invalidState(
  301. "Client cannot send status and trailer."
  302. )
  303. case .server:
  304. return try self.serverSend(
  305. status: status,
  306. customMetadata: metadata
  307. )
  308. }
  309. }
  310. enum OnMetadataReceived: Equatable {
  311. case receivedMetadata(Metadata)
  312. // Client-specific actions
  313. case receivedStatusAndMetadata(status: Status, metadata: Metadata)
  314. case doNothing
  315. // Server-specific actions
  316. case rejectRPC(trailers: HPACKHeaders)
  317. }
  318. mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
  319. switch self.configuration {
  320. case .client(let clientConfiguration):
  321. return try self.clientReceive(
  322. headers: headers,
  323. endStream: endStream,
  324. configuration: clientConfiguration
  325. )
  326. case .server(let serverConfiguration):
  327. return try self.serverReceive(
  328. headers: headers,
  329. endStream: endStream,
  330. configuration: serverConfiguration
  331. )
  332. }
  333. }
  334. mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
  335. switch self.configuration {
  336. case .client:
  337. try self.clientReceive(buffer: buffer, endStream: endStream)
  338. case .server:
  339. try self.serverReceive(buffer: buffer, endStream: endStream)
  340. }
  341. }
  342. /// The result of requesting the next outbound frame, which may contain multiple messages.
  343. enum OnNextOutboundFrame {
  344. /// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done
  345. /// writing messages (i.e. we are now closed).
  346. case noMoreMessages
  347. /// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying.
  348. case awaitMoreMessages
  349. /// A frame is ready to be sent.
  350. case sendFrame(
  351. frame: ByteBuffer,
  352. promise: EventLoopPromise<Void>?
  353. )
  354. }
  355. mutating func nextOutboundFrame() throws -> OnNextOutboundFrame {
  356. switch self.configuration {
  357. case .client:
  358. return try self.clientNextOutboundFrame()
  359. case .server:
  360. return try self.serverNextOutboundFrame()
  361. }
  362. }
  363. /// The result of requesting the next inbound message.
  364. enum OnNextInboundMessage: Equatable {
  365. /// The sender is done writing messages and there are no more messages to be received.
  366. case noMoreMessages
  367. /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
  368. case awaitMoreMessages
  369. /// A message has been received.
  370. case receiveMessage([UInt8])
  371. }
  372. mutating func nextInboundMessage() -> OnNextInboundMessage {
  373. switch self.configuration {
  374. case .client:
  375. return self.clientNextInboundMessage()
  376. case .server:
  377. return self.serverNextInboundMessage()
  378. }
  379. }
  380. mutating func tearDown() {
  381. switch self.state {
  382. case .clientIdleServerIdle:
  383. ()
  384. case .clientOpenServerIdle(let state):
  385. state.compressor?.end()
  386. state.decompressor?.end()
  387. case .clientOpenServerOpen(let state):
  388. state.compressor?.end()
  389. state.decompressor?.end()
  390. case .clientOpenServerClosed(let state):
  391. state.compressor?.end()
  392. state.decompressor?.end()
  393. case .clientClosedServerIdle(let state):
  394. state.compressor?.end()
  395. state.decompressor?.end()
  396. case .clientClosedServerOpen(let state):
  397. state.compressor?.end()
  398. state.decompressor?.end()
  399. case .clientClosedServerClosed(let state):
  400. state.compressor?.end()
  401. }
  402. }
  403. }
  404. // - MARK: Client
  405. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  406. extension GRPCStreamStateMachine {
  407. private func makeClientHeaders(
  408. methodDescriptor: MethodDescriptor,
  409. scheme: Scheme,
  410. outboundEncoding: CompressionAlgorithm?,
  411. acceptedEncodings: [CompressionAlgorithm],
  412. customMetadata: Metadata
  413. ) -> HPACKHeaders {
  414. var headers = HPACKHeaders()
  415. headers.reserveCapacity(7 + customMetadata.count)
  416. // Add required headers.
  417. // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  418. // The order is important here: reserved HTTP2 headers (those starting with `:`)
  419. // must come before all other headers.
  420. headers.add("POST", forKey: .method)
  421. headers.add(scheme.rawValue, forKey: .scheme)
  422. headers.add(methodDescriptor.fullyQualifiedMethod, forKey: .path)
  423. // Add required gRPC headers.
  424. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  425. headers.add("trailers", forKey: .te) // Used to detect incompatible proxies
  426. if let encoding = outboundEncoding, encoding != .identity {
  427. headers.add(encoding.name, forKey: .encoding)
  428. }
  429. for acceptedEncoding in acceptedEncodings {
  430. headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
  431. }
  432. for metadataPair in customMetadata {
  433. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  434. }
  435. return headers
  436. }
  437. private mutating func clientSend(
  438. metadata: Metadata,
  439. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  440. ) throws -> HPACKHeaders {
  441. // Client sends metadata only when opening the stream.
  442. switch self.state {
  443. case .clientIdleServerIdle(let state):
  444. let outboundEncoding = configuration.outboundEncoding
  445. let compressor = Zlib.Method(encoding: outboundEncoding)
  446. .flatMap { Zlib.Compressor(method: $0) }
  447. self.state = .clientOpenServerIdle(
  448. .init(
  449. previousState: state,
  450. compressor: compressor,
  451. framer: GRPCMessageFramer(),
  452. decompressor: nil,
  453. deframer: nil
  454. )
  455. )
  456. return self.makeClientHeaders(
  457. methodDescriptor: configuration.methodDescriptor,
  458. scheme: configuration.scheme,
  459. outboundEncoding: configuration.outboundEncoding,
  460. acceptedEncodings: configuration.acceptedEncodings,
  461. customMetadata: metadata
  462. )
  463. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  464. try self.invalidState(
  465. "Client is already open: shouldn't be sending metadata."
  466. )
  467. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  468. try self.invalidState(
  469. "Client is closed: can't send metadata."
  470. )
  471. }
  472. }
  473. private mutating func clientSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
  474. switch self.state {
  475. case .clientIdleServerIdle:
  476. try self.invalidState("Client not yet open.")
  477. case .clientOpenServerIdle(var state):
  478. state.framer.append(message, promise: promise)
  479. self.state = .clientOpenServerIdle(state)
  480. case .clientOpenServerOpen(var state):
  481. state.framer.append(message, promise: promise)
  482. self.state = .clientOpenServerOpen(state)
  483. case .clientOpenServerClosed:
  484. // The server has closed, so it makes no sense to send the rest of the request.
  485. ()
  486. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  487. try self.invalidState(
  488. "Client is closed, cannot send a message."
  489. )
  490. }
  491. }
  492. private mutating func clientCloseOutbound() throws {
  493. switch self.state {
  494. case .clientIdleServerIdle:
  495. try self.invalidState("Client not yet open.")
  496. case .clientOpenServerIdle(let state):
  497. self.state = .clientClosedServerIdle(.init(previousState: state))
  498. case .clientOpenServerOpen(let state):
  499. self.state = .clientClosedServerOpen(.init(previousState: state))
  500. case .clientOpenServerClosed(let state):
  501. self.state = .clientClosedServerClosed(.init(previousState: state))
  502. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  503. try self.invalidState("Client is already closed.")
  504. }
  505. }
  506. /// Returns the client's next request to the server.
  507. /// - Returns: The request to be made to the server.
  508. private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame {
  509. switch self.state {
  510. case .clientIdleServerIdle:
  511. try self.invalidState("Client is not open yet.")
  512. case .clientOpenServerIdle(var state):
  513. let request = try state.framer.next(compressor: state.compressor)
  514. self.state = .clientOpenServerIdle(state)
  515. return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  516. ?? .awaitMoreMessages
  517. case .clientOpenServerOpen(var state):
  518. let request = try state.framer.next(compressor: state.compressor)
  519. self.state = .clientOpenServerOpen(state)
  520. return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  521. ?? .awaitMoreMessages
  522. case .clientClosedServerIdle(var state):
  523. let request = try state.framer.next(compressor: state.compressor)
  524. self.state = .clientClosedServerIdle(state)
  525. if let request {
  526. return .sendFrame(frame: request.bytes, promise: request.promise)
  527. } else {
  528. return .noMoreMessages
  529. }
  530. case .clientClosedServerOpen(var state):
  531. let request = try state.framer.next(compressor: state.compressor)
  532. self.state = .clientClosedServerOpen(state)
  533. if let request {
  534. return .sendFrame(frame: request.bytes, promise: request.promise)
  535. } else {
  536. return .noMoreMessages
  537. }
  538. case .clientOpenServerClosed, .clientClosedServerClosed:
  539. // No point in sending any more requests if the server is closed.
  540. return .noMoreMessages
  541. }
  542. }
  543. private enum ServerHeadersValidationResult {
  544. case valid
  545. case invalid(OnMetadataReceived)
  546. }
  547. private mutating func clientValidateHeadersReceivedFromServer(
  548. _ metadata: HPACKHeaders
  549. ) -> ServerHeadersValidationResult {
  550. var httpStatus: String? {
  551. metadata.firstString(forKey: .status)
  552. }
  553. var grpcStatus: Status.Code? {
  554. metadata.firstString(forKey: .grpcStatus)
  555. .flatMap { Int($0) }
  556. .flatMap { Status.Code(rawValue: $0) }
  557. }
  558. guard httpStatus == "200" || grpcStatus != nil else {
  559. let httpStatusCode =
  560. httpStatus
  561. .flatMap { Int($0) }
  562. .map { HTTPResponseStatus(statusCode: $0) }
  563. guard let httpStatusCode else {
  564. return .invalid(
  565. .receivedStatusAndMetadata(
  566. status: .init(code: .unknown, message: "HTTP Status Code is missing."),
  567. metadata: Metadata(headers: metadata)
  568. )
  569. )
  570. }
  571. if (100 ... 199).contains(httpStatusCode.code) {
  572. // For 1xx status codes, the entire header should be skipped and a
  573. // subsequent header should be read.
  574. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  575. return .invalid(.doNothing)
  576. }
  577. // Forward the mapped status code.
  578. return .invalid(
  579. .receivedStatusAndMetadata(
  580. status: .init(
  581. code: Status.Code(httpStatusCode: httpStatusCode),
  582. message: "Unexpected non-200 HTTP Status Code."
  583. ),
  584. metadata: Metadata(headers: metadata)
  585. )
  586. )
  587. }
  588. let contentTypeHeader = metadata.first(name: GRPCHTTP2Keys.contentType.rawValue)
  589. guard contentTypeHeader.flatMap(ContentType.init) != nil else {
  590. return .invalid(
  591. .receivedStatusAndMetadata(
  592. status: .init(
  593. code: .internalError,
  594. message: "Missing \(GRPCHTTP2Keys.contentType.rawValue) header"
  595. ),
  596. metadata: Metadata(headers: metadata)
  597. )
  598. )
  599. }
  600. return .valid
  601. }
  602. private enum ProcessInboundEncodingResult {
  603. case error(OnMetadataReceived)
  604. case success(CompressionAlgorithm)
  605. }
  606. private func processInboundEncoding(
  607. headers: HPACKHeaders,
  608. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  609. ) -> ProcessInboundEncodingResult {
  610. let inboundEncoding: CompressionAlgorithm
  611. if let serverEncoding = headers.first(name: GRPCHTTP2Keys.encoding.rawValue) {
  612. guard let parsedEncoding = CompressionAlgorithm(rawValue: serverEncoding),
  613. configuration.acceptedEncodings.contains(parsedEncoding)
  614. else {
  615. return .error(
  616. .receivedStatusAndMetadata(
  617. status: .init(
  618. code: .internalError,
  619. message:
  620. "The server picked a compression algorithm ('\(serverEncoding)') the client does not know about."
  621. ),
  622. metadata: Metadata(headers: headers)
  623. )
  624. )
  625. }
  626. inboundEncoding = parsedEncoding
  627. } else {
  628. inboundEncoding = .identity
  629. }
  630. return .success(inboundEncoding)
  631. }
  632. private func validateAndReturnStatusAndMetadata(
  633. _ metadata: HPACKHeaders
  634. ) throws -> OnMetadataReceived {
  635. let rawStatusCode = metadata.firstString(forKey: .grpcStatus)
  636. guard let rawStatusCode,
  637. let intStatusCode = Int(rawStatusCode),
  638. let statusCode = Status.Code(rawValue: intStatusCode)
  639. else {
  640. let message =
  641. "Non-initial metadata must be a trailer containing a valid grpc-status"
  642. + (rawStatusCode.flatMap { "but was \($0)" } ?? "")
  643. throw RPCError(code: .unknown, message: message)
  644. }
  645. let statusMessage =
  646. metadata.firstString(forKey: .grpcStatusMessage)
  647. .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""
  648. var convertedMetadata = Metadata(headers: metadata)
  649. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
  650. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
  651. return .receivedStatusAndMetadata(
  652. status: Status(code: statusCode, message: statusMessage),
  653. metadata: convertedMetadata
  654. )
  655. }
  656. private mutating func clientReceive(
  657. headers: HPACKHeaders,
  658. endStream: Bool,
  659. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  660. ) throws -> OnMetadataReceived {
  661. switch self.state {
  662. case .clientOpenServerIdle(let state):
  663. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  664. case (.invalid(let action), true):
  665. // The headers are invalid, but the server signalled that it was
  666. // closing the stream, so close both client and server.
  667. self.state = .clientClosedServerClosed(.init(previousState: state))
  668. return action
  669. case (.invalid(let action), false):
  670. self.state = .clientClosedServerIdle(.init(previousState: state))
  671. return action
  672. case (.valid, true):
  673. // This is a trailers-only response: close server.
  674. self.state = .clientOpenServerClosed(.init(previousState: state))
  675. return try self.validateAndReturnStatusAndMetadata(headers)
  676. case (.valid, false):
  677. switch self.processInboundEncoding(headers: headers, configuration: configuration) {
  678. case .error(let failure):
  679. return failure
  680. case .success(let inboundEncoding):
  681. let decompressor = Zlib.Method(encoding: inboundEncoding)
  682. .flatMap { Zlib.Decompressor(method: $0) }
  683. let deframer = GRPCMessageDeframer(
  684. maximumPayloadSize: state.maximumPayloadSize,
  685. decompressor: decompressor
  686. )
  687. self.state = .clientOpenServerOpen(
  688. .init(
  689. previousState: state,
  690. deframer: NIOSingleStepByteToMessageProcessor(deframer),
  691. decompressor: decompressor
  692. )
  693. )
  694. return .receivedMetadata(Metadata(headers: headers))
  695. }
  696. }
  697. case .clientOpenServerOpen(let state):
  698. // This state is valid even if endStream is not set: server can send
  699. // trailing metadata without END_STREAM set, and follow it with an
  700. // empty message frame where it is set.
  701. // However, we must make sure that grpc-status is set, otherwise this
  702. // is an invalid state.
  703. if endStream {
  704. self.state = .clientOpenServerClosed(.init(previousState: state))
  705. }
  706. return try self.validateAndReturnStatusAndMetadata(headers)
  707. case .clientClosedServerIdle(let state):
  708. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  709. case (.invalid(let action), true):
  710. // The headers are invalid, but the server signalled that it was
  711. // closing the stream, so close the server side too.
  712. self.state = .clientClosedServerClosed(.init(previousState: state))
  713. return action
  714. case (.invalid(let action), false):
  715. // Client is already closed, so we don't need to update our state.
  716. return action
  717. case (.valid, true):
  718. // This is a trailers-only response: close server.
  719. self.state = .clientClosedServerClosed(.init(previousState: state))
  720. return try self.validateAndReturnStatusAndMetadata(headers)
  721. case (.valid, false):
  722. switch self.processInboundEncoding(headers: headers, configuration: configuration) {
  723. case .error(let failure):
  724. return failure
  725. case .success(let inboundEncoding):
  726. self.state = .clientClosedServerOpen(
  727. .init(
  728. previousState: state,
  729. decompressionAlgorithm: inboundEncoding
  730. )
  731. )
  732. return .receivedMetadata(Metadata(headers: headers))
  733. }
  734. }
  735. case .clientClosedServerOpen(let state):
  736. // This state is valid even if endStream is not set: server can send
  737. // trailing metadata without END_STREAM set, and follow it with an
  738. // empty message frame where it is set.
  739. // However, we must make sure that grpc-status is set, otherwise this
  740. // is an invalid state.
  741. if endStream {
  742. self.state = .clientClosedServerClosed(.init(previousState: state))
  743. }
  744. return try self.validateAndReturnStatusAndMetadata(headers)
  745. case .clientClosedServerClosed:
  746. // We could end up here if we received a grpc-status header in a previous
  747. // frame (which would have already close the server) and then we receive
  748. // an empty frame with EOS set.
  749. // We wouldn't want to throw in that scenario, so we just ignore it.
  750. // Note that we don't want to ignore it if EOS is not set here though, as
  751. // then it would be an invalid payload.
  752. if !endStream || headers.count > 0 {
  753. try self.invalidState(
  754. "Server is closed, nothing could have been sent."
  755. )
  756. }
  757. return .doNothing
  758. case .clientIdleServerIdle:
  759. try self.invalidState(
  760. "Server cannot have sent metadata if the client is idle."
  761. )
  762. case .clientOpenServerClosed:
  763. try self.invalidState(
  764. "Server is closed, nothing could have been sent."
  765. )
  766. }
  767. }
  768. private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
  769. // This is a message received by the client, from the server.
  770. switch self.state {
  771. case .clientIdleServerIdle:
  772. try self.invalidState(
  773. "Cannot have received anything from server if client is not yet open."
  774. )
  775. case .clientOpenServerIdle, .clientClosedServerIdle:
  776. try self.invalidState(
  777. "Server cannot have sent a message before sending the initial metadata."
  778. )
  779. case .clientOpenServerOpen(var state):
  780. try state.deframer.process(buffer: buffer) { deframedMessage in
  781. state.inboundMessageBuffer.append(deframedMessage)
  782. }
  783. if endStream {
  784. self.state = .clientOpenServerClosed(.init(previousState: state))
  785. } else {
  786. self.state = .clientOpenServerOpen(state)
  787. }
  788. case .clientClosedServerOpen(var state):
  789. // The client may have sent the end stream and thus it's closed,
  790. // but the server may still be responding.
  791. // The client must have a deframer set up, so force-unwrap is okay.
  792. try state.deframer!.process(buffer: buffer) { deframedMessage in
  793. state.inboundMessageBuffer.append(deframedMessage)
  794. }
  795. if endStream {
  796. self.state = .clientClosedServerClosed(.init(previousState: state))
  797. } else {
  798. self.state = .clientClosedServerOpen(state)
  799. }
  800. case .clientOpenServerClosed, .clientClosedServerClosed:
  801. try self.invalidState(
  802. "Cannot have received anything from a closed server."
  803. )
  804. }
  805. }
  806. private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
  807. switch self.state {
  808. case .clientOpenServerOpen(var state):
  809. let message = state.inboundMessageBuffer.pop()
  810. self.state = .clientOpenServerOpen(state)
  811. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  812. case .clientOpenServerClosed(var state):
  813. let message = state.inboundMessageBuffer.pop()
  814. self.state = .clientOpenServerClosed(state)
  815. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  816. case .clientClosedServerOpen(var state):
  817. let message = state.inboundMessageBuffer.pop()
  818. self.state = .clientClosedServerOpen(state)
  819. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  820. case .clientClosedServerClosed(var state):
  821. let message = state.inboundMessageBuffer.pop()
  822. self.state = .clientClosedServerClosed(state)
  823. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  824. case .clientIdleServerIdle,
  825. .clientOpenServerIdle,
  826. .clientClosedServerIdle:
  827. return .awaitMoreMessages
  828. }
  829. }
  830. private func invalidState(_ message: String, line: UInt = #line) throws -> Never {
  831. if !self.skipAssertions {
  832. assertionFailure(message, line: line)
  833. }
  834. throw RPCError(code: .internalError, message: message)
  835. }
  836. }
  837. // - MARK: Server
  838. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  839. extension GRPCStreamStateMachine {
  840. private func makeResponseHeaders(
  841. outboundEncoding: CompressionAlgorithm?,
  842. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
  843. customMetadata: Metadata
  844. ) -> HPACKHeaders {
  845. // Response headers always contain :status (HTTP Status 200) and content-type.
  846. // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata.
  847. var headers = HPACKHeaders()
  848. headers.reserveCapacity(4 + customMetadata.count)
  849. headers.add("200", forKey: .status)
  850. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  851. if let outboundEncoding, outboundEncoding != .identity {
  852. headers.add(outboundEncoding.name, forKey: .encoding)
  853. }
  854. for acceptedEncoding in configuration.acceptedEncodings {
  855. headers.add(acceptedEncoding.name, forKey: .acceptEncoding)
  856. }
  857. for metadataPair in customMetadata {
  858. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  859. }
  860. return headers
  861. }
  862. private mutating func serverSend(
  863. metadata: Metadata,
  864. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  865. ) throws -> HPACKHeaders {
  866. // Server sends initial metadata
  867. switch self.state {
  868. case .clientOpenServerIdle(let state):
  869. self.state = .clientOpenServerOpen(
  870. .init(
  871. previousState: state,
  872. // In the case of the server, it will already have a deframer set up,
  873. // because it already knows what encoding the client is using:
  874. // it's okay to force-unwrap.
  875. deframer: state.deframer!,
  876. decompressor: state.decompressor
  877. )
  878. )
  879. return self.makeResponseHeaders(
  880. outboundEncoding: state.outboundCompression,
  881. configuration: configuration,
  882. customMetadata: metadata
  883. )
  884. case .clientClosedServerIdle(let state):
  885. self.state = .clientClosedServerOpen(.init(previousState: state))
  886. return self.makeResponseHeaders(
  887. outboundEncoding: state.outboundCompression,
  888. configuration: configuration,
  889. customMetadata: metadata
  890. )
  891. case .clientIdleServerIdle:
  892. try self.invalidState(
  893. "Client cannot be idle if server is sending initial metadata: it must have opened."
  894. )
  895. case .clientOpenServerClosed, .clientClosedServerClosed:
  896. try self.invalidState(
  897. "Server cannot send metadata if closed."
  898. )
  899. case .clientOpenServerOpen, .clientClosedServerOpen:
  900. try self.invalidState(
  901. "Server has already sent initial metadata."
  902. )
  903. }
  904. }
  905. private mutating func serverSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
  906. switch self.state {
  907. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  908. try self.invalidState(
  909. "Server must have sent initial metadata before sending a message."
  910. )
  911. case .clientOpenServerOpen(var state):
  912. state.framer.append(message, promise: promise)
  913. self.state = .clientOpenServerOpen(state)
  914. case .clientClosedServerOpen(var state):
  915. state.framer.append(message, promise: promise)
  916. self.state = .clientClosedServerOpen(state)
  917. case .clientOpenServerClosed, .clientClosedServerClosed:
  918. try self.invalidState(
  919. "Server can't send a message if it's closed."
  920. )
  921. }
  922. }
  923. private func makeTrailers(
  924. status: Status,
  925. customMetadata: Metadata?,
  926. trailersOnly: Bool
  927. ) -> HPACKHeaders {
  928. // Trailers always contain the grpc-status header, and optionally,
  929. // grpc-status-message, and custom metadata.
  930. // If it's a trailers-only response, they will also contain :status and
  931. // content-type.
  932. var headers = HPACKHeaders()
  933. let customMetadataCount = customMetadata?.count ?? 0
  934. if trailersOnly {
  935. // Reserve 4 for capacity: 3 for the required headers, and 1 for the
  936. // optional status message.
  937. headers.reserveCapacity(4 + customMetadataCount)
  938. headers.add("200", forKey: .status)
  939. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  940. } else {
  941. // Reserve 2 for capacity: one for the required grpc-status, and
  942. // one for the optional message.
  943. headers.reserveCapacity(2 + customMetadataCount)
  944. }
  945. headers.add(String(status.code.rawValue), forKey: .grpcStatus)
  946. if !status.message.isEmpty {
  947. if let percentEncodedMessage = GRPCStatusMessageMarshaller.marshall(status.message) {
  948. headers.add(percentEncodedMessage, forKey: .grpcStatusMessage)
  949. }
  950. }
  951. if let customMetadata {
  952. for metadataPair in customMetadata {
  953. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  954. }
  955. }
  956. return headers
  957. }
  958. private mutating func serverSend(
  959. status: Status,
  960. customMetadata: Metadata
  961. ) throws -> HPACKHeaders {
  962. // Close the server.
  963. switch self.state {
  964. case .clientOpenServerOpen(let state):
  965. self.state = .clientOpenServerClosed(.init(previousState: state))
  966. return self.makeTrailers(
  967. status: status,
  968. customMetadata: customMetadata,
  969. trailersOnly: false
  970. )
  971. case .clientClosedServerOpen(let state):
  972. self.state = .clientClosedServerClosed(.init(previousState: state))
  973. return self.makeTrailers(
  974. status: status,
  975. customMetadata: customMetadata,
  976. trailersOnly: false
  977. )
  978. case .clientOpenServerIdle(let state):
  979. self.state = .clientOpenServerClosed(.init(previousState: state))
  980. return self.makeTrailers(
  981. status: status,
  982. customMetadata: customMetadata,
  983. trailersOnly: true
  984. )
  985. case .clientClosedServerIdle(let state):
  986. self.state = .clientClosedServerClosed(.init(previousState: state))
  987. return self.makeTrailers(
  988. status: status,
  989. customMetadata: customMetadata,
  990. trailersOnly: true
  991. )
  992. case .clientIdleServerIdle:
  993. try self.invalidState(
  994. "Server can't send status if client is idle."
  995. )
  996. case .clientOpenServerClosed, .clientClosedServerClosed:
  997. try self.invalidState(
  998. "Server can't send anything if closed."
  999. )
  1000. }
  1001. }
  1002. mutating private func closeServerAndBuildRejectRPCAction(
  1003. currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
  1004. endStream: Bool,
  1005. rejectWithStatus status: Status
  1006. ) -> OnMetadataReceived {
  1007. if endStream {
  1008. self.state = .clientClosedServerClosed(.init(previousState: currentState))
  1009. } else {
  1010. self.state = .clientOpenServerClosed(.init(previousState: currentState))
  1011. }
  1012. let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
  1013. return .rejectRPC(trailers: trailers)
  1014. }
  1015. private mutating func serverReceive(
  1016. headers: HPACKHeaders,
  1017. endStream: Bool,
  1018. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  1019. ) throws -> OnMetadataReceived {
  1020. switch self.state {
  1021. case .clientIdleServerIdle(let state):
  1022. let contentType = headers.firstString(forKey: .contentType)
  1023. .flatMap { ContentType(value: $0) }
  1024. if contentType == nil {
  1025. self.state = .clientOpenServerClosed(.init(previousState: state))
  1026. // Respond with HTTP-level Unsupported Media Type status code.
  1027. var trailers = HPACKHeaders()
  1028. trailers.add("415", forKey: .status)
  1029. return .rejectRPC(trailers: trailers)
  1030. }
  1031. let path = headers.firstString(forKey: .path)
  1032. .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
  1033. if path == nil {
  1034. return self.closeServerAndBuildRejectRPCAction(
  1035. currentState: state,
  1036. endStream: endStream,
  1037. rejectWithStatus: Status(
  1038. code: .unimplemented,
  1039. message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
  1040. )
  1041. )
  1042. }
  1043. let scheme = headers.firstString(forKey: .scheme)
  1044. .flatMap { Scheme(rawValue: $0) }
  1045. if scheme == nil {
  1046. return self.closeServerAndBuildRejectRPCAction(
  1047. currentState: state,
  1048. endStream: endStream,
  1049. rejectWithStatus: Status(
  1050. code: .invalidArgument,
  1051. message: ":scheme header must be present and one of \"http\" or \"https\"."
  1052. )
  1053. )
  1054. }
  1055. guard let method = headers.firstString(forKey: .method), method == "POST" else {
  1056. return self.closeServerAndBuildRejectRPCAction(
  1057. currentState: state,
  1058. endStream: endStream,
  1059. rejectWithStatus: Status(
  1060. code: .invalidArgument,
  1061. message: ":method header is expected to be present and have a value of \"POST\"."
  1062. )
  1063. )
  1064. }
  1065. guard let te = headers.firstString(forKey: .te), te == "trailers" else {
  1066. return self.closeServerAndBuildRejectRPCAction(
  1067. currentState: state,
  1068. endStream: endStream,
  1069. rejectWithStatus: Status(
  1070. code: .invalidArgument,
  1071. message: "\"te\" header is expected to be present and have a value of \"trailers\"."
  1072. )
  1073. )
  1074. }
  1075. func isIdentityOrCompatibleEncoding(_ clientEncoding: CompressionAlgorithm) -> Bool {
  1076. clientEncoding == .identity || configuration.acceptedEncodings.contains(clientEncoding)
  1077. }
  1078. // Firstly, find out if we support the client's chosen encoding, and reject
  1079. // the RPC if we don't.
  1080. let inboundEncoding: CompressionAlgorithm
  1081. let encodingValues = headers.values(
  1082. forHeader: GRPCHTTP2Keys.encoding.rawValue,
  1083. canonicalForm: true
  1084. )
  1085. var encodingValuesIterator = encodingValues.makeIterator()
  1086. if let rawEncoding = encodingValuesIterator.next() {
  1087. guard encodingValuesIterator.next() == nil else {
  1088. let status = Status(
  1089. code: .internalError,
  1090. message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value."
  1091. )
  1092. let trailers = self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
  1093. return .rejectRPC(trailers: trailers)
  1094. }
  1095. guard let clientEncoding = CompressionAlgorithm(rawValue: String(rawEncoding)),
  1096. isIdentityOrCompatibleEncoding(clientEncoding)
  1097. else {
  1098. let statusMessage: String
  1099. let customMetadata: Metadata?
  1100. if configuration.acceptedEncodings.isEmpty {
  1101. statusMessage = "Compression is not supported"
  1102. customMetadata = nil
  1103. } else {
  1104. statusMessage = """
  1105. \(rawEncoding) compression is not supported; \
  1106. supported algorithms are listed in grpc-accept-encoding
  1107. """
  1108. customMetadata = {
  1109. var trailers = Metadata()
  1110. trailers.reserveCapacity(configuration.acceptedEncodings.count)
  1111. for acceptedEncoding in configuration.acceptedEncodings {
  1112. trailers.addString(
  1113. acceptedEncoding.name,
  1114. forKey: GRPCHTTP2Keys.acceptEncoding.rawValue
  1115. )
  1116. }
  1117. return trailers
  1118. }()
  1119. }
  1120. let trailers = self.makeTrailers(
  1121. status: Status(code: .unimplemented, message: statusMessage),
  1122. customMetadata: customMetadata,
  1123. trailersOnly: true
  1124. )
  1125. return .rejectRPC(trailers: trailers)
  1126. }
  1127. // Server supports client's encoding.
  1128. inboundEncoding = clientEncoding
  1129. } else {
  1130. inboundEncoding = .identity
  1131. }
  1132. // Secondly, find a compatible encoding the server can use to compress outbound messages,
  1133. // based on the encodings the client has advertised.
  1134. var outboundEncoding: CompressionAlgorithm = .identity
  1135. let clientAdvertisedEncodings = headers.values(
  1136. forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue,
  1137. canonicalForm: true
  1138. )
  1139. // Find the preferred encoding and use it to compress responses.
  1140. // If it's identity, just skip it altogether, since we won't be
  1141. // compressing.
  1142. for clientAdvertisedEncoding in clientAdvertisedEncodings {
  1143. if let algorithm = CompressionAlgorithm(rawValue: String(clientAdvertisedEncoding)),
  1144. isIdentityOrCompatibleEncoding(algorithm)
  1145. {
  1146. outboundEncoding = algorithm
  1147. break
  1148. }
  1149. }
  1150. if endStream {
  1151. self.state = .clientClosedServerIdle(
  1152. .init(
  1153. previousState: state,
  1154. compressionAlgorithm: outboundEncoding
  1155. )
  1156. )
  1157. } else {
  1158. let compressor = Zlib.Method(encoding: outboundEncoding)
  1159. .flatMap { Zlib.Compressor(method: $0) }
  1160. let decompressor = Zlib.Method(encoding: inboundEncoding)
  1161. .flatMap { Zlib.Decompressor(method: $0) }
  1162. let deframer = GRPCMessageDeframer(
  1163. maximumPayloadSize: state.maximumPayloadSize,
  1164. decompressor: decompressor
  1165. )
  1166. self.state = .clientOpenServerIdle(
  1167. .init(
  1168. previousState: state,
  1169. compressor: compressor,
  1170. framer: GRPCMessageFramer(),
  1171. decompressor: decompressor,
  1172. deframer: NIOSingleStepByteToMessageProcessor(deframer)
  1173. )
  1174. )
  1175. }
  1176. return .receivedMetadata(Metadata(headers: headers))
  1177. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  1178. try self.invalidState(
  1179. "Client shouldn't have sent metadata twice."
  1180. )
  1181. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1182. try self.invalidState(
  1183. "Client can't have sent metadata if closed."
  1184. )
  1185. }
  1186. }
  1187. private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
  1188. switch self.state {
  1189. case .clientIdleServerIdle:
  1190. try self.invalidState(
  1191. "Can't have received a message if client is idle."
  1192. )
  1193. case .clientOpenServerIdle(var state):
  1194. // Deframer must be present on the server side, as we know the decompression
  1195. // algorithm from the moment the client opens.
  1196. try state.deframer!.process(buffer: buffer) { deframedMessage in
  1197. state.inboundMessageBuffer.append(deframedMessage)
  1198. }
  1199. if endStream {
  1200. self.state = .clientClosedServerIdle(.init(previousState: state))
  1201. } else {
  1202. self.state = .clientOpenServerIdle(state)
  1203. }
  1204. case .clientOpenServerOpen(var state):
  1205. try state.deframer.process(buffer: buffer) { deframedMessage in
  1206. state.inboundMessageBuffer.append(deframedMessage)
  1207. }
  1208. if endStream {
  1209. self.state = .clientClosedServerOpen(.init(previousState: state))
  1210. } else {
  1211. self.state = .clientOpenServerOpen(state)
  1212. }
  1213. case .clientOpenServerClosed(let state):
  1214. // Client is not done sending request, but server has already closed.
  1215. // Ignore the rest of the request: do nothing, unless endStream is set,
  1216. // in which case close the client.
  1217. if endStream {
  1218. self.state = .clientClosedServerClosed(.init(previousState: state))
  1219. }
  1220. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1221. try self.invalidState(
  1222. "Client can't send a message if closed."
  1223. )
  1224. }
  1225. }
  1226. private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
  1227. switch self.state {
  1228. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  1229. try self.invalidState("Server is not open yet.")
  1230. case .clientOpenServerOpen(var state):
  1231. let response = try state.framer.next(compressor: state.compressor)
  1232. self.state = .clientOpenServerOpen(state)
  1233. return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  1234. ?? .awaitMoreMessages
  1235. case .clientClosedServerOpen(var state):
  1236. let response = try state.framer.next(compressor: state.compressor)
  1237. self.state = .clientClosedServerOpen(state)
  1238. return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  1239. ?? .awaitMoreMessages
  1240. case .clientOpenServerClosed(var state):
  1241. let response = try state.framer?.next(compressor: state.compressor)
  1242. self.state = .clientOpenServerClosed(state)
  1243. if let response {
  1244. return .sendFrame(frame: response.bytes, promise: response.promise)
  1245. } else {
  1246. return .noMoreMessages
  1247. }
  1248. case .clientClosedServerClosed(var state):
  1249. let response = try state.framer?.next(compressor: state.compressor)
  1250. self.state = .clientClosedServerClosed(state)
  1251. if let response {
  1252. return .sendFrame(frame: response.bytes, promise: response.promise)
  1253. } else {
  1254. return .noMoreMessages
  1255. }
  1256. }
  1257. }
  1258. private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
  1259. switch self.state {
  1260. case .clientOpenServerIdle(var state):
  1261. let request = state.inboundMessageBuffer.pop()
  1262. self.state = .clientOpenServerIdle(state)
  1263. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1264. case .clientOpenServerOpen(var state):
  1265. let request = state.inboundMessageBuffer.pop()
  1266. self.state = .clientOpenServerOpen(state)
  1267. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1268. case .clientOpenServerClosed(var state):
  1269. let request = state.inboundMessageBuffer.pop()
  1270. self.state = .clientOpenServerClosed(state)
  1271. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1272. case .clientClosedServerOpen(var state):
  1273. let request = state.inboundMessageBuffer.pop()
  1274. self.state = .clientClosedServerOpen(state)
  1275. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1276. case .clientClosedServerClosed(var state):
  1277. let request = state.inboundMessageBuffer.pop()
  1278. self.state = .clientClosedServerClosed(state)
  1279. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1280. case .clientClosedServerIdle:
  1281. return .noMoreMessages
  1282. case .clientIdleServerIdle:
  1283. return .awaitMoreMessages
  1284. }
  1285. }
  1286. }
  1287. extension MethodDescriptor {
  1288. init?(fullyQualifiedMethod: String) {
  1289. let split = fullyQualifiedMethod.split(separator: "/")
  1290. guard split.count == 2 else {
  1291. return nil
  1292. }
  1293. self.init(service: String(split[0]), method: String(split[1]))
  1294. }
  1295. }
  1296. internal enum GRPCHTTP2Keys: String {
  1297. case path = ":path"
  1298. case contentType = "content-type"
  1299. case encoding = "grpc-encoding"
  1300. case acceptEncoding = "grpc-accept-encoding"
  1301. case scheme = ":scheme"
  1302. case method = ":method"
  1303. case te = "te"
  1304. case status = ":status"
  1305. case grpcStatus = "grpc-status"
  1306. case grpcStatusMessage = "grpc-status-message"
  1307. }
  1308. extension HPACKHeaders {
  1309. internal func firstString(forKey key: GRPCHTTP2Keys) -> String? {
  1310. self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map {
  1311. String($0)
  1312. }
  1313. }
  1314. internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
  1315. self.add(name: key.rawValue, value: value)
  1316. }
  1317. }
  1318. extension Zlib.Method {
  1319. init?(encoding: CompressionAlgorithm) {
  1320. switch encoding {
  1321. case .identity:
  1322. return nil
  1323. case .deflate:
  1324. self = .deflate
  1325. case .gzip:
  1326. self = .gzip
  1327. default:
  1328. return nil
  1329. }
  1330. }
  1331. }
  1332. extension Metadata {
  1333. init(headers: HPACKHeaders) {
  1334. var metadata = Metadata()
  1335. metadata.reserveCapacity(headers.count)
  1336. for header in headers {
  1337. if header.name.hasSuffix("-bin") {
  1338. do {
  1339. let decodedBinary = try header.value.base64Decoded()
  1340. metadata.addBinary(decodedBinary, forKey: header.name)
  1341. } catch {
  1342. metadata.addString(header.value, forKey: header.name)
  1343. }
  1344. } else {
  1345. metadata.addString(header.value, forKey: header.name)
  1346. }
  1347. }
  1348. self = metadata
  1349. }
  1350. }
  1351. extension Status.Code {
  1352. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  1353. init(httpStatusCode: HTTPResponseStatus) {
  1354. switch httpStatusCode {
  1355. case .badRequest:
  1356. self = .internalError
  1357. case .unauthorized:
  1358. self = .unauthenticated
  1359. case .forbidden:
  1360. self = .permissionDenied
  1361. case .notFound:
  1362. self = .unimplemented
  1363. case .tooManyRequests, .badGateway, .serviceUnavailable, .gatewayTimeout:
  1364. self = .unavailable
  1365. default:
  1366. self = .unknown
  1367. }
  1368. }
  1369. }