GRPCStreamStateMachine.swift 56 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. enum GRPCStreamStateMachineConfiguration {
  21. case client(ClientConfiguration)
  22. case server(ServerConfiguration)
  23. enum Scheme: String {
  24. case http
  25. case https
  26. }
  27. struct ClientConfiguration {
  28. var methodDescriptor: MethodDescriptor
  29. var scheme: Scheme
  30. var outboundEncoding: CompressionAlgorithm
  31. var acceptedEncodings: CompressionAlgorithmSet
  32. init(
  33. methodDescriptor: MethodDescriptor,
  34. scheme: Scheme,
  35. outboundEncoding: CompressionAlgorithm,
  36. acceptedEncodings: CompressionAlgorithmSet
  37. ) {
  38. self.methodDescriptor = methodDescriptor
  39. self.scheme = scheme
  40. self.outboundEncoding = outboundEncoding
  41. self.acceptedEncodings = acceptedEncodings.union(.none)
  42. }
  43. }
  44. struct ServerConfiguration {
  45. var scheme: Scheme
  46. var acceptedEncodings: CompressionAlgorithmSet
  47. init(scheme: Scheme, acceptedEncodings: CompressionAlgorithmSet) {
  48. self.scheme = scheme
  49. self.acceptedEncodings = acceptedEncodings.union(.none)
  50. }
  51. }
  52. }
  53. private enum GRPCStreamStateMachineState {
  54. case clientIdleServerIdle(ClientIdleServerIdleState)
  55. case clientOpenServerIdle(ClientOpenServerIdleState)
  56. case clientOpenServerOpen(ClientOpenServerOpenState)
  57. case clientOpenServerClosed(ClientOpenServerClosedState)
  58. case clientClosedServerIdle(ClientClosedServerIdleState)
  59. case clientClosedServerOpen(ClientClosedServerOpenState)
  60. case clientClosedServerClosed(ClientClosedServerClosedState)
  61. struct ClientIdleServerIdleState {
  62. let maximumPayloadSize: Int
  63. }
  64. struct ClientOpenServerIdleState {
  65. let maximumPayloadSize: Int
  66. var framer: GRPCMessageFramer
  67. var compressor: Zlib.Compressor?
  68. var outboundCompression: CompressionAlgorithm
  69. // The deframer must be optional because the client will not have one configured
  70. // until the server opens and sends a grpc-encoding header.
  71. // It will be present for the server though, because even though it's idle,
  72. // it can still receive compressed messages from the client.
  73. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  74. var decompressor: Zlib.Decompressor?
  75. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  76. init(
  77. previousState: ClientIdleServerIdleState,
  78. compressor: Zlib.Compressor?,
  79. outboundCompression: CompressionAlgorithm,
  80. framer: GRPCMessageFramer,
  81. decompressor: Zlib.Decompressor?,
  82. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  83. ) {
  84. self.maximumPayloadSize = previousState.maximumPayloadSize
  85. self.compressor = compressor
  86. self.outboundCompression = outboundCompression
  87. self.framer = framer
  88. self.decompressor = decompressor
  89. self.deframer = deframer
  90. self.inboundMessageBuffer = .init()
  91. }
  92. }
  93. struct ClientOpenServerOpenState {
  94. var framer: GRPCMessageFramer
  95. var compressor: Zlib.Compressor?
  96. var outboundCompression: CompressionAlgorithm
  97. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>
  98. var decompressor: Zlib.Decompressor?
  99. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  100. init(
  101. previousState: ClientOpenServerIdleState,
  102. deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
  103. decompressor: Zlib.Decompressor?
  104. ) {
  105. self.framer = previousState.framer
  106. self.compressor = previousState.compressor
  107. self.outboundCompression = previousState.outboundCompression
  108. self.deframer = deframer
  109. self.decompressor = decompressor
  110. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  111. }
  112. }
  113. struct ClientOpenServerClosedState {
  114. var framer: GRPCMessageFramer?
  115. var compressor: Zlib.Compressor?
  116. var outboundCompression: CompressionAlgorithm
  117. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  118. var decompressor: Zlib.Decompressor?
  119. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  120. // This transition should only happen on the server-side when, upon receiving
  121. // initial client metadata, some of the headers are invalid and we must reject
  122. // the RPC.
  123. // We will mark the client as open (because it sent initial metadata albeit
  124. // invalid) but we'll close the server, meaning all future messages sent from
  125. // the client will be ignored. Because of this, we won't need to frame or
  126. // deframe any messages, as we won't be reading or writing any messages.
  127. init(previousState: ClientIdleServerIdleState) {
  128. self.framer = nil
  129. self.compressor = nil
  130. self.outboundCompression = .none
  131. self.deframer = nil
  132. self.decompressor = nil
  133. self.inboundMessageBuffer = .init()
  134. }
  135. init(previousState: ClientOpenServerOpenState) {
  136. self.framer = previousState.framer
  137. self.compressor = previousState.compressor
  138. self.outboundCompression = previousState.outboundCompression
  139. self.deframer = previousState.deframer
  140. self.decompressor = previousState.decompressor
  141. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  142. }
  143. init(previousState: ClientOpenServerIdleState) {
  144. self.framer = previousState.framer
  145. self.compressor = previousState.compressor
  146. self.outboundCompression = previousState.outboundCompression
  147. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  148. // The server went directly from idle to closed - this means it sent a
  149. // trailers-only response:
  150. // - if we're the client, the previous state was a nil deframer, but that
  151. // is okay because we don't need a deframer as the server won't be sending
  152. // any messages;
  153. // - if we're the server, we'll keep whatever deframer we had.
  154. self.deframer = previousState.deframer
  155. self.decompressor = previousState.decompressor
  156. }
  157. }
  158. struct ClientClosedServerIdleState {
  159. let maximumPayloadSize: Int
  160. var framer: GRPCMessageFramer
  161. var compressor: Zlib.Compressor?
  162. var outboundCompression: CompressionAlgorithm
  163. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  164. var decompressor: Zlib.Decompressor?
  165. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  166. /// We are closing the client as soon as it opens (i.e., endStream was set when receiving the client's
  167. /// initial metadata). We don't need to know a decompression algorithm, since we won't receive
  168. /// any more messages from the client anyways, as it's closed.
  169. init(
  170. previousState: ClientIdleServerIdleState,
  171. compressionAlgorithm: CompressionAlgorithm
  172. ) {
  173. self.maximumPayloadSize = previousState.maximumPayloadSize
  174. if let zlibMethod = Zlib.Method(encoding: compressionAlgorithm) {
  175. self.compressor = Zlib.Compressor(method: zlibMethod)
  176. self.outboundCompression = compressionAlgorithm
  177. } else {
  178. self.compressor = nil
  179. self.outboundCompression = .none
  180. }
  181. self.framer = GRPCMessageFramer()
  182. // We don't need a deframer since we won't receive any messages from the
  183. // client: it's closed.
  184. self.deframer = nil
  185. self.inboundMessageBuffer = .init()
  186. }
  187. init(previousState: ClientOpenServerIdleState) {
  188. self.maximumPayloadSize = previousState.maximumPayloadSize
  189. self.framer = previousState.framer
  190. self.compressor = previousState.compressor
  191. self.outboundCompression = previousState.outboundCompression
  192. self.deframer = previousState.deframer
  193. self.decompressor = previousState.decompressor
  194. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  195. }
  196. }
  197. struct ClientClosedServerOpenState {
  198. var framer: GRPCMessageFramer
  199. var compressor: Zlib.Compressor?
  200. var outboundCompression: CompressionAlgorithm
  201. let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
  202. var decompressor: Zlib.Decompressor?
  203. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  204. init(previousState: ClientOpenServerOpenState) {
  205. self.framer = previousState.framer
  206. self.compressor = previousState.compressor
  207. self.outboundCompression = previousState.outboundCompression
  208. self.deframer = previousState.deframer
  209. self.decompressor = previousState.decompressor
  210. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  211. }
  212. /// This should be called from the server path, as the deframer will already be configured in this scenario.
  213. init(previousState: ClientClosedServerIdleState) {
  214. self.framer = previousState.framer
  215. self.compressor = previousState.compressor
  216. self.outboundCompression = previousState.outboundCompression
  217. // In the case of the server, we don't need to deframe/decompress any more
  218. // messages, since the client's closed.
  219. self.deframer = nil
  220. self.decompressor = nil
  221. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  222. }
  223. /// This should only be called from the client path, as the deframer has not yet been set up.
  224. init(
  225. previousState: ClientClosedServerIdleState,
  226. decompressionAlgorithm: CompressionAlgorithm
  227. ) {
  228. self.framer = previousState.framer
  229. self.compressor = previousState.compressor
  230. self.outboundCompression = previousState.outboundCompression
  231. // In the case of the client, it will only be able to set up the deframer
  232. // after it receives the chosen encoding from the server.
  233. if let zlibMethod = Zlib.Method(encoding: decompressionAlgorithm) {
  234. self.decompressor = Zlib.Decompressor(method: zlibMethod)
  235. }
  236. let decoder = GRPCMessageDeframer(
  237. maximumPayloadSize: previousState.maximumPayloadSize,
  238. decompressor: self.decompressor
  239. )
  240. self.deframer = NIOSingleStepByteToMessageProcessor(decoder)
  241. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  242. }
  243. }
  244. struct ClientClosedServerClosedState {
  245. // We still need the framer and compressor in case the server has closed
  246. // but its buffer is not yet empty and still needs to send messages out to
  247. // the client.
  248. var framer: GRPCMessageFramer?
  249. var compressor: Zlib.Compressor?
  250. var outboundCompression: CompressionAlgorithm
  251. // These are already deframed, so we don't need the deframer anymore.
  252. var inboundMessageBuffer: OneOrManyQueue<[UInt8]>
  253. // This transition should only happen on the server-side when, upon receiving
  254. // initial client metadata, some of the headers are invalid and we must reject
  255. // the RPC.
  256. // We will mark the client as closed (because it set the EOS flag, even if
  257. // the initial metadata was invalid) and we'll close the server too.
  258. // Because of this, we won't need to frame any messages, as we
  259. // won't be writing any messages.
  260. init(previousState: ClientIdleServerIdleState) {
  261. self.framer = nil
  262. self.compressor = nil
  263. self.outboundCompression = .none
  264. self.inboundMessageBuffer = .init()
  265. }
  266. init(previousState: ClientClosedServerOpenState) {
  267. self.framer = previousState.framer
  268. self.compressor = previousState.compressor
  269. self.outboundCompression = previousState.outboundCompression
  270. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  271. }
  272. init(previousState: ClientClosedServerIdleState) {
  273. self.framer = previousState.framer
  274. self.compressor = previousState.compressor
  275. self.outboundCompression = previousState.outboundCompression
  276. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  277. }
  278. init(previousState: ClientOpenServerIdleState) {
  279. self.framer = previousState.framer
  280. self.compressor = previousState.compressor
  281. self.outboundCompression = previousState.outboundCompression
  282. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  283. }
  284. init(previousState: ClientOpenServerOpenState) {
  285. self.framer = previousState.framer
  286. self.compressor = previousState.compressor
  287. self.outboundCompression = previousState.outboundCompression
  288. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  289. }
  290. init(previousState: ClientOpenServerClosedState) {
  291. self.framer = previousState.framer
  292. self.compressor = previousState.compressor
  293. self.outboundCompression = previousState.outboundCompression
  294. self.inboundMessageBuffer = previousState.inboundMessageBuffer
  295. }
  296. }
  297. }
  298. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  299. struct GRPCStreamStateMachine {
  300. private var state: GRPCStreamStateMachineState
  301. private var configuration: GRPCStreamStateMachineConfiguration
  302. private var skipAssertions: Bool
  303. init(
  304. configuration: GRPCStreamStateMachineConfiguration,
  305. maximumPayloadSize: Int,
  306. skipAssertions: Bool = false
  307. ) {
  308. self.state = .clientIdleServerIdle(.init(maximumPayloadSize: maximumPayloadSize))
  309. self.configuration = configuration
  310. self.skipAssertions = skipAssertions
  311. }
  312. mutating func send(metadata: Metadata) throws -> HPACKHeaders {
  313. switch self.configuration {
  314. case .client(let clientConfiguration):
  315. return try self.clientSend(metadata: metadata, configuration: clientConfiguration)
  316. case .server(let serverConfiguration):
  317. return try self.serverSend(metadata: metadata, configuration: serverConfiguration)
  318. }
  319. }
  320. mutating func send(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
  321. switch self.configuration {
  322. case .client:
  323. try self.clientSend(message: message, promise: promise)
  324. case .server:
  325. try self.serverSend(message: message, promise: promise)
  326. }
  327. }
  328. mutating func closeOutbound() throws {
  329. switch self.configuration {
  330. case .client:
  331. try self.clientCloseOutbound()
  332. case .server:
  333. try self.invalidState("Server cannot call close: it must send status and trailers.")
  334. }
  335. }
  336. mutating func send(
  337. status: Status,
  338. metadata: Metadata
  339. ) throws -> HPACKHeaders {
  340. switch self.configuration {
  341. case .client:
  342. try self.invalidState(
  343. "Client cannot send status and trailer."
  344. )
  345. case .server:
  346. return try self.serverSend(
  347. status: status,
  348. customMetadata: metadata
  349. )
  350. }
  351. }
  352. enum OnMetadataReceived: Equatable {
  353. case receivedMetadata(Metadata, MethodDescriptor?)
  354. // Client-specific actions
  355. case receivedStatusAndMetadata(status: Status, metadata: Metadata)
  356. case doNothing
  357. // Server-specific actions
  358. case rejectRPC(trailers: HPACKHeaders)
  359. case protocolViolation
  360. }
  361. mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
  362. switch self.configuration {
  363. case .client(let clientConfiguration):
  364. return try self.clientReceive(
  365. headers: headers,
  366. endStream: endStream,
  367. configuration: clientConfiguration
  368. )
  369. case .server(let serverConfiguration):
  370. return try self.serverReceive(
  371. headers: headers,
  372. endStream: endStream,
  373. configuration: serverConfiguration
  374. )
  375. }
  376. }
  377. enum OnBufferReceivedAction: Equatable {
  378. case readInbound
  379. case doNothing
  380. // Client-specific actions
  381. // This will be returned when the server sends a data frame with EOS set.
  382. // This is invalid as per the protocol specification, because the server
  383. // can only close by sending trailers, not by setting EOS when sending
  384. // a message.
  385. case endRPCAndForwardErrorStatus(Status)
  386. }
  387. mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction {
  388. switch self.configuration {
  389. case .client:
  390. return try self.clientReceive(buffer: buffer, endStream: endStream)
  391. case .server:
  392. return try self.serverReceive(buffer: buffer, endStream: endStream)
  393. }
  394. }
  395. /// The result of requesting the next outbound frame, which may contain multiple messages.
  396. enum OnNextOutboundFrame {
  397. /// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done
  398. /// writing messages (i.e. we are now closed).
  399. case noMoreMessages
  400. /// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying.
  401. case awaitMoreMessages
  402. /// A frame is ready to be sent.
  403. case sendFrame(
  404. frame: ByteBuffer,
  405. promise: EventLoopPromise<Void>?
  406. )
  407. }
  408. mutating func nextOutboundFrame() throws -> OnNextOutboundFrame {
  409. switch self.configuration {
  410. case .client:
  411. return try self.clientNextOutboundFrame()
  412. case .server:
  413. return try self.serverNextOutboundFrame()
  414. }
  415. }
  416. /// The result of requesting the next inbound message.
  417. enum OnNextInboundMessage: Equatable {
  418. /// The sender is done writing messages and there are no more messages to be received.
  419. case noMoreMessages
  420. /// There isn't a message ready to be sent, but we could still receive more, so keep trying.
  421. case awaitMoreMessages
  422. /// A message has been received.
  423. case receiveMessage([UInt8])
  424. }
  425. mutating func nextInboundMessage() -> OnNextInboundMessage {
  426. switch self.configuration {
  427. case .client:
  428. return self.clientNextInboundMessage()
  429. case .server:
  430. return self.serverNextInboundMessage()
  431. }
  432. }
  433. mutating func tearDown() {
  434. switch self.state {
  435. case .clientIdleServerIdle:
  436. ()
  437. case .clientOpenServerIdle(let state):
  438. state.compressor?.end()
  439. state.decompressor?.end()
  440. case .clientOpenServerOpen(let state):
  441. state.compressor?.end()
  442. state.decompressor?.end()
  443. case .clientOpenServerClosed(let state):
  444. state.compressor?.end()
  445. state.decompressor?.end()
  446. case .clientClosedServerIdle(let state):
  447. state.compressor?.end()
  448. state.decompressor?.end()
  449. case .clientClosedServerOpen(let state):
  450. state.compressor?.end()
  451. state.decompressor?.end()
  452. case .clientClosedServerClosed(let state):
  453. state.compressor?.end()
  454. }
  455. }
  456. }
  457. // - MARK: Client
  458. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  459. extension GRPCStreamStateMachine {
  460. private func makeClientHeaders(
  461. methodDescriptor: MethodDescriptor,
  462. scheme: GRPCStreamStateMachineConfiguration.Scheme,
  463. outboundEncoding: CompressionAlgorithm?,
  464. acceptedEncodings: CompressionAlgorithmSet,
  465. customMetadata: Metadata
  466. ) -> HPACKHeaders {
  467. var headers = HPACKHeaders()
  468. headers.reserveCapacity(7 + customMetadata.count)
  469. // Add required headers.
  470. // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  471. // The order is important here: reserved HTTP2 headers (those starting with `:`)
  472. // must come before all other headers.
  473. headers.add("POST", forKey: .method)
  474. headers.add(scheme.rawValue, forKey: .scheme)
  475. headers.add(methodDescriptor.path, forKey: .path)
  476. // Add required gRPC headers.
  477. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  478. headers.add("trailers", forKey: .te) // Used to detect incompatible proxies
  479. if let encoding = outboundEncoding, encoding != .none {
  480. headers.add(encoding.name, forKey: .encoding)
  481. }
  482. for encoding in acceptedEncodings.elements.filter({ $0 != .none }) {
  483. headers.add(encoding.name, forKey: .acceptEncoding)
  484. }
  485. for metadataPair in customMetadata {
  486. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  487. }
  488. return headers
  489. }
  490. private mutating func clientSend(
  491. metadata: Metadata,
  492. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  493. ) throws -> HPACKHeaders {
  494. // Client sends metadata only when opening the stream.
  495. switch self.state {
  496. case .clientIdleServerIdle(let state):
  497. let outboundEncoding = configuration.outboundEncoding
  498. let compressor = Zlib.Method(encoding: outboundEncoding)
  499. .flatMap { Zlib.Compressor(method: $0) }
  500. self.state = .clientOpenServerIdle(
  501. .init(
  502. previousState: state,
  503. compressor: compressor,
  504. outboundCompression: outboundEncoding,
  505. framer: GRPCMessageFramer(),
  506. decompressor: nil,
  507. deframer: nil
  508. )
  509. )
  510. return self.makeClientHeaders(
  511. methodDescriptor: configuration.methodDescriptor,
  512. scheme: configuration.scheme,
  513. outboundEncoding: configuration.outboundEncoding,
  514. acceptedEncodings: configuration.acceptedEncodings,
  515. customMetadata: metadata
  516. )
  517. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  518. try self.invalidState(
  519. "Client is already open: shouldn't be sending metadata."
  520. )
  521. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  522. try self.invalidState(
  523. "Client is closed: can't send metadata."
  524. )
  525. }
  526. }
  527. private mutating func clientSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
  528. switch self.state {
  529. case .clientIdleServerIdle:
  530. try self.invalidState("Client not yet open.")
  531. case .clientOpenServerIdle(var state):
  532. state.framer.append(message, promise: promise)
  533. self.state = .clientOpenServerIdle(state)
  534. case .clientOpenServerOpen(var state):
  535. state.framer.append(message, promise: promise)
  536. self.state = .clientOpenServerOpen(state)
  537. case .clientOpenServerClosed:
  538. // The server has closed, so it makes no sense to send the rest of the request.
  539. ()
  540. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  541. try self.invalidState(
  542. "Client is closed, cannot send a message."
  543. )
  544. }
  545. }
  546. private mutating func clientCloseOutbound() throws {
  547. switch self.state {
  548. case .clientIdleServerIdle:
  549. try self.invalidState("Client not yet open.")
  550. case .clientOpenServerIdle(let state):
  551. self.state = .clientClosedServerIdle(.init(previousState: state))
  552. case .clientOpenServerOpen(let state):
  553. self.state = .clientClosedServerOpen(.init(previousState: state))
  554. case .clientOpenServerClosed(let state):
  555. self.state = .clientClosedServerClosed(.init(previousState: state))
  556. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  557. // Client is already closed - nothing to do.
  558. ()
  559. }
  560. }
  561. /// Returns the client's next request to the server.
  562. /// - Returns: The request to be made to the server.
  563. private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame {
  564. switch self.state {
  565. case .clientIdleServerIdle:
  566. try self.invalidState("Client is not open yet.")
  567. case .clientOpenServerIdle(var state):
  568. let request = try state.framer.next(compressor: state.compressor)
  569. self.state = .clientOpenServerIdle(state)
  570. return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  571. ?? .awaitMoreMessages
  572. case .clientOpenServerOpen(var state):
  573. let request = try state.framer.next(compressor: state.compressor)
  574. self.state = .clientOpenServerOpen(state)
  575. return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  576. ?? .awaitMoreMessages
  577. case .clientClosedServerIdle(var state):
  578. let request = try state.framer.next(compressor: state.compressor)
  579. self.state = .clientClosedServerIdle(state)
  580. if let request {
  581. return .sendFrame(frame: request.bytes, promise: request.promise)
  582. } else {
  583. return .noMoreMessages
  584. }
  585. case .clientClosedServerOpen(var state):
  586. let request = try state.framer.next(compressor: state.compressor)
  587. self.state = .clientClosedServerOpen(state)
  588. if let request {
  589. return .sendFrame(frame: request.bytes, promise: request.promise)
  590. } else {
  591. return .noMoreMessages
  592. }
  593. case .clientOpenServerClosed, .clientClosedServerClosed:
  594. // No point in sending any more requests if the server is closed.
  595. return .noMoreMessages
  596. }
  597. }
  598. private enum ServerHeadersValidationResult {
  599. case valid
  600. case invalid(OnMetadataReceived)
  601. }
  602. private mutating func clientValidateHeadersReceivedFromServer(
  603. _ metadata: HPACKHeaders
  604. ) -> ServerHeadersValidationResult {
  605. var httpStatus: String? {
  606. metadata.firstString(forKey: .status)
  607. }
  608. var grpcStatus: Status.Code? {
  609. metadata.firstString(forKey: .grpcStatus)
  610. .flatMap { Int($0) }
  611. .flatMap { Status.Code(rawValue: $0) }
  612. }
  613. guard httpStatus == "200" || grpcStatus != nil else {
  614. let httpStatusCode =
  615. httpStatus
  616. .flatMap { Int($0) }
  617. .map { HTTPResponseStatus(statusCode: $0) }
  618. guard let httpStatusCode else {
  619. return .invalid(
  620. .receivedStatusAndMetadata(
  621. status: .init(code: .unknown, message: "HTTP Status Code is missing."),
  622. metadata: Metadata(headers: metadata)
  623. )
  624. )
  625. }
  626. if (100 ... 199).contains(httpStatusCode.code) {
  627. // For 1xx status codes, the entire header should be skipped and a
  628. // subsequent header should be read.
  629. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  630. return .invalid(.doNothing)
  631. }
  632. // Forward the mapped status code.
  633. return .invalid(
  634. .receivedStatusAndMetadata(
  635. status: .init(
  636. code: Status.Code(httpStatusCode: httpStatusCode),
  637. message: "Unexpected non-200 HTTP Status Code."
  638. ),
  639. metadata: Metadata(headers: metadata)
  640. )
  641. )
  642. }
  643. let contentTypeHeader = metadata.first(name: GRPCHTTP2Keys.contentType.rawValue)
  644. guard contentTypeHeader.flatMap(ContentType.init) != nil else {
  645. return .invalid(
  646. .receivedStatusAndMetadata(
  647. status: .init(
  648. code: .internalError,
  649. message: "Missing \(GRPCHTTP2Keys.contentType.rawValue) header"
  650. ),
  651. metadata: Metadata(headers: metadata)
  652. )
  653. )
  654. }
  655. return .valid
  656. }
  657. private enum ProcessInboundEncodingResult {
  658. case error(OnMetadataReceived)
  659. case success(CompressionAlgorithm)
  660. }
  661. private func processInboundEncoding(
  662. headers: HPACKHeaders,
  663. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  664. ) -> ProcessInboundEncodingResult {
  665. let inboundEncoding: CompressionAlgorithm
  666. if let serverEncoding = headers.first(name: GRPCHTTP2Keys.encoding.rawValue) {
  667. guard let parsedEncoding = CompressionAlgorithm(name: serverEncoding),
  668. configuration.acceptedEncodings.contains(parsedEncoding)
  669. else {
  670. return .error(
  671. .receivedStatusAndMetadata(
  672. status: .init(
  673. code: .internalError,
  674. message:
  675. "The server picked a compression algorithm ('\(serverEncoding)') the client does not know about."
  676. ),
  677. metadata: Metadata(headers: headers)
  678. )
  679. )
  680. }
  681. inboundEncoding = parsedEncoding
  682. } else {
  683. inboundEncoding = .none
  684. }
  685. return .success(inboundEncoding)
  686. }
  687. private func validateAndReturnStatusAndMetadata(
  688. _ metadata: HPACKHeaders
  689. ) throws -> OnMetadataReceived {
  690. let rawStatusCode = metadata.firstString(forKey: .grpcStatus)
  691. guard let rawStatusCode,
  692. let intStatusCode = Int(rawStatusCode),
  693. let statusCode = Status.Code(rawValue: intStatusCode)
  694. else {
  695. let message =
  696. "Non-initial metadata must be a trailer containing a valid grpc-status"
  697. + (rawStatusCode.flatMap { "but was \($0)" } ?? "")
  698. throw RPCError(code: .unknown, message: message)
  699. }
  700. let statusMessage =
  701. metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false)
  702. .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""
  703. var convertedMetadata = Metadata(headers: metadata)
  704. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
  705. convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
  706. return .receivedStatusAndMetadata(
  707. status: Status(code: statusCode, message: statusMessage),
  708. metadata: convertedMetadata
  709. )
  710. }
  711. private mutating func clientReceive(
  712. headers: HPACKHeaders,
  713. endStream: Bool,
  714. configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  715. ) throws -> OnMetadataReceived {
  716. switch self.state {
  717. case .clientOpenServerIdle(let state):
  718. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  719. case (.invalid(let action), true):
  720. // The headers are invalid, but the server signalled that it was
  721. // closing the stream, so close both client and server.
  722. self.state = .clientClosedServerClosed(.init(previousState: state))
  723. return action
  724. case (.invalid(let action), false):
  725. self.state = .clientClosedServerIdle(.init(previousState: state))
  726. return action
  727. case (.valid, true):
  728. // This is a trailers-only response: close server.
  729. self.state = .clientOpenServerClosed(.init(previousState: state))
  730. return try self.validateAndReturnStatusAndMetadata(headers)
  731. case (.valid, false):
  732. switch self.processInboundEncoding(headers: headers, configuration: configuration) {
  733. case .error(let failure):
  734. return failure
  735. case .success(let inboundEncoding):
  736. let decompressor = Zlib.Method(encoding: inboundEncoding)
  737. .flatMap { Zlib.Decompressor(method: $0) }
  738. let deframer = GRPCMessageDeframer(
  739. maximumPayloadSize: state.maximumPayloadSize,
  740. decompressor: decompressor
  741. )
  742. self.state = .clientOpenServerOpen(
  743. .init(
  744. previousState: state,
  745. deframer: NIOSingleStepByteToMessageProcessor(deframer),
  746. decompressor: decompressor
  747. )
  748. )
  749. return .receivedMetadata(Metadata(headers: headers), nil)
  750. }
  751. }
  752. case .clientOpenServerOpen(let state):
  753. // This state is valid even if endStream is not set: server can send
  754. // trailing metadata without END_STREAM set, and follow it with an
  755. // empty message frame where it is set.
  756. // However, we must make sure that grpc-status is set, otherwise this
  757. // is an invalid state.
  758. if endStream {
  759. self.state = .clientOpenServerClosed(.init(previousState: state))
  760. }
  761. return try self.validateAndReturnStatusAndMetadata(headers)
  762. case .clientClosedServerIdle(let state):
  763. switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
  764. case (.invalid(let action), true):
  765. // The headers are invalid, but the server signalled that it was
  766. // closing the stream, so close the server side too.
  767. self.state = .clientClosedServerClosed(.init(previousState: state))
  768. return action
  769. case (.invalid(let action), false):
  770. // Client is already closed, so we don't need to update our state.
  771. return action
  772. case (.valid, true):
  773. // This is a trailers-only response: close server.
  774. self.state = .clientClosedServerClosed(.init(previousState: state))
  775. return try self.validateAndReturnStatusAndMetadata(headers)
  776. case (.valid, false):
  777. switch self.processInboundEncoding(headers: headers, configuration: configuration) {
  778. case .error(let failure):
  779. return failure
  780. case .success(let inboundEncoding):
  781. self.state = .clientClosedServerOpen(
  782. .init(
  783. previousState: state,
  784. decompressionAlgorithm: inboundEncoding
  785. )
  786. )
  787. return .receivedMetadata(Metadata(headers: headers), nil)
  788. }
  789. }
  790. case .clientClosedServerOpen(let state):
  791. // This state is valid even if endStream is not set: server can send
  792. // trailing metadata without END_STREAM set, and follow it with an
  793. // empty message frame where it is set.
  794. // However, we must make sure that grpc-status is set, otherwise this
  795. // is an invalid state.
  796. if endStream {
  797. self.state = .clientClosedServerClosed(.init(previousState: state))
  798. }
  799. return try self.validateAndReturnStatusAndMetadata(headers)
  800. case .clientClosedServerClosed:
  801. // We could end up here if we received a grpc-status header in a previous
  802. // frame (which would have already close the server) and then we receive
  803. // an empty frame with EOS set.
  804. // We wouldn't want to throw in that scenario, so we just ignore it.
  805. // Note that we don't want to ignore it if EOS is not set here though, as
  806. // then it would be an invalid payload.
  807. if !endStream || headers.count > 0 {
  808. try self.invalidState(
  809. "Server is closed, nothing could have been sent."
  810. )
  811. }
  812. return .doNothing
  813. case .clientIdleServerIdle:
  814. try self.invalidState(
  815. "Server cannot have sent metadata if the client is idle."
  816. )
  817. case .clientOpenServerClosed:
  818. try self.invalidState(
  819. "Server is closed, nothing could have been sent."
  820. )
  821. }
  822. }
  823. private mutating func clientReceive(
  824. buffer: ByteBuffer,
  825. endStream: Bool
  826. ) throws -> OnBufferReceivedAction {
  827. // This is a message received by the client, from the server.
  828. switch self.state {
  829. case .clientIdleServerIdle:
  830. try self.invalidState(
  831. "Cannot have received anything from server if client is not yet open."
  832. )
  833. case .clientOpenServerIdle, .clientClosedServerIdle:
  834. try self.invalidState(
  835. "Server cannot have sent a message before sending the initial metadata."
  836. )
  837. case .clientOpenServerOpen(var state):
  838. if endStream {
  839. // This is invalid as per the protocol specification, because the server
  840. // can only close by sending trailers, not by setting EOS when sending
  841. // a message.
  842. self.state = .clientClosedServerClosed(.init(previousState: state))
  843. return .endRPCAndForwardErrorStatus(
  844. Status(
  845. code: .internalError,
  846. message: """
  847. Server sent EOS alongside a data frame, but server is only allowed \
  848. to close by sending status and trailers.
  849. """
  850. )
  851. )
  852. }
  853. try state.deframer.process(buffer: buffer) { deframedMessage in
  854. state.inboundMessageBuffer.append(deframedMessage)
  855. }
  856. self.state = .clientOpenServerOpen(state)
  857. return .readInbound
  858. case .clientClosedServerOpen(var state):
  859. if endStream {
  860. self.state = .clientClosedServerClosed(.init(previousState: state))
  861. return .endRPCAndForwardErrorStatus(
  862. Status(
  863. code: .internalError,
  864. message: """
  865. Server sent EOS alongside a data frame, but server is only allowed \
  866. to close by sending status and trailers.
  867. """
  868. )
  869. )
  870. }
  871. // The client may have sent the end stream and thus it's closed,
  872. // but the server may still be responding.
  873. // The client must have a deframer set up, so force-unwrap is okay.
  874. try state.deframer!.process(buffer: buffer) { deframedMessage in
  875. state.inboundMessageBuffer.append(deframedMessage)
  876. }
  877. self.state = .clientClosedServerOpen(state)
  878. return .readInbound
  879. case .clientOpenServerClosed, .clientClosedServerClosed:
  880. try self.invalidState(
  881. "Cannot have received anything from a closed server."
  882. )
  883. }
  884. }
  885. private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
  886. switch self.state {
  887. case .clientOpenServerOpen(var state):
  888. let message = state.inboundMessageBuffer.pop()
  889. self.state = .clientOpenServerOpen(state)
  890. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  891. case .clientOpenServerClosed(var state):
  892. let message = state.inboundMessageBuffer.pop()
  893. self.state = .clientOpenServerClosed(state)
  894. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  895. case .clientClosedServerOpen(var state):
  896. let message = state.inboundMessageBuffer.pop()
  897. self.state = .clientClosedServerOpen(state)
  898. return message.map { .receiveMessage($0) } ?? .awaitMoreMessages
  899. case .clientClosedServerClosed(var state):
  900. let message = state.inboundMessageBuffer.pop()
  901. self.state = .clientClosedServerClosed(state)
  902. return message.map { .receiveMessage($0) } ?? .noMoreMessages
  903. case .clientIdleServerIdle,
  904. .clientOpenServerIdle,
  905. .clientClosedServerIdle:
  906. return .awaitMoreMessages
  907. }
  908. }
  909. private func invalidState(_ message: String, line: UInt = #line) throws -> Never {
  910. if !self.skipAssertions {
  911. assertionFailure(message, line: line)
  912. }
  913. throw RPCError(code: .internalError, message: message)
  914. }
  915. }
  916. // - MARK: Server
  917. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  918. extension GRPCStreamStateMachine {
  919. private func makeResponseHeaders(
  920. outboundEncoding: CompressionAlgorithm?,
  921. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
  922. customMetadata: Metadata
  923. ) -> HPACKHeaders {
  924. // Response headers always contain :status (HTTP Status 200) and content-type.
  925. // They may also contain grpc-encoding, grpc-accept-encoding, and custom metadata.
  926. var headers = HPACKHeaders()
  927. headers.reserveCapacity(4 + customMetadata.count)
  928. headers.add("200", forKey: .status)
  929. headers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  930. if let outboundEncoding, outboundEncoding != .none {
  931. headers.add(outboundEncoding.name, forKey: .encoding)
  932. }
  933. for metadataPair in customMetadata {
  934. headers.add(name: metadataPair.key, value: metadataPair.value.encoded())
  935. }
  936. return headers
  937. }
  938. private mutating func serverSend(
  939. metadata: Metadata,
  940. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  941. ) throws -> HPACKHeaders {
  942. // Server sends initial metadata
  943. switch self.state {
  944. case .clientOpenServerIdle(let state):
  945. let outboundEncoding = state.outboundCompression
  946. self.state = .clientOpenServerOpen(
  947. .init(
  948. previousState: state,
  949. // In the case of the server, it will already have a deframer set up,
  950. // because it already knows what encoding the client is using:
  951. // it's okay to force-unwrap.
  952. deframer: state.deframer!,
  953. decompressor: state.decompressor
  954. )
  955. )
  956. return self.makeResponseHeaders(
  957. outboundEncoding: outboundEncoding,
  958. configuration: configuration,
  959. customMetadata: metadata
  960. )
  961. case .clientClosedServerIdle(let state):
  962. let outboundEncoding = state.outboundCompression
  963. self.state = .clientClosedServerOpen(.init(previousState: state))
  964. return self.makeResponseHeaders(
  965. outboundEncoding: outboundEncoding,
  966. configuration: configuration,
  967. customMetadata: metadata
  968. )
  969. case .clientIdleServerIdle:
  970. try self.invalidState(
  971. "Client cannot be idle if server is sending initial metadata: it must have opened."
  972. )
  973. case .clientOpenServerClosed, .clientClosedServerClosed:
  974. try self.invalidState(
  975. "Server cannot send metadata if closed."
  976. )
  977. case .clientOpenServerOpen, .clientClosedServerOpen:
  978. try self.invalidState(
  979. "Server has already sent initial metadata."
  980. )
  981. }
  982. }
  983. private mutating func serverSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
  984. switch self.state {
  985. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  986. try self.invalidState(
  987. "Server must have sent initial metadata before sending a message."
  988. )
  989. case .clientOpenServerOpen(var state):
  990. state.framer.append(message, promise: promise)
  991. self.state = .clientOpenServerOpen(state)
  992. case .clientClosedServerOpen(var state):
  993. state.framer.append(message, promise: promise)
  994. self.state = .clientClosedServerOpen(state)
  995. case .clientOpenServerClosed, .clientClosedServerClosed:
  996. try self.invalidState(
  997. "Server can't send a message if it's closed."
  998. )
  999. }
  1000. }
  1001. private mutating func serverSend(
  1002. status: Status,
  1003. customMetadata: Metadata
  1004. ) throws -> HPACKHeaders {
  1005. // Close the server.
  1006. switch self.state {
  1007. case .clientOpenServerOpen(let state):
  1008. self.state = .clientOpenServerClosed(.init(previousState: state))
  1009. return .trailers(status: status, metadata: customMetadata)
  1010. case .clientClosedServerOpen(let state):
  1011. self.state = .clientClosedServerClosed(.init(previousState: state))
  1012. return .trailers(status: status, metadata: customMetadata)
  1013. case .clientOpenServerIdle(let state):
  1014. self.state = .clientOpenServerClosed(.init(previousState: state))
  1015. return .trailersOnly(status: status, metadata: customMetadata)
  1016. case .clientClosedServerIdle(let state):
  1017. self.state = .clientClosedServerClosed(.init(previousState: state))
  1018. return .trailersOnly(status: status, metadata: customMetadata)
  1019. case .clientIdleServerIdle:
  1020. try self.invalidState(
  1021. "Server can't send status if client is idle."
  1022. )
  1023. case .clientOpenServerClosed, .clientClosedServerClosed:
  1024. try self.invalidState(
  1025. "Server can't send anything if closed."
  1026. )
  1027. }
  1028. }
  1029. private mutating func serverReceive(
  1030. headers: HPACKHeaders,
  1031. endStream: Bool,
  1032. configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  1033. ) throws -> OnMetadataReceived {
  1034. func closeServer(
  1035. from state: GRPCStreamStateMachineState.ClientIdleServerIdleState,
  1036. endStream: Bool
  1037. ) -> GRPCStreamStateMachineState {
  1038. if endStream {
  1039. return .clientClosedServerClosed(.init(previousState: state))
  1040. } else {
  1041. return .clientOpenServerClosed(.init(previousState: state))
  1042. }
  1043. }
  1044. switch self.state {
  1045. case .clientIdleServerIdle(let state):
  1046. let contentType = headers.firstString(forKey: .contentType)
  1047. .flatMap { ContentType(value: $0) }
  1048. if contentType == nil {
  1049. self.state = .clientOpenServerClosed(.init(previousState: state))
  1050. // Respond with HTTP-level Unsupported Media Type status code.
  1051. var trailers = HPACKHeaders()
  1052. trailers.add("415", forKey: .status)
  1053. return .rejectRPC(trailers: trailers)
  1054. }
  1055. guard let pathHeader = headers.firstString(forKey: .path) else {
  1056. self.state = closeServer(from: state, endStream: endStream)
  1057. return .rejectRPC(
  1058. trailers: .trailersOnly(
  1059. code: .invalidArgument,
  1060. message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
  1061. )
  1062. )
  1063. }
  1064. guard let path = MethodDescriptor(path: pathHeader) else {
  1065. self.state = closeServer(from: state, endStream: endStream)
  1066. return .rejectRPC(
  1067. trailers: .trailersOnly(
  1068. code: .unimplemented,
  1069. message:
  1070. "The given \(GRPCHTTP2Keys.path.rawValue) (\(pathHeader)) does not correspond to a valid method."
  1071. )
  1072. )
  1073. }
  1074. let scheme = headers.firstString(forKey: .scheme)
  1075. .flatMap { GRPCStreamStateMachineConfiguration.Scheme(rawValue: $0) }
  1076. if scheme == nil {
  1077. self.state = closeServer(from: state, endStream: endStream)
  1078. return .rejectRPC(
  1079. trailers: .trailersOnly(
  1080. code: .invalidArgument,
  1081. message: ":scheme header must be present and one of \"http\" or \"https\"."
  1082. )
  1083. )
  1084. }
  1085. guard let method = headers.firstString(forKey: .method), method == "POST" else {
  1086. self.state = closeServer(from: state, endStream: endStream)
  1087. return .rejectRPC(
  1088. trailers: .trailersOnly(
  1089. code: .invalidArgument,
  1090. message: ":method header is expected to be present and have a value of \"POST\"."
  1091. )
  1092. )
  1093. }
  1094. guard let te = headers.firstString(forKey: .te), te == "trailers" else {
  1095. self.state = closeServer(from: state, endStream: endStream)
  1096. return .rejectRPC(
  1097. trailers: .trailersOnly(
  1098. code: .invalidArgument,
  1099. message: "\"te\" header is expected to be present and have a value of \"trailers\"."
  1100. )
  1101. )
  1102. }
  1103. // Firstly, find out if we support the client's chosen encoding, and reject
  1104. // the RPC if we don't.
  1105. let inboundEncoding: CompressionAlgorithm
  1106. let encodingValues = headers.values(
  1107. forHeader: GRPCHTTP2Keys.encoding.rawValue,
  1108. canonicalForm: true
  1109. )
  1110. var encodingValuesIterator = encodingValues.makeIterator()
  1111. if let rawEncoding = encodingValuesIterator.next() {
  1112. guard encodingValuesIterator.next() == nil else {
  1113. self.state = closeServer(from: state, endStream: endStream)
  1114. return .rejectRPC(
  1115. trailers: .trailersOnly(
  1116. code: .internalError,
  1117. message: "\(GRPCHTTP2Keys.encoding) must contain no more than one value."
  1118. )
  1119. )
  1120. }
  1121. guard let clientEncoding = CompressionAlgorithm(name: rawEncoding),
  1122. configuration.acceptedEncodings.contains(clientEncoding)
  1123. else {
  1124. self.state = closeServer(from: state, endStream: endStream)
  1125. var trailers = HPACKHeaders.trailersOnly(
  1126. code: .unimplemented,
  1127. message: """
  1128. \(rawEncoding) compression is not supported; \
  1129. supported algorithms are listed in grpc-accept-encoding
  1130. """
  1131. )
  1132. for acceptedEncoding in configuration.acceptedEncodings.elements {
  1133. trailers.add(name: GRPCHTTP2Keys.acceptEncoding.rawValue, value: acceptedEncoding.name)
  1134. }
  1135. return .rejectRPC(trailers: trailers)
  1136. }
  1137. // Server supports client's encoding.
  1138. inboundEncoding = clientEncoding
  1139. } else {
  1140. inboundEncoding = .none
  1141. }
  1142. // Secondly, find a compatible encoding the server can use to compress outbound messages,
  1143. // based on the encodings the client has advertised.
  1144. var outboundEncoding: CompressionAlgorithm = .none
  1145. let clientAdvertisedEncodings = headers.values(
  1146. forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue,
  1147. canonicalForm: true
  1148. )
  1149. // Find the preferred encoding and use it to compress responses.
  1150. for clientAdvertisedEncoding in clientAdvertisedEncodings {
  1151. if let algorithm = CompressionAlgorithm(name: clientAdvertisedEncoding),
  1152. configuration.acceptedEncodings.contains(algorithm)
  1153. {
  1154. outboundEncoding = algorithm
  1155. break
  1156. }
  1157. }
  1158. if endStream {
  1159. self.state = .clientClosedServerIdle(
  1160. .init(
  1161. previousState: state,
  1162. compressionAlgorithm: outboundEncoding
  1163. )
  1164. )
  1165. } else {
  1166. let compressor = Zlib.Method(encoding: outboundEncoding)
  1167. .flatMap { Zlib.Compressor(method: $0) }
  1168. let decompressor = Zlib.Method(encoding: inboundEncoding)
  1169. .flatMap { Zlib.Decompressor(method: $0) }
  1170. let deframer = GRPCMessageDeframer(
  1171. maximumPayloadSize: state.maximumPayloadSize,
  1172. decompressor: decompressor
  1173. )
  1174. self.state = .clientOpenServerIdle(
  1175. .init(
  1176. previousState: state,
  1177. compressor: compressor,
  1178. outboundCompression: outboundEncoding,
  1179. framer: GRPCMessageFramer(),
  1180. decompressor: decompressor,
  1181. deframer: NIOSingleStepByteToMessageProcessor(deframer)
  1182. )
  1183. )
  1184. }
  1185. return .receivedMetadata(Metadata(headers: headers), path)
  1186. case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
  1187. // Metadata has already been received, should only be sent once by clients.
  1188. return .protocolViolation
  1189. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1190. try self.invalidState("Client can't have sent metadata if closed.")
  1191. }
  1192. }
  1193. private mutating func serverReceive(
  1194. buffer: ByteBuffer,
  1195. endStream: Bool
  1196. ) throws -> OnBufferReceivedAction {
  1197. let action: OnBufferReceivedAction
  1198. switch self.state {
  1199. case .clientIdleServerIdle:
  1200. try self.invalidState("Can't have received a message if client is idle.")
  1201. case .clientOpenServerIdle(var state):
  1202. // Deframer must be present on the server side, as we know the decompression
  1203. // algorithm from the moment the client opens.
  1204. try state.deframer!.process(buffer: buffer) { deframedMessage in
  1205. state.inboundMessageBuffer.append(deframedMessage)
  1206. }
  1207. if endStream {
  1208. self.state = .clientClosedServerIdle(.init(previousState: state))
  1209. } else {
  1210. self.state = .clientOpenServerIdle(state)
  1211. }
  1212. action = .readInbound
  1213. case .clientOpenServerOpen(var state):
  1214. try state.deframer.process(buffer: buffer) { deframedMessage in
  1215. state.inboundMessageBuffer.append(deframedMessage)
  1216. }
  1217. if endStream {
  1218. self.state = .clientClosedServerOpen(.init(previousState: state))
  1219. } else {
  1220. self.state = .clientOpenServerOpen(state)
  1221. }
  1222. action = .readInbound
  1223. case .clientOpenServerClosed(let state):
  1224. // Client is not done sending request, but server has already closed.
  1225. // Ignore the rest of the request: do nothing, unless endStream is set,
  1226. // in which case close the client.
  1227. if endStream {
  1228. self.state = .clientClosedServerClosed(.init(previousState: state))
  1229. }
  1230. action = .doNothing
  1231. case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
  1232. try self.invalidState("Client can't send a message if closed.")
  1233. }
  1234. return action
  1235. }
  1236. private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
  1237. switch self.state {
  1238. case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
  1239. try self.invalidState("Server is not open yet.")
  1240. case .clientOpenServerOpen(var state):
  1241. let response = try state.framer.next(compressor: state.compressor)
  1242. self.state = .clientOpenServerOpen(state)
  1243. return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  1244. ?? .awaitMoreMessages
  1245. case .clientClosedServerOpen(var state):
  1246. let response = try state.framer.next(compressor: state.compressor)
  1247. self.state = .clientClosedServerOpen(state)
  1248. return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
  1249. ?? .awaitMoreMessages
  1250. case .clientOpenServerClosed(var state):
  1251. let response = try state.framer?.next(compressor: state.compressor)
  1252. self.state = .clientOpenServerClosed(state)
  1253. if let response {
  1254. return .sendFrame(frame: response.bytes, promise: response.promise)
  1255. } else {
  1256. return .noMoreMessages
  1257. }
  1258. case .clientClosedServerClosed(var state):
  1259. let response = try state.framer?.next(compressor: state.compressor)
  1260. self.state = .clientClosedServerClosed(state)
  1261. if let response {
  1262. return .sendFrame(frame: response.bytes, promise: response.promise)
  1263. } else {
  1264. return .noMoreMessages
  1265. }
  1266. }
  1267. }
  1268. private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
  1269. switch self.state {
  1270. case .clientOpenServerIdle(var state):
  1271. let request = state.inboundMessageBuffer.pop()
  1272. self.state = .clientOpenServerIdle(state)
  1273. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1274. case .clientOpenServerOpen(var state):
  1275. let request = state.inboundMessageBuffer.pop()
  1276. self.state = .clientOpenServerOpen(state)
  1277. return request.map { .receiveMessage($0) } ?? .awaitMoreMessages
  1278. case .clientClosedServerIdle(var state):
  1279. let request = state.inboundMessageBuffer.pop()
  1280. self.state = .clientClosedServerIdle(state)
  1281. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1282. case .clientClosedServerOpen(var state):
  1283. let request = state.inboundMessageBuffer.pop()
  1284. self.state = .clientClosedServerOpen(state)
  1285. return request.map { .receiveMessage($0) } ?? .noMoreMessages
  1286. case .clientOpenServerClosed, .clientClosedServerClosed:
  1287. // Server has closed, no need to read.
  1288. return .noMoreMessages
  1289. case .clientIdleServerIdle:
  1290. return .awaitMoreMessages
  1291. }
  1292. }
  1293. }
  1294. extension MethodDescriptor {
  1295. init?(path: String) {
  1296. var view = path[...]
  1297. guard view.popFirst() == "/" else { return nil }
  1298. // Find the index of the "/" separating the service and method names.
  1299. guard var index = view.firstIndex(of: "/") else { return nil }
  1300. let service = String(view[..<index])
  1301. view.formIndex(after: &index)
  1302. let method = String(view[index...])
  1303. self.init(service: service, method: method)
  1304. }
  1305. }
  1306. internal enum GRPCHTTP2Keys: String {
  1307. case path = ":path"
  1308. case contentType = "content-type"
  1309. case encoding = "grpc-encoding"
  1310. case acceptEncoding = "grpc-accept-encoding"
  1311. case scheme = ":scheme"
  1312. case method = ":method"
  1313. case te = "te"
  1314. case status = ":status"
  1315. case grpcStatus = "grpc-status"
  1316. case grpcStatusMessage = "grpc-message"
  1317. }
  1318. extension HPACKHeaders {
  1319. internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
  1320. self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true })
  1321. .map {
  1322. String($0)
  1323. }
  1324. }
  1325. internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
  1326. self.add(name: key.rawValue, value: value)
  1327. }
  1328. static func trailersOnly(code: Status.Code, message: String, metadata: Metadata = [:]) -> Self {
  1329. return .trailersOnly(status: Status(code: code, message: message), metadata: metadata)
  1330. }
  1331. static func trailersOnly(status: Status, metadata: Metadata = [:]) -> Self {
  1332. return .makeTrailers(isTrailersOnly: true, status: status, metadata: metadata)
  1333. }
  1334. static func trailers(status: Status, metadata: Metadata = [:]) -> Self {
  1335. return .makeTrailers(isTrailersOnly: false, status: status, metadata: metadata)
  1336. }
  1337. private static func makeTrailers(
  1338. isTrailersOnly: Bool,
  1339. status: Status,
  1340. metadata: Metadata
  1341. ) -> Self {
  1342. var trailers = HPACKHeaders()
  1343. if isTrailersOnly {
  1344. trailers.reserveCapacity(4 + metadata.count)
  1345. trailers.add("200", forKey: .status)
  1346. trailers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
  1347. } else {
  1348. trailers.reserveCapacity(2 + metadata.count)
  1349. }
  1350. trailers.add(String(status.code.rawValue), forKey: .grpcStatus)
  1351. if !status.message.isEmpty, let encoded = GRPCStatusMessageMarshaller.marshall(status.message) {
  1352. trailers.add(encoded, forKey: .grpcStatusMessage)
  1353. }
  1354. for (key, value) in metadata {
  1355. trailers.add(name: key, value: value.encoded())
  1356. }
  1357. return trailers
  1358. }
  1359. }
  1360. extension Zlib.Method {
  1361. init?(encoding: CompressionAlgorithm) {
  1362. switch encoding {
  1363. case .none:
  1364. return nil
  1365. case .deflate:
  1366. self = .deflate
  1367. case .gzip:
  1368. self = .gzip
  1369. default:
  1370. return nil
  1371. }
  1372. }
  1373. }
  1374. extension Metadata {
  1375. init(headers: HPACKHeaders) {
  1376. var metadata = Metadata()
  1377. metadata.reserveCapacity(headers.count)
  1378. for header in headers {
  1379. if header.name.hasSuffix("-bin") {
  1380. do {
  1381. let decodedBinary = try header.value.base64Decoded()
  1382. metadata.addBinary(decodedBinary, forKey: header.name)
  1383. } catch {
  1384. metadata.addString(header.value, forKey: header.name)
  1385. }
  1386. } else {
  1387. metadata.addString(header.value, forKey: header.name)
  1388. }
  1389. }
  1390. self = metadata
  1391. }
  1392. }
  1393. extension Status.Code {
  1394. // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
  1395. init(httpStatusCode: HTTPResponseStatus) {
  1396. switch httpStatusCode {
  1397. case .badRequest:
  1398. self = .internalError
  1399. case .unauthorized:
  1400. self = .unauthenticated
  1401. case .forbidden:
  1402. self = .permissionDenied
  1403. case .notFound:
  1404. self = .unimplemented
  1405. case .tooManyRequests, .badGateway, .serviceUnavailable, .gatewayTimeout:
  1406. self = .unavailable
  1407. default:
  1408. self = .unknown
  1409. }
  1410. }
  1411. }
  1412. extension MethodDescriptor {
  1413. var path: String {
  1414. return "/\(self.service)/\(self.method)"
  1415. }
  1416. }