GRPCStreamStateMachine.swift 55 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  /// The URI scheme an RPC is carried over.
  ///
  /// The raw value of each case is the lowercase scheme name.
  enum Scheme: String {
    /// The `http` scheme.
    case http = "http"
    /// The `https` scheme.
    case https = "https"
  }
  /// Selects the role the stream state machine plays and carries the settings
  /// for that role.
  enum GRPCStreamStateMachineConfiguration {
    /// Act as the client, using the attached client settings.
    case client(ClientConfiguration)
    /// Act as the server, using the attached server settings.
    case server(ServerConfiguration)

    /// Settings for a state machine acting as the client.
    struct ClientConfiguration {
      /// The method being called.
      var methodDescriptor: MethodDescriptor
      /// The scheme the request is made with.
      var scheme: Scheme
      /// The compression algorithm configured for outbound messages.
      var outboundEncoding: CompressionAlgorithm
      /// The compression algorithms accepted for inbound messages.
      var acceptedEncodings: CompressionAlgorithmSet

      /// Creates a client configuration.
      ///
      /// `.none` is always added to `acceptedEncodings`, whatever the caller
      /// passes in.
      init(
        methodDescriptor: MethodDescriptor,
        scheme: Scheme,
        outboundEncoding: CompressionAlgorithm,
        acceptedEncodings: CompressionAlgorithmSet
      ) {
        let encodingsWithNone = acceptedEncodings.union(.none)
        self.acceptedEncodings = encodingsWithNone
        self.outboundEncoding = outboundEncoding
        self.scheme = scheme
        self.methodDescriptor = methodDescriptor
      }
    }

    /// Settings for a state machine acting as the server.
    struct ServerConfiguration {
      /// The scheme the server is configured with.
      var scheme: Scheme
      /// The compression algorithms accepted for inbound messages.
      var acceptedEncodings: CompressionAlgorithmSet

      /// Creates a server configuration.
      ///
      /// `.none` is always added to `acceptedEncodings`, whatever the caller
      /// passes in.
      init(scheme: Scheme, acceptedEncodings: CompressionAlgorithmSet) {
        let encodingsWithNone = acceptedEncodings.union(.none)
        self.acceptedEncodings = encodingsWithNone
        self.scheme = scheme
      }
    }
  }
  /// The combined client/server lifecycle of a single stream.
  ///
  /// There is one case per modelled combination of the client and the server
  /// being idle, open or closed. Each case carries the framing, compression and
  /// buffering state that is still needed in that phase.
  private enum GRPCStreamStateMachineState {
    case clientIdleServerIdle(ClientIdleServerIdleState)
    case clientOpenServerIdle(ClientOpenServerIdleState)
    case clientOpenServerOpen(ClientOpenServerOpenState)
    case clientOpenServerClosed(ClientOpenServerClosedState)
    case clientClosedServerIdle(ClientClosedServerIdleState)
    case clientClosedServerOpen(ClientClosedServerOpenState)
    case clientClosedServerClosed(ClientClosedServerClosedState)

    /// Neither peer has sent anything yet.
    struct ClientIdleServerIdleState {
      /// Payload size limit, handed to the message deframer once one is created.
      let maximumPayloadSize: Int
    }

    /// The client has opened the stream; the server has not responded yet.
    struct ClientOpenServerIdleState {
      let maximumPayloadSize: Int
      var framer: GRPCMessageFramer
      var compressor: Zlib.Compressor?
      var outboundCompression: CompressionAlgorithm?
      // Optional on purpose: a client has no deframer until the server opens and
      // announces its grpc-encoding. A server does have one even while idle,
      // because the client may already be sending it compressed messages.
      let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
      var decompressor: Zlib.Decompressor?
      var inboundMessageBuffer: OneOrManyQueue<[UInt8]>

      /// Opens the client side, starting with an empty inbound buffer.
      init(
        previousState: ClientIdleServerIdleState,
        compressor: Zlib.Compressor?,
        framer: GRPCMessageFramer,
        decompressor: Zlib.Decompressor?,
        deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
      ) {
        self.inboundMessageBuffer = .init()
        self.maximumPayloadSize = previousState.maximumPayloadSize
        self.framer = framer
        self.compressor = compressor
        self.deframer = deframer
        self.decompressor = decompressor
      }
    }

    /// Both peers are open and may exchange messages.
    struct ClientOpenServerOpenState {
      var framer: GRPCMessageFramer
      var compressor: Zlib.Compressor?
      let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>
      var decompressor: Zlib.Decompressor?
      var inboundMessageBuffer: OneOrManyQueue<[UInt8]>

      /// Opens the server side. The outbound pieces and buffered inbound messages
      /// are carried over; the inbound pieces are supplied by the caller.
      init(
        previousState: ClientOpenServerIdleState,
        deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>,
        decompressor: Zlib.Decompressor?
      ) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
        self.decompressor = decompressor
        self.deframer = deframer
      }
    }

    /// The client is still open but the server has closed.
    struct ClientOpenServerClosedState {
      var framer: GRPCMessageFramer?
      var compressor: Zlib.Compressor?
      let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
      var decompressor: Zlib.Decompressor?
      var inboundMessageBuffer: OneOrManyQueue<[UInt8]>

      // Server-side only: reached when the client's initial metadata contains
      // invalid headers and the RPC has to be rejected.
      // The client counts as open (it did send initial metadata, even if it was
      // invalid) and the server is closed, so anything else the client sends is
      // ignored. Nothing will be read or written, hence no framer or deframer.
      init(previousState: ClientIdleServerIdleState) {
        self.inboundMessageBuffer = .init()
        self.decompressor = nil
        self.deframer = nil
        self.compressor = nil
        self.framer = nil
      }

      /// The server closed after having been open: everything is carried over.
      init(previousState: ClientOpenServerOpenState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.decompressor = previousState.decompressor
        self.deframer = previousState.deframer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }

      init(previousState: ClientOpenServerIdleState) {
        // Idle straight to closed means the server sent a trailers-only response:
        // - on the client, the previous deframer is nil, which is fine because
        //   the server will not be sending any messages;
        // - on the server, whatever deframer was configured is kept.
        self.deframer = previousState.deframer
        self.decompressor = previousState.decompressor
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }
    }

    /// The client has closed; the server has not responded yet.
    struct ClientClosedServerIdleState {
      let maximumPayloadSize: Int
      var framer: GRPCMessageFramer
      var compressor: Zlib.Compressor?
      var outboundCompression: CompressionAlgorithm?
      let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
      var decompressor: Zlib.Decompressor?
      var inboundMessageBuffer: OneOrManyQueue<[UInt8]>

      /// The client closes as soon as it opens (i.e. endStream was set on its
      /// initial metadata). No decompression algorithm is needed because a closed
      /// client will not send any further messages.
      init(
        previousState: ClientIdleServerIdleState,
        compressionAlgorithm: CompressionAlgorithm
      ) {
        self.maximumPayloadSize = previousState.maximumPayloadSize
        // Only create a compressor when the algorithm maps to a zlib method;
        // otherwise it stays nil.
        self.compressor = Zlib.Method(encoding: compressionAlgorithm)
          .flatMap { Zlib.Compressor(method: $0) }
        self.framer = GRPCMessageFramer()
        self.outboundCompression = compressionAlgorithm
        // The client is closed, so no messages will arrive from it and no
        // deframer is needed.
        self.deframer = nil
        self.inboundMessageBuffer = .init()
      }

      /// The client closed after having been open: everything is carried over.
      init(previousState: ClientOpenServerIdleState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.decompressor = previousState.decompressor
        self.deframer = previousState.deframer
        self.outboundCompression = previousState.outboundCompression
        self.compressor = previousState.compressor
        self.framer = previousState.framer
        self.maximumPayloadSize = previousState.maximumPayloadSize
      }
    }

    /// The client has closed; the server is open.
    struct ClientClosedServerOpenState {
      var framer: GRPCMessageFramer
      var compressor: Zlib.Compressor?
      let deframer: NIOSingleStepByteToMessageProcessor<GRPCMessageDeframer>?
      var decompressor: Zlib.Decompressor?
      var inboundMessageBuffer: OneOrManyQueue<[UInt8]>

      /// The client closed while both peers were open: everything is carried over.
      init(previousState: ClientOpenServerOpenState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.decompressor = previousState.decompressor
        self.deframer = previousState.deframer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }

      /// This should be called from the server path, as the deframer will already be configured in this scenario.
      init(previousState: ClientClosedServerIdleState) {
        // On the server there is nothing left to deframe or decompress, because
        // the client has closed.
        self.deframer = nil
        self.decompressor = nil
        self.framer = previousState.framer
        self.compressor = previousState.compressor
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
      }

      /// This should only be called from the client path, as the deframer has not yet been set up.
      init(
        previousState: ClientClosedServerIdleState,
        decompressionAlgorithm: CompressionAlgorithm
      ) {
        self.framer = previousState.framer
        self.compressor = previousState.compressor
        // The client can only build its deframer once the server has told it
        // which encoding it picked.
        let inboundDecompressor = Zlib.Method(encoding: decompressionAlgorithm)
          .flatMap { Zlib.Decompressor(method: $0) }
        self.decompressor = inboundDecompressor
        self.deframer = NIOSingleStepByteToMessageProcessor(
          GRPCMessageDeframer(
            maximumPayloadSize: previousState.maximumPayloadSize,
            decompressor: inboundDecompressor
          )
        )
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
      }
    }

    /// Both peers have closed.
    struct ClientClosedServerClosedState {
      // The framer and compressor are kept because the server may have closed
      // while its buffer still holds messages that must go out to the client.
      var framer: GRPCMessageFramer?
      var compressor: Zlib.Compressor?
      // Buffered messages are already deframed, so no deframer is kept.
      var inboundMessageBuffer: OneOrManyQueue<[UInt8]>

      // Server-side only: reached when the client's initial metadata contains
      // invalid headers and the RPC has to be rejected.
      // The client counts as closed (it set the EOS flag, even though its
      // initial metadata was invalid) and the server is closed as well.
      // No messages will be written, so no framer is needed.
      init(previousState: ClientIdleServerIdleState) {
        self.inboundMessageBuffer = .init()
        self.compressor = nil
        self.framer = nil
      }

      init(previousState: ClientClosedServerOpenState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }

      init(previousState: ClientClosedServerIdleState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }

      init(previousState: ClientOpenServerIdleState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }

      init(previousState: ClientOpenServerOpenState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }

      init(previousState: ClientOpenServerClosedState) {
        self.inboundMessageBuffer = previousState.inboundMessageBuffer
        self.compressor = previousState.compressor
        self.framer = previousState.framer
      }
    }
  }
  /// State machine for a single gRPC stream, usable as either the client or
  /// the server side depending on its configuration.
  ///
  /// Every public entry point switches on the configured role and forwards to
  /// the matching `client…` or `server…` implementation.
  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  struct GRPCStreamStateMachine {
    /// Current lifecycle state of the stream.
    private var state: GRPCStreamStateMachineState
    /// Role (client or server) and the settings for that role.
    private var configuration: GRPCStreamStateMachineConfiguration
    // NOTE(review): read by code outside this excerpt; presumably toggles
    // assertions on invalid state transitions. Confirm.
    private var skipAssertions: Bool

    /// Creates a state machine with both peers idle.
    ///
    /// - Parameters:
    ///   - configuration: The role to play and its settings.
    ///   - maximumPayloadSize: Payload size limit stored in the initial state.
    ///   - skipAssertions: Stored as-is; defaults to `false`.
    init(
      configuration: GRPCStreamStateMachineConfiguration,
      maximumPayloadSize: Int,
      skipAssertions: Bool = false
    ) {
      self.skipAssertions = skipAssertions
      self.configuration = configuration
      self.state = .clientIdleServerIdle(.init(maximumPayloadSize: maximumPayloadSize))
    }

    /// Sends initial metadata and returns the headers to write.
    mutating func send(metadata: Metadata) throws -> HPACKHeaders {
      switch self.configuration {
      case .server(let serverConfiguration):
        return try serverSend(metadata: metadata, configuration: serverConfiguration)
      case .client(let clientConfiguration):
        return try clientSend(metadata: metadata, configuration: clientConfiguration)
      }
    }

    /// Queues a message for sending; `promise` is passed on to the role-specific
    /// implementation together with the message.
    mutating func send(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
      switch self.configuration {
      case .server:
        try serverSend(message: message, promise: promise)
      case .client:
        try clientSend(message: message, promise: promise)
      }
    }

    /// Closes the outbound (client) side of the stream.
    ///
    /// A server is not allowed to call this: it closes by sending status and
    /// trailers instead.
    mutating func closeOutbound() throws {
      switch self.configuration {
      case .server:
        try invalidState("Server cannot call close: it must send status and trailers.")
      case .client:
        try clientCloseOutbound()
      }
    }

    /// Sends the final status and trailing metadata. Server only.
    mutating func send(
      status: Status,
      metadata: Metadata
    ) throws -> HPACKHeaders {
      switch self.configuration {
      case .server:
        return try serverSend(
          status: status,
          customMetadata: metadata
        )
      case .client:
        try invalidState(
          "Client cannot send status and trailer."
        )
      }
    }

    /// What the caller should do after headers have been received.
    enum OnMetadataReceived: Equatable {
      case receivedMetadata(Metadata)

      // Client-specific actions
      case receivedStatusAndMetadata(status: Status, metadata: Metadata)
      case doNothing

      // Server-specific actions
      case rejectRPC(trailers: HPACKHeaders)
    }

    /// Handles a received headers frame.
    mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
      switch self.configuration {
      case .server(let serverConfiguration):
        return try serverReceive(
          headers: headers,
          endStream: endStream,
          configuration: serverConfiguration
        )
      case .client(let clientConfiguration):
        return try clientReceive(
          headers: headers,
          endStream: endStream,
          configuration: clientConfiguration
        )
      }
    }

    /// What the caller should do after a data frame has been received.
    enum OnBufferReceivedAction: Equatable {
      case readInbound

      // Client-specific actions

      // Returned when the server sends a data frame with EOS set. The protocol
      // specification forbids this: a server may only close by sending
      // trailers, not by setting EOS on a message.
      case endRPCAndForwardErrorStatus(Status)
    }

    /// Handles a received data frame.
    mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction {
      switch self.configuration {
      case .server:
        return try serverReceive(buffer: buffer, endStream: endStream)
      case .client:
        return try clientReceive(buffer: buffer, endStream: endStream)
      }
    }

    /// The result of requesting the next outbound frame, which may contain multiple messages.
    enum OnNextOutboundFrame {
      /// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done
      /// writing messages (i.e. we are now closed).
      case noMoreMessages
      /// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying.
      case awaitMoreMessages
      /// A frame is ready to be sent.
      case sendFrame(
        frame: ByteBuffer,
        promise: EventLoopPromise<Void>?
      )
    }

    /// Produces the next frame to write, if there is one.
    mutating func nextOutboundFrame() throws -> OnNextOutboundFrame {
      switch self.configuration {
      case .server:
        return try serverNextOutboundFrame()
      case .client:
        return try clientNextOutboundFrame()
      }
    }

    /// The result of requesting the next inbound message.
    enum OnNextInboundMessage: Equatable {
      /// The sender is done writing messages and there are no more messages to be received.
      case noMoreMessages
      /// There isn't a message ready yet, but we could still receive more, so keep trying.
      case awaitMoreMessages
      /// A message has been received.
      case receiveMessage([UInt8])
    }

    /// Produces the next received message, if there is one.
    mutating func nextInboundMessage() -> OnNextInboundMessage {
      switch self.configuration {
      case .server:
        return serverNextInboundMessage()
      case .client:
        return clientNextInboundMessage()
      }
    }

    /// Ends whichever compressor and decompressor the current state holds.
    ///
    /// The idle/idle state holds neither, and the closed/closed state holds
    /// only a compressor.
    mutating func tearDown() {
      switch self.state {
      case .clientIdleServerIdle:
        break
      case .clientOpenServerIdle(let current):
        current.compressor?.end()
        current.decompressor?.end()
      case .clientOpenServerOpen(let current):
        current.compressor?.end()
        current.decompressor?.end()
      case .clientOpenServerClosed(let current):
        current.compressor?.end()
        current.decompressor?.end()
      case .clientClosedServerIdle(let current):
        current.compressor?.end()
        current.decompressor?.end()
      case .clientClosedServerOpen(let current):
        current.compressor?.end()
        current.decompressor?.end()
      case .clientClosedServerClosed(let current):
        current.compressor?.end()
      }
    }
  }
  // MARK: - Client
  434. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  435. extension GRPCStreamStateMachine {
  /// Builds the request headers a client sends to open an RPC.
  ///
  /// - Parameters:
  ///   - methodDescriptor: Supplies the fully qualified method used as the path.
  ///   - scheme: Its raw value is used as the scheme header.
  ///   - outboundEncoding: Added as the encoding header unless `nil` or `.none`.
  ///   - acceptedEncodings: Every element except `.none` is added as an
  ///     accept-encoding header.
  ///   - customMetadata: Appended after all of the above, one header per pair.
  /// - Returns: The assembled headers.
  private func makeClientHeaders(
    methodDescriptor: MethodDescriptor,
    scheme: Scheme,
    outboundEncoding: CompressionAlgorithm?,
    acceptedEncodings: CompressionAlgorithmSet,
    customMetadata: Metadata
  ) -> HPACKHeaders {
    var requestHeaders = HPACKHeaders()
    requestHeaders.reserveCapacity(7 + customMetadata.count)

    // Required headers, see
    // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
    // Ordering matters: the reserved HTTP2 headers (the ones starting with `:`)
    // have to precede every other header.
    requestHeaders.add("POST", forKey: .method)
    requestHeaders.add(scheme.rawValue, forKey: .scheme)
    requestHeaders.add(methodDescriptor.fullyQualifiedMethod, forKey: .path)

    // Required gRPC headers.
    requestHeaders.add(ContentType.grpc.canonicalValue, forKey: .contentType)
    requestHeaders.add("trailers", forKey: .te)  // Used to detect incompatible proxies

    if let outboundEncoding, outboundEncoding != .none {
      requestHeaders.add(outboundEncoding.name, forKey: .encoding)
    }

    for accepted in acceptedEncodings.elements where accepted != .none {
      requestHeaders.add(accepted.name, forKey: .acceptEncoding)
    }

    for pair in customMetadata {
      requestHeaders.add(name: pair.key, value: pair.value.encoded())
    }

    return requestHeaders
  }
  /// Opens the client side of the stream and returns the request headers.
  ///
  /// Only valid while both peers are idle. In any other state the call is
  /// reported through `invalidState`.
  ///
  /// - Parameters:
  ///   - metadata: Custom metadata to include in the request headers.
  ///   - configuration: Client settings used for compression and headers.
  /// - Returns: The headers to send to the server.
  private mutating func clientSend(
    metadata: Metadata,
    configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  ) throws -> HPACKHeaders {
    // A client only ever sends metadata once, to open the stream.
    switch self.state {
    case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
      try self.invalidState(
        "Client is already open: shouldn't be sending metadata."
      )

    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
      try self.invalidState(
        "Client is closed: can't send metadata."
      )

    case .clientIdleServerIdle(let idleState):
      // A compressor is only created when the outbound encoding maps to a
      // zlib method.
      var requestCompressor: Zlib.Compressor? = nil
      if let zlibMethod = Zlib.Method(encoding: configuration.outboundEncoding) {
        requestCompressor = Zlib.Compressor(method: zlibMethod)
      }

      // The inbound side (decompressor and deframer) stays unset here.
      self.state = .clientOpenServerIdle(
        .init(
          previousState: idleState,
          compressor: requestCompressor,
          framer: GRPCMessageFramer(),
          decompressor: nil,
          deframer: nil
        )
      )

      let requestHeaders = self.makeClientHeaders(
        methodDescriptor: configuration.methodDescriptor,
        scheme: configuration.scheme,
        outboundEncoding: configuration.outboundEncoding,
        acceptedEncodings: configuration.acceptedEncodings,
        customMetadata: metadata
      )
      return requestHeaders
    }
  }
  /// Queues a request message on the client's framer.
  ///
  /// The message is appended while the client is open and the server has not
  /// closed. If the server has already closed, the message is silently dropped.
  /// An idle or closed client is reported through `invalidState`.
  ///
  /// - Parameters:
  ///   - message: The serialized message bytes.
  ///   - promise: Passed to the framer together with the message.
  private mutating func clientSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
    switch self.state {
    case .clientOpenServerIdle(var openState):
      openState.framer.append(message, promise: promise)
      self.state = .clientOpenServerIdle(openState)

    case .clientOpenServerOpen(var openState):
      openState.framer.append(message, promise: promise)
      self.state = .clientOpenServerOpen(openState)

    case .clientOpenServerClosed:
      // With the server closed there is no point in sending the rest of the
      // request.
      break

    case .clientIdleServerIdle:
      try self.invalidState("Client not yet open.")

    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
      try self.invalidState(
        "Client is closed, cannot send a message."
      )
    }
  }
  /// Moves the client from open to closed, leaving the server side as it is.
  ///
  /// A client that is still idle, or that is already closed, is reported
  /// through `invalidState`.
  private mutating func clientCloseOutbound() throws {
    switch self.state {
    case .clientOpenServerIdle(let openState):
      self.state = .clientClosedServerIdle(.init(previousState: openState))

    case .clientOpenServerOpen(let openState):
      self.state = .clientClosedServerOpen(.init(previousState: openState))

    case .clientOpenServerClosed(let openState):
      self.state = .clientClosedServerClosed(.init(previousState: openState))

    case .clientIdleServerIdle:
      try self.invalidState("Client not yet open.")

    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
      try self.invalidState("Client is already closed.")
    }
  }
  /// Pulls the next frame the client should write to the server.
  ///
  /// - Returns:
  ///   - `.sendFrame` when the framer produced a frame;
  ///   - `.awaitMoreMessages` when the framer is empty but the client is still open;
  ///   - `.noMoreMessages` when the framer is empty and the client is closed,
  ///     or when the server is closed.
  /// - Throws: Whatever the framer throws, and reports an idle client through
  ///   `invalidState`.
  private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame {
    switch self.state {
    case .clientIdleServerIdle:
      try self.invalidState("Client is not open yet.")

    case .clientOpenServerClosed, .clientClosedServerClosed:
      // Nothing is worth sending once the server has closed.
      return .noMoreMessages

    case .clientOpenServerIdle(var current):
      let nextFrame = try current.framer.next(compressor: current.compressor)
      self.state = .clientOpenServerIdle(current)
      guard let nextFrame else {
        return .awaitMoreMessages
      }
      return .sendFrame(frame: nextFrame.bytes, promise: nextFrame.promise)

    case .clientOpenServerOpen(var current):
      let nextFrame = try current.framer.next(compressor: current.compressor)
      self.state = .clientOpenServerOpen(current)
      guard let nextFrame else {
        return .awaitMoreMessages
      }
      return .sendFrame(frame: nextFrame.bytes, promise: nextFrame.promise)

    case .clientClosedServerIdle(var current):
      let nextFrame = try current.framer.next(compressor: current.compressor)
      self.state = .clientClosedServerIdle(current)
      return nextFrame.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
        ?? .noMoreMessages

    case .clientClosedServerOpen(var current):
      let nextFrame = try current.framer.next(compressor: current.compressor)
      self.state = .clientClosedServerOpen(current)
      return nextFrame.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
        ?? .noMoreMessages
    }
  }
  /// Outcome of checking the headers a server sent to the client.
  private enum ServerHeadersValidationResult {
    /// The headers passed validation.
    case valid
    /// The headers failed validation; the payload is the action to hand back
    /// to the caller.
    case invalid(OnMetadataReceived)
  }

  /// Checks the HTTP status and content type of headers received from the server.
  ///
  /// The status check passes when the HTTP status is "200" or when a valid
  /// grpc-status is present. Otherwise:
  /// - a missing or non-numeric HTTP status yields an `.unknown` status;
  /// - a 1xx HTTP status yields `.doNothing`;
  /// - any other HTTP status is mapped to a gRPC status code.
  ///
  /// After the status check, a content type that is missing or not accepted by
  /// `ContentType` yields an `.internalError` status.
  ///
  /// - Parameter metadata: The headers received from the server.
  /// - Returns: `.valid`, or `.invalid` with the action to return to the caller.
  private mutating func clientValidateHeadersReceivedFromServer(
    _ metadata: HPACKHeaders
  ) -> ServerHeadersValidationResult {
    let rawHTTPStatus = metadata.firstString(forKey: .status)

    if rawHTTPStatus != "200" {
      let grpcStatus = metadata.firstString(forKey: .grpcStatus)
        .flatMap { Int($0) }
        .flatMap { Status.Code(rawValue: $0) }

      if grpcStatus == nil {
        guard let rawHTTPStatus, let numericStatus = Int(rawHTTPStatus) else {
          return .invalid(
            .receivedStatusAndMetadata(
              status: .init(code: .unknown, message: "HTTP Status Code is missing."),
              metadata: Metadata(headers: metadata)
            )
          )
        }

        let httpStatusCode = HTTPResponseStatus(statusCode: numericStatus)

        switch httpStatusCode.code {
        case 100 ... 199:
          // A 1xx response is skipped entirely; a later header block is
          // expected and should be read instead.
          // See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
          return .invalid(.doNothing)

        default:
          // Hand back the gRPC status the HTTP status maps to.
          return .invalid(
            .receivedStatusAndMetadata(
              status: .init(
                code: Status.Code(httpStatusCode: httpStatusCode),
                message: "Unexpected non-200 HTTP Status Code."
              ),
              metadata: Metadata(headers: metadata)
            )
          )
        }
      }
    }

    let contentTypeHeader = metadata.first(name: GRPCHTTP2Keys.contentType.rawValue)
    if contentTypeHeader.flatMap(ContentType.init) == nil {
      return .invalid(
        .receivedStatusAndMetadata(
          status: .init(
            code: .internalError,
            message: "Missing \(GRPCHTTP2Keys.contentType.rawValue) header"
          ),
          metadata: Metadata(headers: metadata)
        )
      )
    }

    return .valid
  }
  /// Outcome of resolving the encoding the server chose for its messages.
  private enum ProcessInboundEncodingResult {
    /// The encoding is unusable; the payload is the action to hand back to
    /// the caller.
    case error(OnMetadataReceived)
    /// The encoding to use for inbound messages.
    case success(CompressionAlgorithm)
  }

  /// Works out which compression algorithm the server's messages use.
  ///
  /// - Parameters:
  ///   - headers: The headers received from the server.
  ///   - configuration: Client settings; supplies the accepted encodings.
  /// - Returns: `.success(.none)` when no encoding header is present,
  ///   `.success` with the parsed algorithm when it is recognised and accepted,
  ///   and `.error` with an `.internalError` status otherwise.
  private func processInboundEncoding(
    headers: HPACKHeaders,
    configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  ) -> ProcessInboundEncodingResult {
    guard let serverEncoding = headers.first(name: GRPCHTTP2Keys.encoding.rawValue) else {
      // No encoding header: the messages are not compressed.
      return .success(.none)
    }

    if let parsedEncoding = CompressionAlgorithm(name: serverEncoding),
      configuration.acceptedEncodings.contains(parsedEncoding)
    {
      return .success(parsedEncoding)
    }

    return .error(
      .receivedStatusAndMetadata(
        status: .init(
          code: .internalError,
          message:
            "The server picked a compression algorithm ('\(serverEncoding)') the client does not know about."
        ),
        metadata: Metadata(headers: headers)
      )
    )
  }
  /// Interprets `metadata` as trailers and extracts the gRPC status from them.
  ///
  /// - Parameter metadata: Headers expected to carry a valid `grpc-status`.
  /// - Returns: `.receivedStatusAndMetadata`, where the status is built from the
  ///   status code and the unmarshalled status message (empty when absent), and
  ///   the metadata has the status and status-message keys removed.
  /// - Throws: `RPCError` with code `.unknown` when `grpc-status` is missing,
  ///   is not an integer, or is not a known status code.
  private func validateAndReturnStatusAndMetadata(
    _ metadata: HPACKHeaders
  ) throws -> OnMetadataReceived {
    let rawStatusCode = metadata.firstString(forKey: .grpcStatus)
    guard let rawStatusCode,
      let intStatusCode = Int(rawStatusCode),
      let statusCode = Status.Code(rawValue: intStatusCode)
    else {
      // In the `else` branch `rawStatusCode` is the outer optional. When it is
      // present but invalid, include it in the message. The leading space
      // keeps the value from being glued onto "grpc-status".
      let message =
        "Non-initial metadata must be a trailer containing a valid grpc-status"
        + (rawStatusCode.map { " but was \($0)" } ?? "")
      throw RPCError(code: .unknown, message: message)
    }

    let statusMessage =
      metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false)
      .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""

    // The status and its message are returned as a `Status`, so drop them from
    // the metadata handed back to the caller.
    var convertedMetadata = Metadata(headers: metadata)
    convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
    convertedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)

    return .receivedStatusAndMetadata(
      status: Status(code: statusCode, message: statusMessage),
      metadata: convertedMetadata
    )
  }
  /// Handles headers the client received from the server.
  ///
  /// While the server is idle the headers are treated as the initial response
  /// (or a trailers-only response when `endStream` is set). Once the server is
  /// open they are treated as trailers.
  ///
  /// - Parameters:
  ///   - headers: The received headers.
  ///   - endStream: Whether the frame carried the end-of-stream flag.
  ///   - configuration: Client settings, used to check the server's encoding.
  /// - Returns: The action the caller should take.
  /// - Throws: When trailers lack a valid grpc-status, and reports headers
  ///   arriving in an impossible state through `invalidState`.
  private mutating func clientReceive(
    headers: HPACKHeaders,
    endStream: Bool,
    configuration: GRPCStreamStateMachineConfiguration.ClientConfiguration
  ) throws -> OnMetadataReceived {
    switch self.state {
    case .clientIdleServerIdle:
      try self.invalidState(
        "Server cannot have sent metadata if the client is idle."
      )

    case .clientOpenServerClosed:
      try self.invalidState(
        "Server is closed, nothing could have been sent."
      )

    case .clientOpenServerIdle(let state):
      switch self.clientValidateHeadersReceivedFromServer(headers) {
      case .invalid(let action):
        if endStream {
          // Invalid headers, and the server signalled that it is closing the
          // stream: close both client and server.
          self.state = .clientClosedServerClosed(.init(previousState: state))
        } else {
          self.state = .clientClosedServerIdle(.init(previousState: state))
        }
        return action

      case .valid:
        if endStream {
          // Trailers-only response: the server goes straight to closed.
          self.state = .clientOpenServerClosed(.init(previousState: state))
          return try self.validateAndReturnStatusAndMetadata(headers)
        }

        switch self.processInboundEncoding(headers: headers, configuration: configuration) {
        case .error(let failure):
          return failure

        case .success(let inboundEncoding):
          var decompressor: Zlib.Decompressor? = nil
          if let zlibMethod = Zlib.Method(encoding: inboundEncoding) {
            decompressor = Zlib.Decompressor(method: zlibMethod)
          }
          let messageDeframer = GRPCMessageDeframer(
            maximumPayloadSize: state.maximumPayloadSize,
            decompressor: decompressor
          )

          self.state = .clientOpenServerOpen(
            .init(
              previousState: state,
              deframer: NIOSingleStepByteToMessageProcessor(messageDeframer),
              decompressor: decompressor
            )
          )
          return .receivedMetadata(Metadata(headers: headers))
        }
      }

    case .clientOpenServerOpen(let state):
      // Trailers may legitimately arrive without END_STREAM: the server can
      // send trailing metadata first and then an empty message frame that
      // carries the flag. Either way grpc-status has to be present, otherwise
      // the headers are invalid.
      if endStream {
        self.state = .clientOpenServerClosed(.init(previousState: state))
      }
      return try self.validateAndReturnStatusAndMetadata(headers)

    case .clientClosedServerIdle(let state):
      switch self.clientValidateHeadersReceivedFromServer(headers) {
      case .invalid(let action):
        // With END_STREAM the server has signalled that it is closing, so the
        // server side is closed too. Without it nothing changes, because the
        // client is already closed.
        if endStream {
          self.state = .clientClosedServerClosed(.init(previousState: state))
        }
        return action

      case .valid:
        if endStream {
          // Trailers-only response: the server goes straight to closed.
          self.state = .clientClosedServerClosed(.init(previousState: state))
          return try self.validateAndReturnStatusAndMetadata(headers)
        }

        switch self.processInboundEncoding(headers: headers, configuration: configuration) {
        case .error(let failure):
          return failure

        case .success(let inboundEncoding):
          self.state = .clientClosedServerOpen(
            .init(
              previousState: state,
              decompressionAlgorithm: inboundEncoding
            )
          )
          return .receivedMetadata(Metadata(headers: headers))
        }
      }

    case .clientClosedServerOpen(let state):
      // Trailers may legitimately arrive without END_STREAM: the server can
      // send trailing metadata first and then an empty message frame that
      // carries the flag. Either way grpc-status has to be present, otherwise
      // the headers are invalid.
      if endStream {
        self.state = .clientClosedServerClosed(.init(previousState: state))
      }
      return try self.validateAndReturnStatusAndMetadata(headers)

    case .clientClosedServerClosed:
      // This is reachable when an earlier frame carried grpc-status (which
      // already closed the server) and an empty frame with EOS set follows.
      // That case is ignored rather than thrown on. Anything else, i.e. a frame
      // without EOS or one that carries headers, is an invalid payload.
      let isEmptyEndOfStream = endStream && headers.count == 0
      if !isEmptyEndOfStream {
        try self.invalidState(
          "Server is closed, nothing could have been sent."
        )
      }
      return .doNothing
    }
  }
  /// Handles a data frame received by the client from the server.
  ///
  /// While the server is open, the bytes are fed to the deframer and every
  /// fully deframed message is appended to the inbound message buffer.
  ///
  /// - Parameters:
  ///   - buffer: The bytes received from the server.
  ///   - endStream: Whether the frame carried the end-of-stream flag.
  /// - Returns: `.readInbound` once the bytes have been processed, or
  ///   `.endRPCAndForwardErrorStatus` if the server set end-of-stream on a
  ///   data frame (both peers are marked closed in that case).
  /// - Throws: The error raised by `invalidState` when the server is idle or
  ///   closed (or the client is idle), or any error thrown by the deframer.
  private mutating func clientReceive(
    buffer: ByteBuffer,
    endStream: Bool
  ) throws -> OnBufferReceivedAction {
    // Per the protocol, the server may only close by sending status and
    // trailers, never by setting EOS on a message.
    func unexpectedEndStreamStatus() -> Status {
      Status(
        code: .internalError,
        message: """
          Server sent EOS alongside a data frame, but server is only allowed \
          to close by sending status and trailers.
          """
      )
    }

    switch self.state {
    case .clientOpenServerOpen(var openState):
      guard !endStream else {
        self.state = .clientClosedServerClosed(.init(previousState: openState))
        return .endRPCAndForwardErrorStatus(unexpectedEndStreamStatus())
      }

      try openState.deframer.process(buffer: buffer) { message in
        openState.inboundMessageBuffer.append(message)
      }
      self.state = .clientOpenServerOpen(openState)
      return .readInbound

    case .clientClosedServerOpen(var openState):
      guard !endStream else {
        self.state = .clientClosedServerClosed(.init(previousState: openState))
        return .endRPCAndForwardErrorStatus(unexpectedEndStreamStatus())
      }

      // The client has already sent end-of-stream, but the server may still
      // be responding. A deframer must have been set up by now, so the
      // force-unwrap is okay.
      try openState.deframer!.process(buffer: buffer) { message in
        openState.inboundMessageBuffer.append(message)
      }
      self.state = .clientClosedServerOpen(openState)
      return .readInbound

    case .clientIdleServerIdle:
      try self.invalidState(
        "Cannot have received anything from server if client is not yet open."
      )

    case .clientOpenServerIdle, .clientClosedServerIdle:
      try self.invalidState(
        "Server cannot have sent a message before sending the initial metadata."
      )

    case .clientOpenServerClosed, .clientClosedServerClosed:
      try self.invalidState(
        "Cannot have received anything from a closed server."
      )
    }
  }
  /// Pops the next buffered message received from the server, if any.
  ///
  /// - Returns: `.receiveMessage` when a message is buffered. With an empty
  ///   buffer the result is `.noMoreMessages` if the server is closed and
  ///   `.awaitMoreMessages` otherwise (including while the server is idle).
  private mutating func clientNextInboundMessage() -> OnNextInboundMessage {
    switch self.state {
    case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
      // Nothing can have been buffered before the server opens.
      return .awaitMoreMessages

    case .clientOpenServerOpen(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientOpenServerOpen(current)
      if let next {
        return .receiveMessage(next)
      }
      return .awaitMoreMessages

    case .clientClosedServerOpen(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientClosedServerOpen(current)
      if let next {
        return .receiveMessage(next)
      }
      return .awaitMoreMessages

    case .clientOpenServerClosed(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientOpenServerClosed(current)
      if let next {
        return .receiveMessage(next)
      }
      return .noMoreMessages

    case .clientClosedServerClosed(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientClosedServerClosed(current)
      if let next {
        return .receiveMessage(next)
      }
      return .noMoreMessages
    }
  }
  /// Reports an illegal state transition.
  ///
  /// Unless `skipAssertions` is set, an assertion failure is raised first;
  /// the function then always throws.
  ///
  /// - Parameters:
  ///   - message: Description of the violation, used for both the assertion
  ///     and the thrown error.
  ///   - line: Source line forwarded to `assertionFailure`.
  /// - Throws: An `RPCError` with code `.internalError` carrying `message`.
  private func invalidState(_ message: String, line: UInt = #line) throws -> Never {
    let error = RPCError(code: .internalError, message: message)
    guard self.skipAssertions else {
      assertionFailure(message, line: line)
      throw error
    }
    throw error
  }
  889. }
  // MARK: - Server
  891. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  892. extension GRPCStreamStateMachine {
  /// Builds the initial response headers sent by the server.
  ///
  /// The result always starts with `:status` ("200") and `content-type`,
  /// followed by `grpc-encoding` when a non-identity outbound encoding is in
  /// use, one `grpc-accept-encoding` entry per accepted non-identity
  /// algorithm, and finally the custom metadata.
  ///
  /// - Parameters:
  ///   - outboundEncoding: Algorithm used to compress responses, if any.
  ///   - configuration: Server configuration providing the accepted encodings.
  ///   - customMetadata: User-provided metadata appended after the fixed headers.
  /// - Returns: The assembled headers.
  private func makeResponseHeaders(
    outboundEncoding: CompressionAlgorithm?,
    configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration,
    customMetadata: Metadata
  ) -> HPACKHeaders {
    var responseHeaders = HPACKHeaders()
    responseHeaders.reserveCapacity(4 + customMetadata.count)

    responseHeaders.add("200", forKey: .status)
    responseHeaders.add(ContentType.grpc.canonicalValue, forKey: .contentType)

    if let encoding = outboundEncoding, encoding != .none {
      responseHeaders.add(encoding.name, forKey: .encoding)
    }

    for algorithm in configuration.acceptedEncodings.elements where algorithm != .none {
      responseHeaders.add(algorithm.name, forKey: .acceptEncoding)
    }

    for entry in customMetadata {
      responseHeaders.add(name: entry.key, value: entry.value.encoded())
    }

    return responseHeaders
  }
  /// Opens the server side of the stream by sending the initial metadata.
  ///
  /// - Parameters:
  ///   - metadata: Custom metadata to include in the response headers.
  ///   - configuration: Server configuration used to build the headers.
  /// - Returns: The response headers to write to the client.
  /// - Throws: The error raised by `invalidState` unless the client has
  ///   opened (or already closed) and the server is still idle.
  private mutating func serverSend(
    metadata: Metadata,
    configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  ) throws -> HPACKHeaders {
    switch self.state {
    case .clientIdleServerIdle:
      try self.invalidState(
        "Client cannot be idle if server is sending initial metadata: it must have opened."
      )

    case .clientOpenServerOpen, .clientClosedServerOpen:
      try self.invalidState(
        "Server has already sent initial metadata."
      )

    case .clientOpenServerClosed, .clientClosedServerClosed:
      try self.invalidState(
        "Server cannot send metadata if closed."
      )

    case .clientOpenServerIdle(let idleState):
      // On the server the deframer is created as soon as the client opens,
      // because the client's encoding is known from its headers, so the
      // force-unwrap is okay.
      self.state = .clientOpenServerOpen(
        .init(
          previousState: idleState,
          deframer: idleState.deframer!,
          decompressor: idleState.decompressor
        )
      )
      return self.makeResponseHeaders(
        outboundEncoding: idleState.outboundCompression,
        configuration: configuration,
        customMetadata: metadata
      )

    case .clientClosedServerIdle(let idleState):
      self.state = .clientClosedServerOpen(.init(previousState: idleState))
      return self.makeResponseHeaders(
        outboundEncoding: idleState.outboundCompression,
        configuration: configuration,
        customMetadata: metadata
      )
    }
  }
  /// Queues a response message on the framer.
  ///
  /// - Parameters:
  ///   - message: The serialized message bytes.
  ///   - promise: Optional promise handed to the framer along with the message.
  /// - Throws: The error raised by `invalidState` if the server has not sent
  ///   its initial metadata yet or has already closed.
  private mutating func serverSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
    switch self.state {
    case .clientOpenServerOpen(var openState):
      openState.framer.append(message, promise: promise)
      self.state = .clientOpenServerOpen(openState)

    case .clientClosedServerOpen(var openState):
      openState.framer.append(message, promise: promise)
      self.state = .clientClosedServerOpen(openState)

    case .clientOpenServerClosed, .clientClosedServerClosed:
      try self.invalidState(
        "Server can't send a message if it's closed."
      )

    case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
      try self.invalidState(
        "Server must have sent initial metadata before sending a message."
      )
    }
  }
  /// Builds the trailers that close the server side of the stream.
  ///
  /// Trailers always carry the gRPC status code. The percent-encoded status
  /// message is added when the message is non-empty and can be marshalled,
  /// followed by any custom metadata. A trailers-only response is also
  /// prefixed with `:status` ("200") and `content-type`.
  ///
  /// - Parameters:
  ///   - status: The status to report to the client.
  ///   - customMetadata: Optional metadata appended after the status headers.
  ///   - trailersOnly: Whether these trailers are the only headers being sent.
  /// - Returns: The assembled trailers.
  private func makeTrailers(
    status: Status,
    customMetadata: Metadata?,
    trailersOnly: Bool
  ) -> HPACKHeaders {
    var trailers = HPACKHeaders()

    // Fixed headers: the status code plus the optional status message, and
    // for trailers-only responses also :status and content-type.
    let fixedHeaderCount = trailersOnly ? 4 : 2
    trailers.reserveCapacity(fixedHeaderCount + (customMetadata?.count ?? 0))

    if trailersOnly {
      trailers.add("200", forKey: .status)
      trailers.add(ContentType.grpc.canonicalValue, forKey: .contentType)
    }

    trailers.add(String(status.code.rawValue), forKey: .grpcStatus)

    if !status.message.isEmpty,
      let encodedMessage = GRPCStatusMessageMarshaller.marshall(status.message)
    {
      trailers.add(encodedMessage, forKey: .grpcStatusMessage)
    }

    if let customMetadata {
      for entry in customMetadata {
        trailers.add(name: entry.key, value: entry.value.encoded())
      }
    }

    return trailers
  }
  /// Closes the server side of the stream by sending a status.
  ///
  /// If the server never sent initial metadata, the result is a
  /// trailers-only response; otherwise regular trailers are produced.
  ///
  /// - Parameters:
  ///   - status: The final status of the RPC.
  ///   - customMetadata: Metadata to send alongside the status.
  /// - Returns: The trailers to write to the client.
  /// - Throws: The error raised by `invalidState` if the client is idle or
  ///   the server is already closed.
  private mutating func serverSend(
    status: Status,
    customMetadata: Metadata
  ) throws -> HPACKHeaders {
    // Whether the server closes without having sent initial metadata.
    let isTrailersOnly: Bool

    switch self.state {
    case .clientOpenServerIdle(let idleState):
      self.state = .clientOpenServerClosed(.init(previousState: idleState))
      isTrailersOnly = true

    case .clientClosedServerIdle(let idleState):
      self.state = .clientClosedServerClosed(.init(previousState: idleState))
      isTrailersOnly = true

    case .clientOpenServerOpen(let openState):
      self.state = .clientOpenServerClosed(.init(previousState: openState))
      isTrailersOnly = false

    case .clientClosedServerOpen(let openState):
      self.state = .clientClosedServerClosed(.init(previousState: openState))
      isTrailersOnly = false

    case .clientIdleServerIdle:
      try self.invalidState(
        "Server can't send status if client is idle."
      )

    case .clientOpenServerClosed, .clientClosedServerClosed:
      try self.invalidState(
        "Server can't send anything if closed."
      )
    }

    return self.makeTrailers(
      status: status,
      customMetadata: customMetadata,
      trailersOnly: isTrailersOnly
    )
  }
  /// Closes the server and builds the action that rejects the RPC.
  ///
  /// The client side is marked closed as well when `endStream` is set, and
  /// left open otherwise.
  ///
  /// - Parameters:
  ///   - currentState: The idle state being transitioned away from.
  ///   - endStream: Whether the client's headers carried end-of-stream.
  ///   - status: The status reported to the client in a trailers-only response.
  /// - Returns: A `.rejectRPC` action carrying the trailers to send.
  mutating private func closeServerAndBuildRejectRPCAction(
    currentState: GRPCStreamStateMachineState.ClientIdleServerIdleState,
    endStream: Bool,
    rejectWithStatus status: Status
  ) -> OnMetadataReceived {
    switch endStream {
    case true:
      self.state = .clientClosedServerClosed(.init(previousState: currentState))
    case false:
      self.state = .clientOpenServerClosed(.init(previousState: currentState))
    }

    return .rejectRPC(
      trailers: self.makeTrailers(status: status, customMetadata: nil, trailersOnly: true)
    )
  }
  /// Handles the request headers received by the server from the client.
  ///
  /// Only valid while both peers are idle. The headers are validated in
  /// order (`content-type`, `:path`, `:scheme`, `:method`, `te`,
  /// `grpc-encoding`) and the first failed check rejects the RPC. Every
  /// rejection closes the server side of the stream, and also closes the
  /// client side when `endStream` is set.
  ///
  /// On success the outbound encoding is negotiated from the client's
  /// `grpc-accept-encoding` values and the client is moved to open (or
  /// straight to closed when `endStream` is set).
  ///
  /// - Parameters:
  ///   - headers: The request headers.
  ///   - endStream: Whether the headers carried the end-of-stream flag.
  ///   - configuration: Server configuration providing the accepted encodings.
  /// - Returns: `.receivedMetadata` for a valid request, or `.rejectRPC` with
  ///   the trailers to send back.
  /// - Throws: The error raised by `invalidState` if the client has already
  ///   sent its headers or is closed.
  private mutating func serverReceive(
    headers: HPACKHeaders,
    endStream: Bool,
    configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
  ) throws -> OnMetadataReceived {
    switch self.state {
    case .clientIdleServerIdle(let state):
      let contentType = headers.firstString(forKey: .contentType)
        .flatMap { ContentType(value: $0) }
      if contentType == nil {
        // Close the server; the client is closed too if it has already
        // ended the stream, matching the other rejection paths.
        if endStream {
          self.state = .clientClosedServerClosed(.init(previousState: state))
        } else {
          self.state = .clientOpenServerClosed(.init(previousState: state))
        }

        // Respond with HTTP-level Unsupported Media Type status code.
        var trailers = HPACKHeaders()
        trailers.add("415", forKey: .status)
        return .rejectRPC(trailers: trailers)
      }

      let path = headers.firstString(forKey: .path)
        .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
      if path == nil {
        return self.closeServerAndBuildRejectRPCAction(
          currentState: state,
          endStream: endStream,
          rejectWithStatus: Status(
            code: .unimplemented,
            message: "No \(GRPCHTTP2Keys.path.rawValue) header has been set."
          )
        )
      }

      let scheme = headers.firstString(forKey: .scheme)
        .flatMap { Scheme(rawValue: $0) }
      if scheme == nil {
        return self.closeServerAndBuildRejectRPCAction(
          currentState: state,
          endStream: endStream,
          rejectWithStatus: Status(
            code: .invalidArgument,
            message: ":scheme header must be present and one of \"http\" or \"https\"."
          )
        )
      }

      guard let method = headers.firstString(forKey: .method), method == "POST" else {
        return self.closeServerAndBuildRejectRPCAction(
          currentState: state,
          endStream: endStream,
          rejectWithStatus: Status(
            code: .invalidArgument,
            message: ":method header is expected to be present and have a value of \"POST\"."
          )
        )
      }

      guard let te = headers.firstString(forKey: .te), te == "trailers" else {
        return self.closeServerAndBuildRejectRPCAction(
          currentState: state,
          endStream: endStream,
          rejectWithStatus: Status(
            code: .invalidArgument,
            message: "\"te\" header is expected to be present and have a value of \"trailers\"."
          )
        )
      }

      // Firstly, find out if we support the client's chosen encoding, and reject
      // the RPC if we don't.
      let inboundEncoding: CompressionAlgorithm
      let encodingValues = headers.values(
        forHeader: GRPCHTTP2Keys.encoding.rawValue,
        canonicalForm: true
      )
      var encodingValuesIterator = encodingValues.makeIterator()

      if let rawEncoding = encodingValuesIterator.next() {
        guard encodingValuesIterator.next() == nil else {
          // Going through the shared helper closes the server; returning the
          // trailers without a state change would leave the stream idle after
          // the rejection. `rawValue` is needed so the message names the
          // header rather than the enum case.
          return self.closeServerAndBuildRejectRPCAction(
            currentState: state,
            endStream: endStream,
            rejectWithStatus: Status(
              code: .internalError,
              message: "\(GRPCHTTP2Keys.encoding.rawValue) must contain no more than one value."
            )
          )
        }

        guard let clientEncoding = CompressionAlgorithm(name: rawEncoding),
          configuration.acceptedEncodings.contains(clientEncoding)
        else {
          let statusMessage = """
            \(rawEncoding) compression is not supported; \
            supported algorithms are listed in grpc-accept-encoding
            """

          // Advertise every algorithm the server accepts so the client can retry.
          var customMetadata = Metadata()
          customMetadata.reserveCapacity(configuration.acceptedEncodings.count)
          for acceptedEncoding in configuration.acceptedEncodings.elements {
            customMetadata.addString(
              acceptedEncoding.name,
              forKey: GRPCHTTP2Keys.acceptEncoding.rawValue
            )
          }

          // The RPC is being rejected, so the server must be closed (and the
          // client too if it has already ended the stream). The shared helper
          // cannot be used here because it does not take custom metadata.
          if endStream {
            self.state = .clientClosedServerClosed(.init(previousState: state))
          } else {
            self.state = .clientOpenServerClosed(.init(previousState: state))
          }

          let trailers = self.makeTrailers(
            status: Status(code: .unimplemented, message: statusMessage),
            customMetadata: customMetadata,
            trailersOnly: true
          )
          return .rejectRPC(trailers: trailers)
        }

        // Server supports client's encoding.
        inboundEncoding = clientEncoding
      } else {
        inboundEncoding = .none
      }

      // Secondly, find a compatible encoding the server can use to compress outbound messages,
      // based on the encodings the client has advertised.
      var outboundEncoding: CompressionAlgorithm = .none
      let clientAdvertisedEncodings = headers.values(
        forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue,
        canonicalForm: true
      )
      // The first advertised algorithm the server also accepts wins. If
      // nothing matches, responses are sent uncompressed.
      for clientAdvertisedEncoding in clientAdvertisedEncodings {
        if let algorithm = CompressionAlgorithm(name: clientAdvertisedEncoding),
          configuration.acceptedEncodings.contains(algorithm)
        {
          outboundEncoding = algorithm
          break
        }
      }

      if endStream {
        self.state = .clientClosedServerIdle(
          .init(
            previousState: state,
            compressionAlgorithm: outboundEncoding
          )
        )
      } else {
        let compressor = Zlib.Method(encoding: outboundEncoding)
          .flatMap { Zlib.Compressor(method: $0) }
        let decompressor = Zlib.Method(encoding: inboundEncoding)
          .flatMap { Zlib.Decompressor(method: $0) }
        let deframer = GRPCMessageDeframer(
          maximumPayloadSize: state.maximumPayloadSize,
          decompressor: decompressor
        )

        self.state = .clientOpenServerIdle(
          .init(
            previousState: state,
            compressor: compressor,
            framer: GRPCMessageFramer(),
            decompressor: decompressor,
            deframer: NIOSingleStepByteToMessageProcessor(deframer)
          )
        )
      }

      return .receivedMetadata(Metadata(headers: headers))

    case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
      try self.invalidState(
        "Client shouldn't have sent metadata twice."
      )

    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
      try self.invalidState(
        "Client can't have sent metadata if closed."
      )
    }
  }
  /// Handles a data frame received by the server from the client.
  ///
  /// While the server is idle or open, the bytes are deframed and the
  /// resulting messages are appended to the inbound message buffer. If the
  /// server has already closed, the bytes are ignored. In every valid state
  /// `endStream` closes the client side.
  ///
  /// - Parameters:
  ///   - buffer: The bytes received from the client.
  ///   - endStream: Whether the frame carried the end-of-stream flag.
  /// - Returns: `.readInbound` for every valid state.
  /// - Throws: The error raised by `invalidState` if the client is idle or
  ///   already closed, or any error thrown by the deframer.
  private mutating func serverReceive(
    buffer: ByteBuffer,
    endStream: Bool
  ) throws -> OnBufferReceivedAction {
    switch self.state {
    case .clientOpenServerIdle(var current):
      // On the server the deframer exists from the moment the client opens,
      // since the decompression algorithm is known by then.
      try current.deframer!.process(buffer: buffer) { message in
        current.inboundMessageBuffer.append(message)
      }

      guard endStream else {
        self.state = .clientOpenServerIdle(current)
        return .readInbound
      }
      self.state = .clientClosedServerIdle(.init(previousState: current))
      return .readInbound

    case .clientOpenServerOpen(var current):
      try current.deframer.process(buffer: buffer) { message in
        current.inboundMessageBuffer.append(message)
      }

      guard endStream else {
        self.state = .clientOpenServerOpen(current)
        return .readInbound
      }
      self.state = .clientClosedServerOpen(.init(previousState: current))
      return .readInbound

    case .clientOpenServerClosed(let current):
      // The server closed before the client finished its request: drop the
      // rest of the request, only tracking the client closing.
      if endStream {
        self.state = .clientClosedServerClosed(.init(previousState: current))
      }
      return .readInbound

    case .clientIdleServerIdle:
      try self.invalidState(
        "Can't have received a message if client is idle."
      )

    case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
      try self.invalidState(
        "Client can't send a message if closed."
      )
    }
  }
  /// Produces the next frame the server should write, if any.
  ///
  /// - Returns: `.sendFrame` when the framer yields a frame. With nothing to
  ///   send the result is `.awaitMoreMessages` while the server is open and
  ///   `.noMoreMessages` once it has closed.
  /// - Throws: The error raised by `invalidState` if the server has not
  ///   opened yet, or any error thrown by the framer.
  private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
    switch self.state {
    case .clientOpenServerOpen(var current):
      let frame = try current.framer.next(compressor: current.compressor)
      self.state = .clientOpenServerOpen(current)
      guard let frame else {
        return .awaitMoreMessages
      }
      return .sendFrame(frame: frame.bytes, promise: frame.promise)

    case .clientClosedServerOpen(var current):
      let frame = try current.framer.next(compressor: current.compressor)
      self.state = .clientClosedServerOpen(current)
      guard let frame else {
        return .awaitMoreMessages
      }
      return .sendFrame(frame: frame.bytes, promise: frame.promise)

    case .clientOpenServerClosed(var current):
      // The framer is optional once the server has closed.
      let frame = try current.framer?.next(compressor: current.compressor)
      self.state = .clientOpenServerClosed(current)
      guard let frame else {
        return .noMoreMessages
      }
      return .sendFrame(frame: frame.bytes, promise: frame.promise)

    case .clientClosedServerClosed(var current):
      let frame = try current.framer?.next(compressor: current.compressor)
      self.state = .clientClosedServerClosed(current)
      guard let frame else {
        return .noMoreMessages
      }
      return .sendFrame(frame: frame.bytes, promise: frame.promise)

    case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
      try self.invalidState("Server is not open yet.")
    }
  }
  /// Pops the next buffered message received from the client, if any.
  ///
  /// - Returns: `.receiveMessage` when a message is buffered. With an empty
  ///   buffer the result is `.awaitMoreMessages` while the client is still
  ///   open (or idle) and `.noMoreMessages` once the client has closed.
  private mutating func serverNextInboundMessage() -> OnNextInboundMessage {
    switch self.state {
    case .clientIdleServerIdle:
      return .awaitMoreMessages

    case .clientClosedServerIdle:
      return .noMoreMessages

    case .clientOpenServerIdle(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientOpenServerIdle(current)
      if let next {
        return .receiveMessage(next)
      }
      return .awaitMoreMessages

    case .clientOpenServerOpen(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientOpenServerOpen(current)
      if let next {
        return .receiveMessage(next)
      }
      return .awaitMoreMessages

    case .clientOpenServerClosed(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientOpenServerClosed(current)
      if let next {
        return .receiveMessage(next)
      }
      return .awaitMoreMessages

    case .clientClosedServerOpen(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientClosedServerOpen(current)
      if let next {
        return .receiveMessage(next)
      }
      return .noMoreMessages

    case .clientClosedServerClosed(var current):
      let next = current.inboundMessageBuffer.pop()
      self.state = .clientClosedServerClosed(current)
      if let next {
        return .receiveMessage(next)
      }
      return .noMoreMessages
    }
  }
  1330. }
  extension MethodDescriptor {
    /// Creates a descriptor from a slash-separated method path.
    ///
    /// The path is split on "/" with empty components dropped, and exactly
    /// two components (service, then method) must remain.
    ///
    /// - Parameter fullyQualifiedMethod: The path to parse.
    /// - Returns: `nil` unless the path yields exactly two non-empty components.
    init?(fullyQualifiedMethod: String) {
      let components = fullyQualifiedMethod.split(
        separator: "/",
        omittingEmptySubsequences: true
      )
      guard components.count == 2 else {
        return nil
      }

      let service = String(components[0])
      let method = String(components[1])
      self.init(service: service, method: method)
    }
  }
  /// Names of the HTTP/2 headers used by the gRPC protocol.
  internal enum GRPCHTTP2Keys: String {
    case path = ":path"
    case contentType = "content-type"
    case encoding = "grpc-encoding"
    case acceptEncoding = "grpc-accept-encoding"
    case scheme = ":scheme"
    case method = ":method"
    case te = "te"
    case status = ":status"
    case grpcStatus = "grpc-status"
    /// The percent-encoded status message. The gRPC over HTTP/2 protocol
    /// names this header `grpc-message`; any other name is not recognised by
    /// other gRPC implementations.
    case grpcStatusMessage = "grpc-message"
  }
  extension HPACKHeaders {
    /// Returns the first value stored for the given gRPC header key.
    ///
    /// - Parameters:
    ///   - key: The header to look up.
    ///   - canonicalForm: Forwarded to `values(forHeader:canonicalForm:)`.
    /// - Returns: The first value as a `String`, or `nil` if there is none.
    internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
      var matches = self.values(forHeader: key.rawValue, canonicalForm: canonicalForm)
        .makeIterator()
      guard let first = matches.next() else {
        return nil
      }
      return String(first)
    }

    /// Appends `value` under the header name of the given gRPC key.
    internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
      let headerName = key.rawValue
      self.add(name: headerName, value: value)
    }
  }
  extension Zlib.Method {
    /// Maps a gRPC compression algorithm to its zlib method.
    ///
    /// - Parameter encoding: The negotiated compression algorithm.
    /// - Returns: `nil` for anything other than gzip or deflate, including
    ///   `.none`.
    init?(encoding: CompressionAlgorithm) {
      switch encoding {
      case .gzip:
        self = .gzip
      case .deflate:
        self = .deflate
      default:
        return nil
      }
    }
  }
  extension Metadata {
    /// Builds metadata from a set of HTTP/2 headers.
    ///
    /// Headers whose name ends in "-bin" are base64-decoded and stored as
    /// binary values. If decoding fails, the raw value is stored as a
    /// string instead. All other headers are stored as strings.
    ///
    /// - Parameter headers: The headers to convert.
    init(headers: HPACKHeaders) {
      self.init()
      self.reserveCapacity(headers.count)

      for header in headers {
        if header.name.hasSuffix("-bin"),
          let decodedBinary = try? header.value.base64Decoded()
        {
          self.addBinary(decodedBinary, forKey: header.name)
        } else {
          self.addString(header.value, forKey: header.name)
        }
      }
    }
  }
  extension Status.Code {
    /// Maps an HTTP response status to a gRPC status code.
    ///
    /// See https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
    ///
    /// - Parameter httpStatusCode: The HTTP status received.
    init(httpStatusCode: HTTPResponseStatus) {
      let mapped: Status.Code

      switch httpStatusCode {
      case .tooManyRequests,
        .badGateway,
        .serviceUnavailable,
        .gatewayTimeout:
        mapped = .unavailable
      case .notFound:
        mapped = .unimplemented
      case .forbidden:
        mapped = .permissionDenied
      case .unauthorized:
        mapped = .unauthenticated
      case .badRequest:
        mapped = .internalError
      default:
        // Every other HTTP status maps to unknown.
        mapped = .unknown
      }

      self = mapped
    }
  }