GRPCStreamStateMachineTests.swift 79 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOHPACK
  19. import XCTest
  20. @testable import GRPCHTTP2Core
  /// The client/server state pairs that the test helpers can drive a stream state machine into.
  ///
  /// Case names read `client<State>Server<State>`. The declaration order below is the order
  /// in which `allCases` enumerates the states.
  private enum TargetStateMachineState: CaseIterable {
    // Client has not opened yet.
    case clientIdleServerIdle

    // Client is open.
    case clientOpenServerIdle, clientOpenServerOpen, clientOpenServerClosed

    // Client is closed.
    case clientClosedServerIdle, clientClosedServerOpen, clientClosedServerClosed
  }
  // MARK: - Header fixtures sent by a client

  extension HPACKHeaders {
    /// Request headers over the `http` scheme with no compression-related keys.
    fileprivate static let clientInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]

    /// Request headers over the `https` scheme that both accept and use `deflate`.
    fileprivate static let clientInitialMetadataWithDeflateCompression: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "https",
      GRPCHTTP2Keys.te.rawValue: "te",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
    ]

    /// Request headers over the `https` scheme that both accept and use `gzip`.
    fileprivate static let clientInitialMetadataWithGzipCompression: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "https",
      GRPCHTTP2Keys.te.rawValue: "te",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "gzip",
      GRPCHTTP2Keys.encoding.rawValue: "gzip",
    ]

    /// Headers that carry only a path, i.e. the content-type key is missing.
    fileprivate static let receivedWithoutContentType: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test"
    ]

    /// Headers whose content-type value is `invalid/invalid`.
    fileprivate static let receivedWithInvalidContentType: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "invalid/invalid",
    ]

    /// Headers that carry only a content-type, i.e. the path key is missing.
    fileprivate static let receivedWithoutEndpoint: HPACKHeaders = [
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc"
    ]
  }

  // MARK: - Header fixtures sent by a server

  extension HPACKHeaders {
    /// Initial response headers: status 200, canonical gRPC content-type, accepts `deflate`.
    fileprivate static let serverInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]

    /// Initial response headers that additionally declare `deflate` as the message encoding.
    fileprivate static let serverInitialMetadataWithDeflateCompression: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]

    /// Trailers carrying a `grpc-status` value of "0".
    fileprivate static let serverTrailers: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.grpcStatus.rawValue: "0",
    ]
  }
  85. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  86. final class GRPCStreamClientStateMachineTests: XCTestCase {
  /// Creates a client-configured `GRPCStreamStateMachine` and drives it into `targetState`.
  ///
  /// Transitions are always applied in the order: open client, open server, close client,
  /// close server. Each transition is wrapped in `XCTAssertNoThrow`, so a failure while
  /// setting up is recorded against the calling test.
  ///
  /// - Parameters:
  ///   - targetState: The client/server state pair the returned machine should be in.
  ///   - compressionEnabled: When `true`, the outbound encoding is `.deflate` (otherwise
  ///     `.identity`) and the deflate variant of the server's initial metadata fixture is used
  ///     to open the server.
  /// - Returns: The state machine after the required transitions have been applied.
  private func makeClientStateMachine(
    targetState: TargetStateMachineState,
    compressionEnabled: Bool = false
  ) -> GRPCStreamStateMachine {
    var machine = GRPCStreamStateMachine(
      configuration: .client(
        .init(
          methodDescriptor: .init(service: "test", method: "test"),
          scheme: .http,
          outboundEncoding: compressionEnabled ? .deflate : .identity,
          acceptedEncodings: [.deflate]
        )
      ),
      maximumPayloadSize: 100,
      skipAssertions: true
    )

    // Work out which transitions are needed to reach the requested state.
    let plan: (openClient: Bool, openServer: Bool, closeClient: Bool, closeServer: Bool)
    switch targetState {
    case .clientIdleServerIdle:
      plan = (false, false, false, false)
    case .clientOpenServerIdle:
      plan = (true, false, false, false)
    case .clientOpenServerOpen:
      plan = (true, true, false, false)
    case .clientOpenServerClosed:
      plan = (true, true, false, true)
    case .clientClosedServerIdle:
      plan = (true, false, true, false)
    case .clientClosedServerOpen:
      plan = (true, true, true, false)
    case .clientClosedServerClosed:
      plan = (true, true, true, true)
    }

    if plan.openClient {
      // Sending initial metadata opens the client.
      XCTAssertNoThrow(try machine.send(metadata: []))
    }

    if plan.openServer {
      // Receiving initial metadata opens the server.
      let initialServerHeaders: HPACKHeaders =
        compressionEnabled ? .serverInitialMetadataWithDeflateCompression : .serverInitialMetadata
      XCTAssertNoThrow(try machine.receive(metadata: initialServerHeaders, endStream: false))
    }

    if plan.closeClient {
      // An empty message with end-stream set closes the client.
      XCTAssertNoThrow(try machine.send(message: [], endStream: true))
    }

    if plan.closeServer {
      // Trailers with end-stream set close the server.
      XCTAssertNoThrow(try machine.receive(metadata: .serverTrailers, endStream: true))
    }

    return machine
  }
  // MARK: - Send Metadata
  /// Sending initial metadata from a machine where both peers are idle must not throw.
  func testSendMetadataWhenClientIdleAndServerIdle() throws {
    var idleMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)

    XCTAssertNoThrow(try idleMachine.send(metadata: []))
  }
  /// Sending metadata a second time while the client is open must fail with an internal error,
  /// whatever state the server is in.
  func testSendMetadataWhenClientAlreadyOpen() throws {
    let clientOpenStates: [TargetStateMachineState] = [
      .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed,
    ]
    let expectedMessage = "Client is already open: shouldn't be sending metadata."

    for state in clientOpenStates {
      var machine = self.makeClientStateMachine(targetState: state)

      // The helper has already sent metadata once; a second send must be rejected.
      XCTAssertThrowsError(ofType: RPCError.self, try machine.send(metadata: .init())) {
        rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  /// Sending metadata after the client has closed must fail with an internal error,
  /// whatever state the server is in.
  func testSendMetadataWhenClientAlreadyClosed() throws {
    let clientClosedStates: [TargetStateMachineState] = [
      .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed,
    ]
    let expectedMessage = "Client is closed: can't send metadata."

    for state in clientClosedStates {
      var machine = self.makeClientStateMachine(targetState: state)

      XCTAssertThrowsError(ofType: RPCError.self, try machine.send(metadata: .init())) {
        rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  // MARK: - Send Message
  /// A message cannot be sent before the client has opened (i.e. before initial metadata).
  func testSendMessageWhenClientIdleAndServerIdle() {
    var idleMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Client not yet open."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try idleMachine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// While the client is open, sending a message is accepted in every server state.
  func testSendMessageWhenClientOpen() {
    let clientOpenStates: [TargetStateMachineState] = [
      .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed,
    ]

    for state in clientOpenStates {
      var machine = self.makeClientStateMachine(targetState: state)
      XCTAssertNoThrow(try machine.send(message: [], endStream: false))
    }
  }
  /// Once the client is closed, any further message must be rejected with an internal error.
  func testSendMessageWhenClientClosed() {
    let clientClosedStates: [TargetStateMachineState] = [
      .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed,
    ]
    let expectedMessage = "Client is closed, cannot send a message."

    for state in clientClosedStates {
      var machine = self.makeClientStateMachine(targetState: state)

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try machine.send(message: [], endStream: false)
      ) { rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  // MARK: - Send Status and Trailers
  /// A client state machine must reject sending status and trailers in every state.
  func testSendStatusAndTrailers() {
    let expectedMessage = "Client cannot send status and trailer."

    for state in TargetStateMachineState.allCases {
      var machine = self.makeClientStateMachine(targetState: state)
      let okStatus = Status(code: .ok, message: "")

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try machine.send(status: okStatus, metadata: .init())
      ) { rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  // MARK: - Receive initial metadata
  /// Receiving metadata while the client is still idle must fail with an internal error.
  func testReceiveInitialMetadataWhenClientIdleAndServerIdle() {
    var idleMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Server cannot have sent metadata if the client is idle."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try idleMachine.receive(metadata: .init(), endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Initial metadata carrying a non-200 `:status` is surfaced as an `.unknown` status
  /// together with the received metadata, rather than thrown.
  func testReceiveInvalidInitialMetadataWhenServerIdle() throws {
    let serverIdleStates: [TargetStateMachineState] = [
      .clientOpenServerIdle, .clientClosedServerIdle,
    ]
    let expectedStatus = Status(code: .unknown, message: "Unexpected non-200 HTTP Status Code.")
    let expectedMetadata: Metadata = [":status": "300"]

    for state in serverIdleStates {
      var machine = self.makeClientStateMachine(targetState: state)

      let unexpectedStatusHeaders: HPACKHeaders = [GRPCHTTP2Keys.status.rawValue: "300"]
      let action = try machine.receive(metadata: unexpectedStatusHeaders, endStream: false)

      XCTAssertEqual(
        action,
        .receivedStatusAndMetadata(status: expectedStatus, metadata: expectedMetadata)
      )
    }
  }
  /// Valid initial metadata received while the server is idle is reported back as
  /// `.receivedMetadata`, with the `-bin` header decoded into a binary value.
  func testReceiveInitialMetadataWhenServerIdle() throws {
    let serverIdleStates: [TargetStateMachineState] = [
      .clientOpenServerIdle, .clientClosedServerIdle,
    ]

    for state in serverIdleStates {
      var machine = self.makeClientStateMachine(targetState: state)

      // What the machine is expected to hand back for the headers below.
      var expectedMetadata: Metadata = [
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-encoding": "deflate",
        "custom": "123",
      ]
      expectedMetadata.addBinary([42, 43, 44], forKey: "custom-bin")

      let initialHeaders: HPACKHeaders = [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
        GRPCHTTP2Keys.encoding.rawValue: "deflate",
        "custom": "123",
        "custom-bin": String(base64Encoding: [42, 43, 44]),
      ]
      let action = try machine.receive(metadata: initialHeaders, endStream: false)

      XCTAssertEqual(action, .receivedMetadata(expectedMetadata))
    }
  }
  /// Metadata received after the server is already open is only valid as trailers.
  ///
  /// For each state with an open server this checks that:
  /// 1. headers without `grpc-status` are rejected with an `.unknown` error, and
  /// 2. the same headers with `grpc-status` added are reported as `.receivedStatusAndMetadata`,
  ///    with `grpc-status` absent from the reported metadata.
  func testReceiveInitialMetadataWhenServerOpen() throws {
    for targetState in [
      TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen,
    ] {
      var stateMachine = self.makeClientStateMachine(targetState: targetState)

      // Receiving initial metadata again should throw if grpc-status is not present.
      XCTAssertThrowsError(
        ofType: RPCError.self,
        try stateMachine.receive(
          metadata: [
            GRPCHTTP2Keys.status.rawValue: "200",
            GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
            GRPCHTTP2Keys.encoding.rawValue: "deflate",
            "custom": "123",
            "custom-bin": String(base64Encoding: [42, 43, 44]),
          ],
          endStream: false
        )
      ) { error in
        XCTAssertEqual(error.code, .unknown)
        XCTAssertEqual(
          error.message,
          "Non-initial metadata must be a trailer containing a valid grpc-status"
        )
      }

      // Now make sure everything works well if we include grpc-status
      let action = try stateMachine.receive(
        metadata: [
          GRPCHTTP2Keys.status.rawValue: "200",
          GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.ok.rawValue),
          GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
          GRPCHTTP2Keys.encoding.rawValue: "deflate",
          "custom": "123",
          "custom-bin": String(base64Encoding: [42, 43, 44]),
        ],
        endStream: false
      )

      // The expectation deliberately omits grpc-status: it is reported through the status
      // value of the action, not through the metadata.
      var expectedMetadata: Metadata = [
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-encoding": "deflate",
        "custom": "123",
      ]
      expectedMetadata.addBinary([42, 43, 44], forKey: "custom-bin")

      XCTAssertEqual(
        action,
        .receivedStatusAndMetadata(
          status: Status(code: .ok, message: ""),
          metadata: expectedMetadata
        )
      )
    }
  }
  /// Metadata without end-stream arriving after the server has closed must fail with an
  /// internal error.
  func testReceiveInitialMetadataWhenServerClosed() {
    let serverClosedStates: [TargetStateMachineState] = [
      .clientOpenServerClosed, .clientClosedServerClosed,
    ]
    let expectedMessage = "Server is closed, nothing could have been sent."

    for state in serverClosedStates {
      var machine = self.makeClientStateMachine(targetState: state)

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try machine.receive(metadata: .init(), endStream: false)
      ) { rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  // MARK: - Receive end trailers
  /// Trailers (metadata with end-stream set) cannot arrive while the client is still idle.
  func testReceiveEndTrailerWhenClientIdleAndServerIdle() {
    var idleMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Server cannot have sent metadata if the client is idle."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try idleMachine.receive(metadata: .init(), endStream: true)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A trailers-only response received while the client is open and the server is idle is
  /// reported as `.receivedStatusAndMetadata`: the status code and message come from the
  /// `grpc-status` / status-message headers, and the remaining headers are the metadata.
  ///
  /// - Throws: If the status message cannot be marshalled or the state machine rejects the
  ///   trailers; either fails the test.
  func testReceiveEndTrailerWhenClientOpenAndServerIdle() throws {
    var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle)

    // Fail the test (rather than crash the test process) if marshalling returns nil.
    let marshalledStatusMessage = try XCTUnwrap(
      GRPCStatusMessageMarshaller.marshall("Some status message")
    )

    // Receive a trailers-only response
    let trailersOnlyResponse: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: marshalledStatusMessage,
      "custom-key": "custom-value",
    ]
    let action = try stateMachine.receive(metadata: trailersOnlyResponse, endStream: true)

    // Comparing the whole action reports the actual value on failure.
    XCTAssertEqual(
      action,
      .receivedStatusAndMetadata(
        status: Status(code: .internalError, message: "Some status message"),
        metadata: [
          ":status": "200",
          "content-type": "application/grpc",
          "custom-key": "custom-value",
        ]
      )
    )
  }
  /// Trailers received while the server is open are reported as `.receivedStatusAndMetadata`
  /// with an OK status; the expected metadata does not include the `grpc-status` header.
  func testReceiveEndTrailerWhenServerOpen() throws {
    let serverOpenStates: [TargetStateMachineState] = [
      .clientOpenServerOpen, .clientClosedServerOpen,
    ]

    for state in serverOpenStates {
      var machine = self.makeClientStateMachine(targetState: state)

      let trailers: HPACKHeaders = [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.ok.rawValue),
        GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
        GRPCHTTP2Keys.encoding.rawValue: "deflate",
        "custom": "123",
      ]
      let action = try machine.receive(metadata: trailers, endStream: true)

      let metadataWithoutStatus: Metadata = [
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-encoding": "deflate",
        "custom": "123",
      ]
      XCTAssertEqual(
        action,
        .receivedStatusAndMetadata(
          status: .init(code: .ok, message: ""),
          metadata: metadataWithoutStatus
        )
      )
    }
  }
  /// With the client open and the server already closed, a second set of trailers must be
  /// rejected with an internal error.
  func testReceiveEndTrailerWhenClientOpenAndServerClosed() {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerClosed)
    let expectedMessage = "Server is closed, nothing could have been sent."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .init(), endStream: true)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A trailers-only response received while the client is closed and the server is idle is
  /// reported as `.receivedStatusAndMetadata`: the status code and message come from the
  /// `grpc-status` / status-message headers, and the remaining headers are the metadata.
  ///
  /// - Throws: If the status message cannot be marshalled or the state machine rejects the
  ///   trailers; either fails the test.
  func testReceiveEndTrailerWhenClientClosedAndServerIdle() throws {
    var stateMachine = self.makeClientStateMachine(targetState: .clientClosedServerIdle)

    // Fail the test (rather than crash the test process) if marshalling returns nil.
    let marshalledStatusMessage = try XCTUnwrap(
      GRPCStatusMessageMarshaller.marshall("Some status message")
    )

    // Server sends a trailers-only response
    let trailersOnlyResponse: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: marshalledStatusMessage,
      "custom-key": "custom-value",
    ]
    let action = try stateMachine.receive(metadata: trailersOnlyResponse, endStream: true)

    // Comparing the whole action reports the actual value on failure.
    XCTAssertEqual(
      action,
      .receivedStatusAndMetadata(
        status: Status(code: .internalError, message: "Some status message"),
        metadata: [
          ":status": "200",
          "content-type": "application/grpc",
          "custom-key": "custom-value",
        ]
      )
    )
  }
  /// When both peers are closed, another end-of-stream metadata frame is tolerated.
  ///
  /// This can happen when the earlier close was triggered by a grpc-status header and the
  /// server then sends an empty frame with EOS set. The expected action is `.doNothing`.
  func testReceiveEndTrailerWhenClientClosedAndServerClosed() {
    var closedMachine = self.makeClientStateMachine(targetState: .clientClosedServerClosed)

    XCTAssertEqual(
      try closedMachine.receive(metadata: .init(), endStream: true),
      .doNothing
    )
  }
  // MARK: - Receive message
  /// A message cannot be received before the client has opened.
  func testReceiveMessageWhenClientIdleAndServerIdle() {
    var idleMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Cannot have received anything from server if client is not yet open."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try idleMachine.receive(message: .init(), endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A message arriving before the server's initial metadata must be rejected.
  func testReceiveMessageWhenServerIdle() {
    let serverIdleStates: [TargetStateMachineState] = [
      .clientOpenServerIdle, .clientClosedServerIdle,
    ]
    let expectedMessage = "Server cannot have sent a message before sending the initial metadata."

    for state in serverIdleStates {
      var machine = self.makeClientStateMachine(targetState: state)

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try machine.receive(message: .init(), endStream: false)
      ) { rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  /// With the server open, messages are accepted both without and with end-stream set.
  func testReceiveMessageWhenServerOpen() throws {
    let serverOpenStates: [TargetStateMachineState] = [
      .clientOpenServerOpen, .clientClosedServerOpen,
    ]

    for state in serverOpenStates {
      var machine = self.makeClientStateMachine(targetState: state)

      // First a regular message, then one that also ends the stream.
      XCTAssertNoThrow(try machine.receive(message: .init(), endStream: false))
      XCTAssertNoThrow(try machine.receive(message: .init(), endStream: true))
    }
  }
  /// A message arriving after the server has closed must be rejected with an internal error.
  func testReceiveMessageWhenServerClosed() {
    let serverClosedStates: [TargetStateMachineState] = [
      .clientOpenServerClosed, .clientClosedServerClosed,
    ]
    let expectedMessage = "Cannot have received anything from a closed server."

    for state in serverClosedStates {
      var machine = self.makeClientStateMachine(targetState: state)

      XCTAssertThrowsError(
        ofType: RPCError.self,
        try machine.receive(message: .init(), endStream: false)
      ) { rpcError in
        XCTAssertEqual(rpcError.code, .internalError)
        XCTAssertEqual(rpcError.message, expectedMessage)
      }
    }
  }
  // MARK: - Next outbound message
  /// Asking for the next outbound message before the client has opened must fail.
  func testNextOutboundMessageWhenClientIdleAndServerIdle() {
    var idleMachine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Client is not open yet."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try idleMachine.nextOutboundMessage()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// With the client open and the server idle or open, a queued message is returned framed
  /// and uncompressed; before and after it the machine asks to await more messages.
  func testNextOutboundMessageWhenClientOpenAndServerOpenOrIdle() throws {
    let states: [TargetStateMachineState] = [.clientOpenServerIdle, .clientOpenServerOpen]

    // Expected frame for the payload [42, 42].
    let framedPayload: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]

    for state in states {
      var machine = self.makeClientStateMachine(targetState: state)

      // Nothing has been queued yet.
      XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

      XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))
      XCTAssertEqual(
        try machine.nextOutboundMessage(),
        .sendMessage(ByteBuffer(bytes: framedPayload))
      )

      // The queue is drained again.
      XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)
    }
  }
  /// With compression enabled, client open and server idle, the outbound message equals the
  /// frame produced by `frameMessage(_:compress:)` with `compress: true`.
  func testNextOutboundMessageWhenClientOpenAndServerIdle_WithCompression() throws {
    var machine = self.makeClientStateMachine(
      targetState: .clientOpenServerIdle,
      compressionEnabled: true
    )

    // Nothing has been queued yet.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

    let payload = [UInt8]([42, 42, 43, 43])
    XCTAssertNoThrow(try machine.send(message: payload, endStream: false))

    let outbound = try machine.nextOutboundMessage()
    let expectedFrame = try self.frameMessage(payload, compress: true)
    XCTAssertEqual(outbound, .sendMessage(expectedFrame))
  }
  /// With compression enabled and both peers open, the outbound message equals the frame
  /// produced by `frameMessage(_:compress:)` with `compress: true`.
  func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws {
    var machine = self.makeClientStateMachine(
      targetState: .clientOpenServerOpen,
      compressionEnabled: true
    )

    // Nothing has been queued yet.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

    let payload = [UInt8]([42, 42, 43, 43])
    XCTAssertNoThrow(try machine.send(message: payload, endStream: false))

    let outbound = try machine.nextOutboundMessage()
    let expectedFrame = try self.frameMessage(payload, compress: true)
    XCTAssertEqual(outbound, .sendMessage(expectedFrame))
  }
  /// Once the server has closed, the machine reports `.noMoreMessages` even if the
  /// still-open client queues another message.
  func testNextOutboundMessageWhenClientOpenAndServerClosed() throws {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerClosed)

    // Nothing queued: there is nothing to send.
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)

    // Queueing succeeds, but the message is not offered because the server is closed.
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
  }
  /// A message sent together with end-stream (which closes the client) is still returned
  /// while the server is idle; after it the machine reports `.noMoreMessages`.
  func testNextOutboundMessageWhenClientClosedAndServerIdle() throws {
    // Start with the client open so that a real message can be queued while closing it.
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerIdle)
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: true))

    let framedPayload: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]

    // The queued message must be handed out even though the client is now closed.
    let outbound = try machine.nextOutboundMessage()
    XCTAssertEqual(outbound, .sendMessage(ByteBuffer(bytes: framedPayload)))

    // Nothing else is left to send.
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
  }
  /// A message sent together with end-stream (which closes the client) is still returned
  /// while the server is open; after it the machine reports `.noMoreMessages`.
  func testNextOutboundMessageWhenClientClosedAndServerOpen() throws {
    // Start with the client open so that a real message can be queued while closing it.
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: true))

    let framedPayload: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]

    // The queued message must be handed out even though the client is now closed.
    let outbound = try machine.nextOutboundMessage()
    XCTAssertEqual(outbound, .sendMessage(ByteBuffer(bytes: framedPayload)))

    // Nothing else is left to send.
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
  }
  /// A message queued before both peers closed is not offered afterwards: the machine
  /// reports `.noMoreMessages` because the server is closed.
  func testNextOutboundMessageWhenClientClosedAndServerClosed() throws {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)

    // 1. Queue a message while both peers are open.
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))
    // 2. The server closes by sending trailers.
    XCTAssertNoThrow(try machine.receive(metadata: .serverTrailers, endStream: true))
    // 3. The client closes with an empty end-of-stream message.
    XCTAssertNoThrow(try machine.send(message: [], endStream: true))

    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
  }
  // MARK: - Next inbound message
  /// While the server is idle, asking for the next inbound message yields
  /// `.awaitMoreMessages` in every client state.
  func testNextInboundMessageWhenServerIdle() {
    let serverIdleStates: [TargetStateMachineState] = [
      .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle,
    ]

    for state in serverIdleStates {
      var machine = self.makeClientStateMachine(targetState: state)
      XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
    }
  }
  /// With both peers open, a received uncompressed frame is handed out as its payload and
  /// the machine then asks to await more messages.
  func testNextInboundMessageWhenClientOpenAndServerOpen() throws {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)

    let frame: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: frame), endStream: false)

    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
  }
  /// With compression enabled and both peers open, a frame built by
  /// `frameMessage(_:compress:)` with `compress: true` is handed out as the original payload.
  func testNextInboundMessageWhenClientOpenAndServerOpen_WithCompression() throws {
    var machine = self.makeClientStateMachine(
      targetState: .clientOpenServerOpen,
      compressionEnabled: true
    )

    let payload = [UInt8]([42, 42, 43, 43])
    let compressedFrame = try self.frameMessage(payload, compress: true)
    try machine.receive(message: compressedFrame, endStream: false)

    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(payload))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
  }
  /// A message received before the server closed is still delivered afterwards; once it has
  /// been handed out the machine reports `.noMoreMessages`.
  func testNextInboundMessageWhenClientOpenAndServerClosed() throws {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)

    let frame: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: frame), endStream: false)

    // The server closes by sending trailers.
    XCTAssertNoThrow(try machine.receive(metadata: .serverTrailers, endStream: true))

    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  /// A message received while the client was open is still delivered after the client
  /// closes; with the server still open the machine then asks to await more messages.
  func testNextInboundMessageWhenClientClosedAndServerOpen() throws {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)

    let frame: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: frame), endStream: false)

    // The client closes with an empty end-of-stream message.
    XCTAssertNoThrow(try machine.send(message: [], endStream: true))

    // The buffered message must still come out.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
  }
  /// A message received while both peers were open is still delivered after both have
  /// closed; once it has been handed out the machine reports `.noMoreMessages`.
  func testNextInboundMessageWhenClientClosedAndServerClosed() throws {
    var machine = self.makeClientStateMachine(targetState: .clientOpenServerOpen)

    let frame: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: frame), endStream: false)

    // The server closes first (trailers), then the client (empty end-of-stream message).
    XCTAssertNoThrow(try machine.receive(metadata: .serverTrailers, endStream: true))
    XCTAssertNoThrow(try machine.send(message: [], endStream: true))

    // The buffered message must still come out.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  // MARK: - Common paths
  /// Drives a client state machine through a complete RPC in the usual order: request headers,
  /// server initial metadata, one request message, two response messages, client end-of-stream
  /// and finally server trailers. Both directions must end up reporting `.noMoreMessages`.
  func testNormalFlow() throws {
    var machine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)

    // The client opens by sending (empty) metadata; verify the request headers it produces.
    let requestHeaders = try machine.send(metadata: .init())
    XCTAssertEqual(
      requestHeaders,
      [
        GRPCHTTP2Keys.path.rawValue: "test/test",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
        GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
      ]
    )

    // The server opens with its initial metadata.
    let openAction = try machine.receive(metadata: .serverInitialMetadata, endStream: false)
    XCTAssertEqual(
      openAction,
      .receivedMetadata([
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-accept-encoding": "deflate",
      ])
    )

    // The client sends one message: nothing is pending before it is queued, and nothing
    // is pending once it has been drained.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)
    let request: [UInt8] = [1, 2, 3, 4]
    let framedRequest = try self.frameMessage(request, compress: false)
    try machine.send(message: request, endStream: false)
    XCTAssertEqual(try machine.nextOutboundMessage(), .sendMessage(framedRequest))
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

    // The server sends two responses.
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
    let firstPayload: [UInt8] = [5, 6, 7]
    let secondPayload: [UInt8] = [8, 9, 10]
    let firstFrame = try self.frameMessage(firstPayload, compress: false)
    let secondFrame = try self.frameMessage(secondPayload, compress: false)
    try machine.receive(message: firstFrame, endStream: false)
    try machine.receive(message: secondFrame, endStream: false)

    // Both responses are surfaced in arrival order, then the machine waits for more.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(firstPayload))
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(secondPayload))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)

    // The client ends its side with an empty end-of-stream message.
    try machine.send(message: [], endStream: true)

    // The server ends with trailers. The metadata expected in the resulting action is the
    // trailers with the status and status-message entries removed.
    let closeAction = try machine.receive(metadata: .serverTrailers, endStream: true)
    var expectedMetadata = Metadata(headers: .serverTrailers)
    expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
    expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
    XCTAssertEqual(
      closeAction,
      .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: expectedMetadata)
    )

    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  /// Exercises the flow in which the client sends its only message with end-of-stream before
  /// the server has sent initial metadata. The server's metadata, responses and trailers must
  /// still be processed afterwards.
  func testClientClosesBeforeServerOpens() throws {
    var machine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)

    // The client opens by sending (empty) metadata; verify the request headers it produces.
    let requestHeaders = try machine.send(metadata: .init())
    XCTAssertEqual(
      requestHeaders,
      [
        GRPCHTTP2Keys.path.rawValue: "test/test",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
        GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
      ]
    )

    // The client sends a message flagged end-of-stream: after draining it, the outbound
    // side is finished.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)
    let request: [UInt8] = [1, 2, 3, 4]
    let framedRequest = try self.frameMessage(request, compress: false)
    try machine.send(message: request, endStream: true)
    XCTAssertEqual(try machine.nextOutboundMessage(), .sendMessage(framedRequest))
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)

    // Only now does the server open with its initial metadata.
    let openAction = try machine.receive(metadata: .serverInitialMetadata, endStream: false)
    XCTAssertEqual(
      openAction,
      .receivedMetadata([
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-accept-encoding": "deflate",
      ])
    )

    // The server sends two responses.
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
    let firstPayload: [UInt8] = [5, 6, 7]
    let secondPayload: [UInt8] = [8, 9, 10]
    let firstFrame = try self.frameMessage(firstPayload, compress: false)
    let secondFrame = try self.frameMessage(secondPayload, compress: false)
    try machine.receive(message: firstFrame, endStream: false)
    try machine.receive(message: secondFrame, endStream: false)

    // Both responses are surfaced in arrival order, then the machine waits for more.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(firstPayload))
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(secondPayload))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)

    // The server ends with trailers. The metadata expected in the resulting action is the
    // trailers with the status and status-message entries removed.
    let closeAction = try machine.receive(metadata: .serverTrailers, endStream: true)
    var expectedMetadata = Metadata(headers: .serverTrailers)
    expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
    expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
    XCTAssertEqual(
      closeAction,
      .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: expectedMetadata)
    )

    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  /// Exercises the flow in which the client closes after the server has sent initial metadata
  /// but before any response message arrives. Responses and trailers received afterwards must
  /// still be processed.
  func testClientClosesBeforeServerResponds() throws {
    var machine = self.makeClientStateMachine(targetState: .clientIdleServerIdle)

    // The client opens by sending (empty) metadata; verify the request headers it produces.
    let requestHeaders = try machine.send(metadata: .init())
    XCTAssertEqual(
      requestHeaders,
      [
        GRPCHTTP2Keys.path.rawValue: "test/test",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
        GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
      ]
    )

    // The client sends one message without ending the stream.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)
    let request: [UInt8] = [1, 2, 3, 4]
    let framedRequest = try self.frameMessage(request, compress: false)
    try machine.send(message: request, endStream: false)
    XCTAssertEqual(try machine.nextOutboundMessage(), .sendMessage(framedRequest))
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

    // The server opens with its initial metadata.
    let openAction = try machine.receive(metadata: .serverInitialMetadata, endStream: false)
    XCTAssertEqual(
      openAction,
      .receivedMetadata([
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-accept-encoding": "deflate",
      ])
    )

    // The client ends its side before any response has been received.
    try machine.send(message: [], endStream: true)

    // The server sends two responses.
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
    let firstPayload: [UInt8] = [5, 6, 7]
    let secondPayload: [UInt8] = [8, 9, 10]
    let firstFrame = try self.frameMessage(firstPayload, compress: false)
    let secondFrame = try self.frameMessage(secondPayload, compress: false)
    try machine.receive(message: firstFrame, endStream: false)
    try machine.receive(message: secondFrame, endStream: false)

    // Both responses are surfaced in arrival order, then the machine waits for more.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(firstPayload))
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(secondPayload))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)

    // The server ends with trailers. The metadata expected in the resulting action is the
    // trailers with the status and status-message entries removed.
    let closeAction = try machine.receive(metadata: .serverTrailers, endStream: true)
    var expectedMetadata = Metadata(headers: .serverTrailers)
    expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatus.rawValue)
    expectedMetadata.removeAllValues(forKey: GRPCHTTP2Keys.grpcStatusMessage.rawValue)
    XCTAssertEqual(
      closeAction,
      .receivedStatusAndMetadata(status: .init(code: .ok, message: ""), metadata: expectedMetadata)
    )

    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  900. }
  901. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  902. final class GRPCStreamServerStateMachineTests: XCTestCase {
  /// Builds a server-side `GRPCStreamStateMachine` and drives it into `targetState`.
  ///
  /// The machine is configured with the `http` scheme, `deflate` as its only accepted encoding,
  /// a maximum payload size of 100 and `skipAssertions` enabled. Each transition is wrapped in
  /// `XCTAssertNoThrow`, so a failing set-up step is recorded as a test failure.
  ///
  /// - Parameters:
  ///   - targetState: The client/server state combination the returned machine should be in.
  ///   - compressionEnabled: When `true`, the client is opened with
  ///     `.clientInitialMetadataWithDeflateCompression` instead of `.clientInitialMetadata`.
  /// - Returns: The state machine after the transitions for `targetState` have been applied.
  private func makeServerStateMachine(
    targetState: TargetStateMachineState,
    compressionEnabled: Bool = false
  ) -> GRPCStreamStateMachine {
    var machine = GRPCStreamStateMachine(
      configuration: .server(.init(scheme: .http, acceptedEncodings: [.deflate])),
      maximumPayloadSize: 100,
      skipAssertions: true
    )

    let requestHeaders: HPACKHeaders
    if compressionEnabled {
      requestHeaders = .clientInitialMetadataWithDeflateCompression
    } else {
      requestHeaders = .clientInitialMetadata
    }

    // Opens the client by receiving its initial metadata.
    func openClient() {
      XCTAssertNoThrow(try machine.receive(metadata: requestHeaders, endStream: false))
    }

    // Opens the server by sending its initial metadata.
    func openServer() {
      XCTAssertNoThrow(try machine.send(metadata: Metadata(headers: .serverInitialMetadata)))
    }

    // Closes the client by receiving an empty message flagged end-of-stream.
    func closeClient() {
      XCTAssertNoThrow(try machine.receive(message: .init(), endStream: true))
    }

    // Closes the server by sending an OK status with no metadata.
    func closeServer() {
      XCTAssertNoThrow(
        try machine.send(
          status: .init(code: .ok, message: ""),
          metadata: []
        )
      )
    }

    switch targetState {
    case .clientIdleServerIdle:
      break

    case .clientOpenServerIdle:
      openClient()

    case .clientOpenServerOpen:
      openClient()
      openServer()

    case .clientOpenServerClosed:
      openClient()
      openServer()
      closeServer()

    case .clientClosedServerIdle:
      openClient()
      closeClient()

    case .clientClosedServerOpen:
      openClient()
      openServer()
      closeClient()

    case .clientClosedServerClosed:
      openClient()
      openServer()
      closeClient()
      closeServer()
    }

    return machine
  }
  // MARK: - Send Metadata
  /// Sending initial metadata while the client is still idle must fail with an internal error.
  func testSendMetadataWhenClientIdleAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage =
      "Client cannot be idle if server is sending initial metadata: it must have opened."

    XCTAssertThrowsError(ofType: RPCError.self, try machine.send(metadata: .init())) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending initial metadata succeeds when the client is open and the server is still idle.
  func testSendMetadataWhenClientOpenAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)

    XCTAssertNoThrow(
      try machine.send(metadata: .init())
    )
  }
  /// An open server has already sent its initial metadata, so sending it again must fail.
  func testSendMetadataWhenClientOpenAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
    let expectedMessage = "Server has already sent initial metadata."

    // Second attempt at sending metadata: expected to throw.
    XCTAssertThrowsError(ofType: RPCError.self, try machine.send(metadata: .init())) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A closed server must not be able to send metadata, even if the client is still open.
  func testSendMetadataWhenClientOpenAndServerClosed() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)
    let expectedMessage = "Server cannot send metadata if closed."

    // Sending metadata after the server closed: expected to throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(metadata: .init())
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending initial metadata succeeds when the client has closed and the server is idle.
  func testSendMetadataWhenClientClosedAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)

    // A closed client may have finished its request while still waiting for the response,
    // so the server is allowed to send its initial metadata.
    XCTAssertNoThrow(
      try machine.send(metadata: .init())
    )
  }
  /// With the client closed and the server already open, sending metadata again must fail.
  func testSendMetadataWhenClientClosedAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)
    let expectedMessage = "Server has already sent initial metadata."

    // Second attempt at sending metadata: expected to throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(metadata: .init())
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// With both peers closed, sending metadata must fail with an internal error.
  func testSendMetadataWhenClientClosedAndServerClosed() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerClosed)
    let expectedMessage = "Server cannot send metadata if closed."

    // Sending metadata after the server closed: expected to throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(metadata: .init())
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  // MARK: - Send Message
  /// A server that has not sent initial metadata must not be able to send a message
  /// (both peers idle).
  func testSendMessageWhenClientIdleAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Server must have sent initial metadata before sending a message."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A server that has not sent initial metadata must not be able to send a message
  /// (client open, server idle).
  func testSendMessageWhenClientOpenAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)
    let expectedMessage = "Server must have sent initial metadata before sending a message."

    // The server is still idle, so sending a message is expected to throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending a message succeeds when both the client and the server are open.
  func testSendMessageWhenClientOpenAndServerOpen() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    // Both sides are open: the send must go through.
    XCTAssertNoThrow(
      try machine.send(message: [], endStream: false)
    )
  }
  /// A closed server must not be able to send a message, even if the client is still open.
  func testSendMessageWhenClientOpenAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)
    let expectedMessage = "Server can't send a message if it's closed."

    // The server has already closed, so this send is expected to throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A server that has not sent initial metadata must not be able to send a message
  /// (client closed, server idle).
  func testSendMessageWhenClientClosedAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)
    let expectedMessage = "Server must have sent initial metadata before sending a message."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending a message succeeds when the client has closed but the server is still open.
  func testSendMessageWhenClientClosedAndServerOpen() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)

    // The client is closed but may still be expecting a response, so the message
    // must be accepted.
    XCTAssertNoThrow(
      try machine.send(message: [], endStream: false)
    )
  }
  /// With both peers closed, sending a message must fail with an internal error.
  func testSendMessageWhenClientClosedAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerClosed)
    let expectedMessage = "Server can't send a message if it's closed."

    // The server has already closed, so this send is expected to throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  // MARK: - Send Status and Trailers
  /// Sending a status while the client is still idle must fail with an internal error.
  func testSendStatusAndTrailersWhenClientIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Server can't send status if client is idle."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(status: .init(code: .ok, message: ""), metadata: .init())
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending a status from an idle server (client open) must produce a trailers-only response
  /// and close the server.
  func testSendStatusAndTrailersWhenClientOpenAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)

    let responseHeaders = try machine.send(
      status: .init(code: .unknown, message: "RPC unknown"),
      metadata: .init()
    )

    // Trailers-only: the :status and content-type headers must accompany the gRPC status.
    XCTAssertEqual(
      responseHeaders,
      [
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-status": "2",
        "grpc-status-message": "RPC unknown",
      ]
    )

    // The server is closed now, so a further message must be rejected.
    let expectedMessage = "Server can't send a message if it's closed."
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending a status from an open server (client open) must produce plain trailers and close
  /// the server.
  func testSendStatusAndTrailersWhenClientOpenAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    let responseTrailers = try machine.send(
      status: .init(code: .ok, message: ""),
      metadata: .init()
    )

    // Not trailers-only: the server was already open (initial metadata was sent), so
    // neither :status nor content-type may appear here.
    XCTAssertEqual(responseTrailers, ["grpc-status": "0"])

    // The server is closed now, so a further message must be rejected.
    let expectedMessage = "Server can't send a message if it's closed."
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// A closed server must not be able to send a status, even if the client is still open.
  func testSendStatusAndTrailersWhenClientOpenAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)
    let expectedMessage = "Server can't send anything if closed."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(status: .init(code: .ok, message: ""), metadata: .init())
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending a status from an idle server (client closed) must produce a trailers-only response
  /// and close the server.
  func testSendStatusAndTrailersWhenClientClosedAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)

    let responseHeaders = try machine.send(
      status: .init(code: .unknown, message: "RPC unknown"),
      metadata: .init()
    )

    // Trailers-only: the :status and content-type headers must accompany the gRPC status.
    XCTAssertEqual(
      responseHeaders,
      [
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-status": "2",
        "grpc-status-message": "RPC unknown",
      ]
    )

    // The server is closed now, so a further message must be rejected.
    let expectedMessage = "Server can't send a message if it's closed."
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Sending a status from an open server (client closed) must produce plain trailers and close
  /// the server.
  func testSendStatusAndTrailersWhenClientClosedAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)

    let responseTrailers = try machine.send(
      status: .init(code: .ok, message: ""),
      metadata: .init()
    )

    // Not trailers-only: the server was already open (initial metadata was sent), so
    // neither :status nor content-type may appear here.
    XCTAssertEqual(responseTrailers, ["grpc-status": "0"])

    // The server is closed now, so a further message must be rejected.
    let expectedMessage = "Server can't send a message if it's closed."
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(message: [], endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// With both peers closed, sending a status must fail with an internal error.
  func testSendStatusAndTrailersWhenClientClosedAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerClosed)
    let expectedMessage = "Server can't send anything if closed."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.send(status: .init(code: .ok, message: ""), metadata: .init())
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  // MARK: - Receive metadata
  /// Receiving valid client initial metadata on an idle machine yields `.receivedMetadata`
  /// carrying those headers as `Metadata`.
  func testReceiveMetadataWhenClientIdleAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let expectedMetadata = Metadata(headers: .clientInitialMetadata)

    let action = try machine.receive(metadata: .clientInitialMetadata, endStream: false)

    XCTAssertEqual(action, .receivedMetadata(expectedMetadata))
  }
  /// Receiving valid client initial metadata flagged end-of-stream still yields
  /// `.receivedMetadata` carrying those headers as `Metadata`.
  func testReceiveMetadataWhenClientIdleAndServerIdle_WithEndStream() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let expectedMetadata = Metadata(headers: .clientInitialMetadata)

    let action = try machine.receive(metadata: .clientInitialMetadata, endStream: true)

    XCTAssertEqual(action, .receivedMetadata(expectedMetadata))
  }
  /// Request headers without a content type must be rejected with a lone `:status` of 415.
  func testReceiveMetadataWhenClientIdleAndServerIdle_MissingContentType() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    let action = try machine.receive(metadata: .receivedWithoutContentType, endStream: false)

    self.assertRejectedRPC(action) { rejectionHeaders in
      // Exactly one header is expected, and it must be the 415 status.
      XCTAssertEqual(rejectionHeaders.count, 1)
      XCTAssertEqual(rejectionHeaders.firstString(forKey: .status), "415")
    }
  }
  /// Request headers with an invalid content type must be rejected with a lone `:status`
  /// of 415.
  func testReceiveMetadataWhenClientIdleAndServerIdle_InvalidContentType() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    let action = try machine.receive(metadata: .receivedWithInvalidContentType, endStream: false)

    self.assertRejectedRPC(action) { rejectionHeaders in
      // Exactly one header is expected, and it must be the 415 status.
      XCTAssertEqual(rejectionHeaders.count, 1)
      XCTAssertEqual(rejectionHeaders.firstString(forKey: .status), "415")
    }
  }
  /// Request headers without a `:path` must be rejected with an HTTP 200 response carrying
  /// `grpc-status` 12 and an explanatory status message.
  func testReceiveMetadataWhenClientIdleAndServerIdle_MissingPath() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    let action = try machine.receive(metadata: .receivedWithoutEndpoint, endStream: false)

    self.assertRejectedRPC(action) { rejectionHeaders in
      XCTAssertEqual(
        rejectionHeaders,
        [
          ":status": "200",
          "content-type": "application/grpc",
          "grpc-status": "12",
          "grpc-status-message": "No :path header has been set.",
        ]
      )
    }
  }
  /// A request using a compression algorithm the server does not accept must be rejected,
  /// and the rejection must advertise the accepted encodings.
  func testReceiveMetadataWhenClientIdleAndServerIdle_ServerUnsupportedEncoding() throws {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    // The client opens with gzip, which is not among the server's accepted encodings.
    let action = try machine.receive(
      metadata: .clientInitialMetadataWithGzipCompression,
      endStream: false
    )

    self.assertRejectedRPC(action) { rejectionHeaders in
      XCTAssertEqual(
        rejectionHeaders,
        [
          ":status": "200",
          "content-type": "application/grpc",
          "grpc-accept-encoding": "deflate",
          "grpc-status": "12",
          "grpc-status-message":
            "gzip compression is not supported; supported algorithms are listed in grpc-accept-encoding",
        ]
      )
    }
  }
  // TODO: Add more encoding-related validation tests (for both client and server)
  // and message encoding tests.
  /// Receiving client initial metadata a second time must fail (client open, server idle).
  func testReceiveMetadataWhenClientOpenAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)
    let expectedMessage = "Client shouldn't have sent metadata twice."

    // The client is already open, so receiving its initial metadata again must throw.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .clientInitialMetadata, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Receiving client initial metadata a second time must fail (client open, server open).
  func testReceiveMetadataWhenClientOpenAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)
    let expectedMessage = "Client shouldn't have sent metadata twice."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .clientInitialMetadata, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Receiving client initial metadata a second time must fail (client open, server closed).
  func testReceiveMetadataWhenClientOpenAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)
    let expectedMessage = "Client shouldn't have sent metadata twice."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .clientInitialMetadata, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Receiving metadata from a closed client must fail (server idle).
  func testReceiveMetadataWhenClientClosedAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)
    let expectedMessage = "Client can't have sent metadata if closed."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .clientInitialMetadata, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Receiving metadata from a closed client must fail (server open).
  func testReceiveMetadataWhenClientClosedAndServerOpen() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)
    let expectedMessage = "Client can't have sent metadata if closed."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .clientInitialMetadata, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Receiving metadata from a closed client must fail (server closed).
  func testReceiveMetadataWhenClientClosedAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerClosed)
    let expectedMessage = "Client can't have sent metadata if closed."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(metadata: .clientInitialMetadata, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  // MARK: - Receive message
  /// Receiving a message while the client is still idle must fail with an internal error.
  func testReceiveMessageWhenClientIdleAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let expectedMessage = "Can't have received a message if client is idle."

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(message: .init(), endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// With the client open and the server idle, messages are accepted until one arrives with
  /// end-of-stream; any message after that must be rejected.
  func testReceiveMessageWhenClientOpenAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)

    // Two messages are accepted; the second carries end-of-stream and closes the client.
    XCTAssertNoThrow(try machine.receive(message: .init(), endStream: false))
    XCTAssertNoThrow(try machine.receive(message: .init(), endStream: true))

    // The client is closed now, so one more message must throw.
    let expectedMessage = "Client can't send a message if closed."
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(message: .init(), endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// With both peers open, messages are accepted until one arrives with end-of-stream;
  /// any message after that must be rejected.
  func testReceiveMessageWhenClientOpenAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    // Two messages are accepted; the second carries end-of-stream and closes the client.
    XCTAssertNoThrow(try machine.receive(message: .init(), endStream: false))
    XCTAssertNoThrow(try machine.receive(message: .init(), endStream: true))

    // The client is closed now, so one more message must throw.
    let expectedMessage = "Client can't send a message if closed."
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(message: .init(), endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, expectedMessage)
    }
  }
  /// Receiving a message from an open client must not throw even though the server has closed.
  func testReceiveMessageWhenClientOpenAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)

    // The client has not finished sending its request, so this must not fail.
    XCTAssertNoThrow(
      try machine.receive(message: .init(), endStream: false)
    )
  }
  /// A state machine built for the `clientClosedServerIdle` target state must refuse an incoming
  /// message with an internal error.
  func testReceiveMessageWhenClientClosedAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)
    let incoming = ByteBuffer()

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(message: incoming, endStream: false)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Client can't send a message if closed.")
    }
  }
  /// A state machine built for the `clientClosedServerOpen` target state must refuse an incoming
  /// message with an internal error.
  func testReceiveMessageWhenClientClosedAndServerOpen() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(message: ByteBuffer(), endStream: false)
    ) {
      XCTAssertEqual($0.code, .internalError)
      XCTAssertEqual($0.message, "Client can't send a message if closed.")
    }
  }
  /// A state machine built for the `clientClosedServerClosed` target state must refuse an
  /// incoming message with an internal error.
  func testReceiveMessageWhenClientClosedAndServerClosed() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerClosed)
    let lateMessage = ByteBuffer()

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try machine.receive(message: lateMessage, endStream: false)
    ) { failure in
      XCTAssertEqual(failure.code, .internalError)
      XCTAssertEqual(failure.message, "Client can't send a message if closed.")
    }
  }
  1446. // - MARK: Next outbound message
  /// Asking for the next outbound message on a state machine built for the
  /// `clientIdleServerIdle` target state must throw an internal error.
  func testNextOutboundMessageWhenClientIdleAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    XCTAssertThrowsError(ofType: RPCError.self, try machine.nextOutboundMessage()) {
      XCTAssertEqual($0.code, .internalError)
      XCTAssertEqual($0.message, "Server is not open yet.")
    }
  }
  /// Asking for the next outbound message on a state machine built for the
  /// `clientOpenServerIdle` target state must throw an internal error.
  func testNextOutboundMessageWhenClientOpenAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)

    XCTAssertThrowsError(ofType: RPCError.self, try machine.nextOutboundMessage()) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Server is not open yet.")
    }
  }
  /// Asking for the next outbound message while the server is still idle must throw an internal
  /// error, also when the state machine was set up with compression enabled.
  func testNextOutboundMessageWhenClientOpenAndServerIdle_WithCompression() throws {
    // Request compression explicitly: without it this test only repeats the uncompressed
    // variant and never exercises the configuration its name refers to.
    var stateMachine = self.makeServerStateMachine(
      targetState: .clientOpenServerIdle,
      compressionEnabled: true
    )

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try stateMachine.nextOutboundMessage()
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Server is not open yet.")
    }
  }
  /// With both peers open, a message handed to `send(message:endStream:)` must come back from
  /// `nextOutboundMessage()` as an uncompressed length-prefixed frame, and the state machine
  /// must report `.awaitMoreMessages` both before anything is sent and after the frame is taken.
  func testNextOutboundMessageWhenClientOpenAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    // Nothing has been sent yet.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))

    let expectedFrame = ByteBuffer(bytes: [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ])
    let outbound = try machine.nextOutboundMessage()
    XCTAssertEqual(outbound, .sendMessage(expectedFrame))

    // Once the frame has been handed out there is nothing else to return.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)
  }
  /// With compression enabled and both peers open, a sent message must come back from
  /// `nextOutboundMessage()` equal to the output of `frameMessage(_:compress:)` with
  /// `compress: true`.
  func testNextOutboundMessageWhenClientOpenAndServerOpen_WithCompression() throws {
    var machine = self.makeServerStateMachine(
      targetState: .clientOpenServerOpen,
      compressionEnabled: true
    )

    // Nothing has been sent yet.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)

    let payload: [UInt8] = [42, 42, 43, 43]
    XCTAssertNoThrow(try machine.send(message: payload, endStream: false))

    let outbound = try machine.nextOutboundMessage()
    let expectedFrame = try self.frameMessage(payload, compress: true)
    XCTAssertEqual(outbound, .sendMessage(expectedFrame))
  }
  /// A message sent before the server sends its final status must still be returned by
  /// `nextOutboundMessage()`; after that the state machine must report `.noMoreMessages`.
  func testNextOutboundMessageWhenClientOpenAndServerClosed() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    // Send one message, then close the server by sending its status.
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))
    XCTAssertNoThrow(try machine.send(status: .init(code: .ok, message: ""), metadata: []))

    let outbound = try machine.nextOutboundMessage()
    let expectedFrame = ByteBuffer(bytes: [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ])
    XCTAssertEqual(outbound, .sendMessage(expectedFrame))

    // Nothing else may be returned from here on.
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
  }
  /// Asking for the next outbound message on a state machine built for the
  /// `clientClosedServerIdle` target state must throw an internal error.
  func testNextOutboundMessageWhenClientClosedAndServerIdle() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)

    XCTAssertThrowsError(ofType: RPCError.self, try machine.nextOutboundMessage()) { failure in
      XCTAssertEqual(failure.code, .internalError)
      XCTAssertEqual(failure.message, "Server is not open yet.")
    }
  }
  /// Messages sent by the server before and after the client closes must both be returned by
  /// `nextOutboundMessage()`, framed back to back in a single buffer.
  func testNextOutboundMessageWhenClientClosedAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    // Send a message, close the client, then send a second message.
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))
    XCTAssertNoThrow(try machine.receive(message: ByteBuffer(), endStream: true))
    XCTAssertNoThrow(try machine.send(message: [43, 43], endStream: false))

    // Fetching the next outbound message _does_ have to return what was enqueued.
    let outbound = try machine.nextOutboundMessage()
    let expectedFrames = ByteBuffer(bytes: [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
      // End of first message - beginning of second
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      43, 43,  // original message
    ])
    XCTAssertEqual(outbound, .sendMessage(expectedFrames))

    // Nothing else may be returned afterwards.
    XCTAssertEqual(try machine.nextOutboundMessage(), .awaitMoreMessages)
  }
  /// Starting from the `clientClosedServerOpen` target state, a message sent before the server
  /// sends its status must still be returned by `nextOutboundMessage()`, followed by
  /// `.noMoreMessages`.
  func testNextOutboundMessageWhenClientClosedAndServerClosed() throws {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerOpen)

    // Send a message, then close the server by sending its status.
    XCTAssertNoThrow(try machine.send(message: [42, 42], endStream: false))
    XCTAssertNoThrow(try machine.send(status: .init(code: .ok, message: ""), metadata: []))

    // The server is closed, but the pending message has not been drained yet and so must
    // still be handed out.
    let outbound = try machine.nextOutboundMessage()
    let expectedFrame = ByteBuffer(bytes: [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ])
    XCTAssertEqual(outbound, .sendMessage(expectedFrame))

    // Nothing else may be returned from here on.
    XCTAssertEqual(try machine.nextOutboundMessage(), .noMoreMessages)
  }
  1579. // - MARK: Next inbound message
  /// A state machine built for the `clientIdleServerIdle` target state must report
  /// `.awaitMoreMessages` when asked for the next inbound message.
  func testNextInboundMessageWhenClientIdleAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)
    let inbound = machine.nextInboundMessage()
    XCTAssertEqual(inbound, .awaitMoreMessages)
  }
  /// A state machine built for the `clientOpenServerIdle` target state must report
  /// `.awaitMoreMessages` when asked for the next inbound message.
  func testNextInboundMessageWhenClientOpenAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)
    let inbound = machine.nextInboundMessage()
    XCTAssertEqual(inbound, .awaitMoreMessages)
  }
  /// With both peers open, a received uncompressed frame must be returned by
  /// `nextInboundMessage()` as its payload, after which `.awaitMoreMessages` is reported.
  func testNextInboundMessageWhenClientOpenAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    let framedRequest: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: framedRequest), endStream: false)

    // The first poll yields the payload; the second finds nothing further.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
  }
  /// With compression enabled and both peers open, a frame produced by
  /// `frameMessage(_:compress:)` with `compress: true` must be returned by
  /// `nextInboundMessage()` as the original payload, followed by `.awaitMoreMessages`.
  func testNextInboundMessageWhenClientOpenAndServerOpen_WithCompression() throws {
    var machine = self.makeServerStateMachine(
      targetState: .clientOpenServerOpen,
      compressionEnabled: true
    )

    let payload: [UInt8] = [42, 42, 43, 43]
    let compressedFrame = try self.frameMessage(payload, compress: true)
    try machine.receive(message: compressedFrame, endStream: false)

    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage(payload))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
  }
  /// A message received before the server sends its status must still be returned by
  /// `nextInboundMessage()` afterwards, followed by `.awaitMoreMessages`.
  func testNextInboundMessageWhenClientOpenAndServerClosed() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    let framedRequest: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: framedRequest), endStream: false)

    // Close the server by sending its status.
    XCTAssertNoThrow(try machine.send(status: .init(code: .ok, message: ""), metadata: []))

    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .awaitMoreMessages)
  }
  /// A state machine built for the `clientClosedServerIdle` target state must report
  /// `.noMoreMessages` when asked for the next inbound message.
  func testNextInboundMessageWhenClientClosedAndServerIdle() {
    var machine = self.makeServerStateMachine(targetState: .clientClosedServerIdle)
    let inbound = machine.nextInboundMessage()
    XCTAssertEqual(inbound, .noMoreMessages)
  }
  /// A message received before the client closes must still be returned by
  /// `nextInboundMessage()` once the client has closed, followed by `.noMoreMessages`.
  func testNextInboundMessageWhenClientClosedAndServerOpen() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    let framedRequest: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: framedRequest), endStream: false)

    // Close the client with an empty, stream-ending message.
    XCTAssertNoThrow(try machine.receive(message: ByteBuffer(), endStream: true))

    // The message arrived while the client was still open, so it has to be handed out even
    // though the client is closed now.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  /// A message received while the client was open must still be returned by
  /// `nextInboundMessage()` after both the server and the client have closed, followed by
  /// `.noMoreMessages`.
  func testNextInboundMessageWhenClientClosedAndServerClosed() throws {
    var machine = self.makeServerStateMachine(targetState: .clientOpenServerOpen)

    let framedRequest: [UInt8] = [
      0,  // compression flag: unset
      0, 0, 0, 2,  // message length: 2 bytes
      42, 42,  // original message
    ]
    try machine.receive(message: ByteBuffer(bytes: framedRequest), endStream: false)

    // Close the server first (by sending its status), then the client.
    XCTAssertNoThrow(try machine.send(status: .init(code: .ok, message: ""), metadata: []))
    XCTAssertNoThrow(try machine.receive(message: ByteBuffer(), endStream: true))

    // The message arrived while the client was still open, so it has to be handed out even
    // though both sides are closed now.
    XCTAssertEqual(machine.nextInboundMessage(), .receiveMessage([42, 42]))
    XCTAssertEqual(machine.nextInboundMessage(), .noMoreMessages)
  }
  1669. // - MARK: Common paths
  /// Drives a server state machine through a complete exchange: client metadata, server
  /// metadata, one request message delivered in two fragments, two response messages, the
  /// client's end of stream and finally the server's status.
  func testNormalFlow() throws {
    var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    // Client sends metadata
    let receiveMetadataAction = try stateMachine.receive(
      metadata: .clientInitialMetadata,
      endStream: false
    )
    XCTAssertEqual(
      receiveMetadataAction,
      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
    )

    // Server sends initial metadata
    let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"]))
    XCTAssertEqual(
      sentInitialHeaders,
      [
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-accept-encoding": "deflate",
        "custom": "value",
      ]
    )

    // Client sends messages
    let deframedMessage = [UInt8]([1, 2, 3, 4])
    let completeMessage = try self.frameMessage(deframedMessage, compress: false)
    // Split message into two parts to make sure the stitching together of the frames works well.
    // The slices are unwrapped with `XCTUnwrap` so that an unexpected `nil` fails this test
    // instead of crashing the whole test run.
    let firstMessage = try XCTUnwrap(completeMessage.getSlice(at: 0, length: 4))
    let secondMessage = try XCTUnwrap(
      completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)
    )

    try stateMachine.receive(message: firstMessage, endStream: false)
    // Only a fragment has arrived so far, so there is no complete message to hand out.
    XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
    try stateMachine.receive(message: secondMessage, endStream: false)
    XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))

    // Server sends response
    let firstResponse = [UInt8]([5, 6, 7])
    let secondResponse = [UInt8]([8, 9, 10])
    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
    try stateMachine.send(message: firstResponse, endStream: false)
    try stateMachine.send(message: secondResponse, endStream: false)

    // Make sure messages are outbound
    let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages))

    // Client sends end
    try stateMachine.receive(message: ByteBuffer(), endStream: true)

    // Server ends
    let response = try stateMachine.send(
      status: .init(code: .ok, message: ""),
      metadata: []
    )
    XCTAssertEqual(response, ["grpc-status": "0"])

    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
    XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
  }
  /// Drives a server state machine through an exchange in which the client sends its metadata,
  /// its message and its end of stream before the server sends initial metadata; the server
  /// then sends metadata, two response messages and its status.
  func testClientClosesBeforeServerOpens() throws {
    var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    // Client sends metadata
    let receiveMetadataAction = try stateMachine.receive(
      metadata: .clientInitialMetadata,
      endStream: false
    )
    XCTAssertEqual(
      receiveMetadataAction,
      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
    )

    // Client sends messages
    let deframedMessage = [UInt8]([1, 2, 3, 4])
    let completeMessage = try self.frameMessage(deframedMessage, compress: false)
    // Split message into two parts to make sure the stitching together of the frames works well.
    // The slices are unwrapped with `XCTUnwrap` so that an unexpected `nil` fails this test
    // instead of crashing the whole test run.
    let firstMessage = try XCTUnwrap(completeMessage.getSlice(at: 0, length: 4))
    let secondMessage = try XCTUnwrap(
      completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)
    )

    try stateMachine.receive(message: firstMessage, endStream: false)
    // Only a fragment has arrived so far, so there is no complete message to hand out.
    XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
    try stateMachine.receive(message: secondMessage, endStream: false)
    XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))

    // Client sends end
    try stateMachine.receive(message: ByteBuffer(), endStream: true)

    // Server sends initial metadata
    let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"]))
    XCTAssertEqual(
      sentInitialHeaders,
      [
        "custom": "value",
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-accept-encoding": "deflate",
      ]
    )

    // Server sends response
    let firstResponse = [UInt8]([5, 6, 7])
    let secondResponse = [UInt8]([8, 9, 10])
    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
    try stateMachine.send(message: firstResponse, endStream: false)
    try stateMachine.send(message: secondResponse, endStream: false)

    // Make sure messages are outbound
    let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages))

    // Server ends
    let response = try stateMachine.send(
      status: .init(code: .ok, message: ""),
      metadata: []
    )
    XCTAssertEqual(response, ["grpc-status": "0"])

    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
    XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
  }
  /// Drives a server state machine through an exchange in which the client sends its end of
  /// stream after the server's initial metadata but before any response message; the server
  /// then sends two response messages and its status.
  func testClientClosesBeforeServerResponds() throws {
    var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle)

    // Client sends metadata
    let receiveMetadataAction = try stateMachine.receive(
      metadata: .clientInitialMetadata,
      endStream: false
    )
    XCTAssertEqual(
      receiveMetadataAction,
      .receivedMetadata(Metadata(headers: .clientInitialMetadata))
    )

    // Client sends messages
    let deframedMessage = [UInt8]([1, 2, 3, 4])
    let completeMessage = try self.frameMessage(deframedMessage, compress: false)
    // Split message into two parts to make sure the stitching together of the frames works well.
    // The slices are unwrapped with `XCTUnwrap` so that an unexpected `nil` fails this test
    // instead of crashing the whole test run.
    let firstMessage = try XCTUnwrap(completeMessage.getSlice(at: 0, length: 4))
    let secondMessage = try XCTUnwrap(
      completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)
    )

    try stateMachine.receive(message: firstMessage, endStream: false)
    // Only a fragment has arrived so far, so there is no complete message to hand out.
    XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)
    try stateMachine.receive(message: secondMessage, endStream: false)
    XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage))

    // Server sends initial metadata
    let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"]))
    XCTAssertEqual(
      sentInitialHeaders,
      [
        "custom": "value",
        ":status": "200",
        "content-type": "application/grpc",
        "grpc-accept-encoding": "deflate",
      ]
    )

    // Client sends end
    try stateMachine.receive(message: ByteBuffer(), endStream: true)

    // Server sends response
    let firstResponse = [UInt8]([5, 6, 7])
    let secondResponse = [UInt8]([8, 9, 10])
    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages)
    try stateMachine.send(message: firstResponse, endStream: false)
    try stateMachine.send(message: secondResponse, endStream: false)

    // Make sure messages are outbound
    let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages))

    // Server ends
    let response = try stateMachine.send(
      status: .init(code: .ok, message: ""),
      metadata: []
    )
    XCTAssertEqual(response, ["grpc-status": "0"])

    XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages)
    XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages)
  }
  1826. }
  extension XCTestCase {
    /// Asserts that `action` is `.rejectRPC` and passes the trailers it carries to `expression`.
    ///
    /// Records a test failure, without calling `expression`, when `action` is any other case.
    ///
    /// - Parameters:
    ///   - action: The action to inspect.
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    ///   - expression: Further checks to run against the trailers of the rejection.
    /// - Throws: Whatever `expression` throws.
    @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
    func assertRejectedRPC(
      _ action: GRPCStreamStateMachine.OnMetadataReceived,
      file: StaticString = #filePath,
      line: UInt = #line,
      expression: (HPACKHeaders) throws -> Void
    ) rethrows {
      guard case .rejectRPC(let trailers) = action else {
        // Attribute the failure to the calling test rather than to this helper.
        XCTFail("RPC should have been rejected.", file: file, line: line)
        return
      }
      try expression(trailers)
    }

    /// Frames a single message; a convenience over `frameMessages(_:compress:file:line:)`.
    ///
    /// - Parameters:
    ///   - message: The payload to frame.
    ///   - compress: Whether to frame using a deflate compressor.
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    /// - Returns: The buffer holding the framed message.
    /// - Throws: Any error thrown while framing, including when no buffer is produced.
    func frameMessage(
      _ message: [UInt8],
      compress: Bool,
      file: StaticString = #filePath,
      line: UInt = #line
    ) throws -> ByteBuffer {
      try self.frameMessages([message], compress: compress, file: file, line: line)
    }

    /// Appends every message to a `GRPCMessageFramer` and returns the next buffer it yields.
    ///
    /// - Parameters:
    ///   - messages: The payloads to frame, in order.
    ///   - compress: When `true`, a `Zlib.Compressor` using `.deflate` is handed to the framer;
    ///     otherwise no compressor is used.
    ///   - file: The file to attribute a failure to; defaults to the caller's file.
    ///   - line: The line to attribute a failure to; defaults to the caller's line.
    /// - Returns: The buffer returned by the framer.
    /// - Throws: Any error thrown while framing, or the `XCTUnwrap` error when the framer
    ///   returns `nil`.
    func frameMessages(
      _ messages: [[UInt8]],
      compress: Bool,
      file: StaticString = #filePath,
      line: UInt = #line
    ) throws -> ByteBuffer {
      var framer = GRPCMessageFramer()
      let compressor: Zlib.Compressor? = compress ? Zlib.Compressor(method: .deflate) : nil
      // End the compressor on every exit path, including when framing throws.
      defer { compressor?.end() }

      for message in messages {
        framer.append(message)
      }

      return try XCTUnwrap(framer.next(compressor: compressor), file: file, line: line)
    }
  }