GRPCClientStateMachineTests.swift 41 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Foundation
  18. @testable import GRPC
  19. import Logging
  20. import NIO
  21. import NIOHPACK
  22. import NIOHTTP1
  23. import SwiftProtobuf
  24. import XCTest
  /// Tests for `GRPCClientStateMachine`, plus the helpers shared by the test extensions below.
  class GRPCClientStateMachineTests: GRPCTestCase {
    typealias Request = Echo_EchoRequest
    typealias Response = Echo_EchoResponse
    typealias StateMachine = GRPCClientStateMachine

    /// Allocator used by the helpers and tests whenever a `ByteBuffer` is required.
    var allocator = ByteBufferAllocator()

    /// Creates a state machine which starts out in the given `state`.
    func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
      return .init(state: state)
    }

    /// Returns a new `ByteBuffer` holding `message` as written by an uncompressed
    /// `LengthPrefixedMessageWriter` (i.e. with length-prefixing).
    func writeMessage(_ message: String) throws -> ByteBuffer {
      let payload = self.allocator.buffer(string: message)
      return try LengthPrefixedMessageWriter(compression: .none).write(
        buffer: payload,
        allocator: self.allocator,
        compressed: false
      )
    }

    /// Appends `message`, framed as in `writeMessage(_:)`, to the end of `buffer`.
    func writeMessage(_ message: String, into buffer: inout ByteBuffer) throws {
      var framed = try self.writeMessage(message)
      buffer.writeBuffer(&framed)
    }

    /// Builds response headers containing ":status" and "content-type".
    ///
    /// Passing `nil` for either argument leaves the corresponding header out entirely.
    func makeResponseHeaders(
      status: String? = "200",
      contentType: String? = "application/grpc+proto"
    ) -> HPACKHeaders {
      var headers: HPACKHeaders = [:]
      if let status = status {
        headers.add(name: ":status", value: status)
      }
      if let contentType = contentType {
        headers.add(name: "content-type", value: contentType)
      }
      return headers
    }
  }
  55. // MARK: - Send Request Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that sending request headers from `state` fails with `.invalidState`.
    func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let result = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "host",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      ))
      result.assertFailure { error in
        XCTAssertEqual(error, .invalidState)
      }
    }

    /// Request headers may be sent when both the client and the server are idle.
    func testSendRequestHeadersFromIdle() {
      let idle: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      var machine = self.makeStateMachine(idle)
      let result = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "host",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      ))
      result.assertSuccess()
    }

    func testSendRequestHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendRequestHeadersFromInvalidState(state)
    }
  }
  113. // MARK: - Send Request
  extension GRPCClientStateMachineTests {
    /// Asserts that sending a request message from `state` fails with the `expected` error.
    ///
    /// - Parameters:
    ///   - state: The state the state machine is created in.
    ///   - expected: The error `sendRequest` should fail with.
    func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
      var stateMachine = self.makeStateMachine(state)
      stateMachine.sendRequest(
        ByteBuffer(string: "Hello!"),
        compressed: false,
        allocator: self.allocator
      ).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that sending a request message from `state` succeeds and that the bytes following
    /// the 5-byte prefix of the returned buffer are the original message.
    ///
    /// - Parameter state: The state the state machine is created in.
    func doTestSendRequestFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)
      let request = "Hello!"

      stateMachine.sendRequest(
        ByteBuffer(string: request),
        compressed: false,
        allocator: self.allocator
      ).assertSuccess { buffer in
        var buffer = buffer
        // Remove the length and compression flag prefix.
        buffer.moveReaderIndex(forwardBy: 5)
        // Compare against the optional directly: an unreadable payload is reported as a test
        // failure rather than crashing the test run on a force-unwrap.
        XCTAssertEqual(buffer.readString(length: buffer.readableBytes), request)
      }
    }

    func testSendRequestFromIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testSendRequestFromClientActiveServerIdle() {
      self.doTestSendRequestFromValidState(.clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      ))
    }

    func testSendRequestFromClientClosedServerIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromActive() {
      self.doTestSendRequestFromValidState(.clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      ))
    }

    func testSendRequestFromClientClosedServerActive() {
      // This test previously constructed `.clientClosedServerIdle`, duplicating the test above and
      // leaving the client-closed/server-active state untested.
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerActive(readState: .one()),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromClosed() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerClosed,
        expected: .cardinalityViolation
      )
    }
  }
  178. // MARK: - Send End of Request Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that closing the request stream from `state` fails with the `expected` error.
    func doTestSendEndOfRequestStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: SendEndOfRequestStreamError
    ) {
      var machine = self.makeStateMachine(state)
      let result = machine.sendEndOfRequestStream()
      result.assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that closing the request stream from `state` succeeds.
    func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let result = machine.sendEndOfRequestStream()
      result.assertSuccess()
    }

    func testSendEndOfRequestStreamFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .invalidState)
    }

    func testSendEndOfRequestStreamFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }
  }
  231. // MARK: - Receive Response Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving the default response headers (see `makeResponseHeaders`) in
    /// `state` fails with the `expected` error.
    func doTestReceiveResponseHeadersFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveResponseHeadError
    ) {
      var machine = self.makeStateMachine(state)
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving the default response headers (see `makeResponseHeaders`) in
    /// `state` succeeds.
    func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()
    }

    func testReceiveResponseHeadersFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }
  }
  284. // MARK: - Receive Response
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving a buffer holding one framed message in `state` fails with the
    /// `expected` error.
    func doTestReceiveResponseFromInvalidState(
      _ state: StateMachine.State,
      expected: MessageReadError
    ) throws {
      var machine = self.makeStateMachine(state)
      var framed = try self.writeMessage("Hello!")

      machine.receiveResponseBuffer(&framed).assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving a buffer holding one framed message in `state` succeeds and yields
    /// exactly that message.
    func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
      var machine = self.makeStateMachine(state)
      let text = "Hello!"
      var framed = try self.writeMessage(text)

      machine.receiveResponseBuffer(&framed).assertSuccess { received in
        let expectedMessages = [ByteBuffer(string: text)]
        XCTAssertEqual(received, expectedMessages)
      }
    }

    func testReceiveResponseFromIdle() throws {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientActiveServerIdle() throws {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientClosedServerIdle() throws {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromActive() throws {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClientClosedServerActive() throws {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClosed() throws {
      let state: StateMachine.State = .clientClosedServerClosed
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }
  }
  341. // MARK: - Receive End of Response Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving (empty) trailers in `state` fails with the `expected` error.
    ///
    /// - Parameters:
    ///   - state: The state the state machine is created in.
    ///   - expected: The error `receiveEndOfResponseStream` should fail with.
    func doTestReceiveEndOfResponseStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveEndOfResponseStreamError
    ) {
      var stateMachine = self.makeStateMachine(state)
      // Check the actual error: previously `expected` was accepted but never compared, so any
      // failure at all would have satisfied the test.
      // NOTE(review): relies on `ReceiveEndOfResponseStreamError` being `Equatable` -- confirm.
      stateMachine.receiveEndOfResponseStream(.init()).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that receiving trailers carrying an "ok" status in `state` succeeds and yields a
    /// status with code `.ok` and message "ok".
    ///
    /// - Parameter state: The state the state machine is created in.
    func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)

      var trailers: HPACKHeaders = [
        GRPCHeaderName.statusCode: "0",
        GRPCHeaderName.statusMessage: "ok",
      ]

      // When the server is idle it's a "Trailers-Only" response, we need the :status and
      // content-type to make a valid set of trailers.
      switch state {
      case .clientActiveServerIdle,
           .clientClosedServerIdle:
        trailers.add(name: ":status", value: "200")
        trailers.add(name: "content-type", value: "application/grpc+proto")
      default:
        break
      }

      stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, .ok)
        XCTAssertEqual(status.message, "ok")
      }
    }

    func testReceiveEndOfResponseStreamFromIdle() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: .one, messageEncoding: .disabled)
        )
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
      )
    }

    func testReceiveEndOfResponseStreamFromActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerActive(readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClosed() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientClosedServerClosed,
        expected: .invalidState
      )
    }
  }
  407. // MARK: - Basic RPC flow.
  extension GRPCClientStateMachineTests {
    /// Builds trailers holding the gRPC status code and, when provided, the status message.
    func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
      var trailers = HPACKHeaders()
      trailers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
      guard let message = message else {
        return trailers
      }
      trailers.add(name: GRPCHeaderName.statusMessage, value: message)
      return trailers
    }

    /// One request followed by one response.
    func testSimpleUnaryFlow() throws {
      let initialState: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      var machine = self.makeStateMachine(initialState)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Send the single request.
      machine.sendRequest(
        ByteBuffer(string: "Hello!"),
        compressed: false,
        allocator: self.allocator
      ).assertSuccess()

      // Nothing more to send: close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var response = try self.writeMessage("Hello!")
      machine.receiveResponseBuffer(&response).assertSuccess()

      // Finally, the status arrives in the trailers.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }

    /// Several requests followed by one response.
    func testSimpleClientActiveFlow() throws {
      let initialState: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .many(),
        readArity: .one
      )
      var machine = self.makeStateMachine(initialState)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Send three requests, one after another.
      for payload in ["1", "2", "3"] {
        machine
          .sendRequest(ByteBuffer(string: payload), compressed: false, allocator: self.allocator)
          .assertSuccess()
      }

      // Nothing more to send: close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var response = try self.writeMessage("Hello!")
      machine.receiveResponseBuffer(&response).assertSuccess()

      // Finally, the status arrives in the trailers.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }

    /// One request followed by several responses.
    func testSimpleServerActiveFlow() throws {
      let initialState: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .many
      )
      var machine = self.makeStateMachine(initialState)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Send the single request.
      machine
        .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
        .assertSuccess()

      // Nothing more to send: close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // A buffer holding a single response.
      var singleResponse = try self.writeMessage("1")
      machine.receiveResponseBuffer(&singleResponse).assertSuccess()

      // A buffer holding two responses back to back.
      var pairedResponses = try self.writeMessage("2")
      try self.writeMessage("3", into: &pairedResponses)
      machine.receiveResponseBuffer(&pairedResponses).assertSuccess()

      // Finally, the status arrives in the trailers.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }

    /// Requests and responses interleaved.
    func testSimpleBidirectionalActiveFlow() throws {
      let initialState: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .many(),
        readArity: .many
      )
      var machine = self.makeStateMachine(initialState)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // First request...
      machine
        .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
        .assertSuccess()

      // ...and the first response.
      var singleResponse = try self.writeMessage("1")
      machine.receiveResponseBuffer(&singleResponse).assertSuccess()

      // Two further requests.
      for payload in ["2", "3"] {
        machine
          .sendRequest(ByteBuffer(string: payload), compressed: false, allocator: self.allocator)
          .assertSuccess()
      }

      // A buffer holding two responses back to back.
      var pairedResponses = try self.writeMessage("2")
      try self.writeMessage("3", into: &pairedResponses)
      machine.receiveResponseBuffer(&pairedResponses).assertSuccess()

      // Nothing more to send: close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Finally, the status arrives in the trailers.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }
  }
  542. // MARK: - Too many requests / responses.
  extension GRPCClientStateMachineTests {
    /// With a write state of `.one()`, a second request is a cardinality violation regardless of
    /// the pending read arity.
    func testSendTooManyRequestsFromClientActiveServerIdle() {
      let arities: [MessageArity] = [.one, .many]
      for arity in arities {
        let state: StateMachine.State = .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: arity, messageEncoding: .disabled)
        )
        var machine = self.makeStateMachine(state)

        // The first request is accepted.
        let first = machine
          .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
        first.assertSuccess()

        // The second is rejected.
        let second = machine
          .sendRequest(ByteBuffer(string: "2"), compressed: false, allocator: self.allocator)
        second.assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a write state of `.one()`, a second request is a cardinality violation regardless of
    /// the read state.
    func testSendTooManyRequestsFromActive() {
      let readStates: [ReadState] = [.one(), .many()]
      for readState in readStates {
        let state: StateMachine.State = .clientActiveServerActive(
          writeState: .one(),
          readState: readState
        )
        var machine = self.makeStateMachine(state)

        // The first request is accepted.
        let first = machine
          .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
        first.assertSuccess()

        // The second is rejected.
        let second = machine
          .sendRequest(ByteBuffer(string: "2"), compressed: false, allocator: self.allocator)
        second.assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// Once both sides are closed even a single request is a cardinality violation.
    func testSendTooManyRequestsFromClosed() {
      var machine = self.makeStateMachine(.clientClosedServerClosed)

      let result = machine
        .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
      result.assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
    }

    /// With a read state of `.one()`, a second response buffer is a cardinality violation
    /// regardless of the write state.
    func testReceiveTooManyRequests() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writeState in writeStates {
        let state: StateMachine.State = .clientActiveServerActive(
          writeState: writeState,
          readState: .one()
        )
        var machine = self.makeStateMachine(state)

        // The first response is accepted.
        var accepted = try self.writeMessage("foo")
        machine.receiveResponseBuffer(&accepted).assertSuccess()

        // The second is rejected.
        var rejected = try self.writeMessage("bar")
        machine.receiveResponseBuffer(&rejected).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a read state of `.one()`, two responses delivered in a single buffer are a
    /// cardinality violation regardless of the write state.
    func testReceiveTooManyRequestsInOneBuffer() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writeState in writeStates {
        let state: StateMachine.State = .clientActiveServerActive(
          writeState: writeState,
          readState: .one()
        )
        var machine = self.makeStateMachine(state)

        // Frame two responses back to back in the same buffer.
        var combined = try self.writeMessage("foo")
        try self.writeMessage("bar", into: &combined)

        machine.receiveResponseBuffer(&combined).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }
  }
  613. // MARK: - Send Request Headers
  614. extension GRPCClientStateMachineTests {
  /// Checks the headers produced for a request with a deadline, custom metadata and identity
  /// encoding enabled in both directions.
  func testSendRequestHeaders() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)

    let result = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .now() + .hours(1),
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: .identity,
        acceptableForResponses: [.identity],
        decompressionLimit: .ratio(10)
      ))
    ))

    result.assertSuccess { requestHeaders in
      XCTAssertEqual(requestHeaders[":method"], ["POST"])
      XCTAssertEqual(requestHeaders[":path"], ["/echo/Get"])
      XCTAssertEqual(requestHeaders[":authority"], ["localhost"])
      XCTAssertEqual(requestHeaders[":scheme"], ["http"])
      XCTAssertEqual(requestHeaders["content-type"], ["application/grpc"])
      XCTAssertEqual(requestHeaders["te"], ["trailers"])
      // The deadline is converted into a timeout whose exact value can't be predicted here, so
      // only check that the header is present.
      XCTAssertTrue(requestHeaders.contains(name: "grpc-timeout"))
      XCTAssertEqual(requestHeaders["x-grpc-id"], ["request-id"])
      XCTAssertEqual(requestHeaders["grpc-encoding"], ["identity"])
      XCTAssertTrue(requestHeaders["grpc-accept-encoding"].contains("identity"))
      let userAgent = requestHeaders["user-agent"].first
      XCTAssertTrue(userAgent?.starts(with: "grpc-swift") ?? false)
    }
  }
  /// Checks that custom metadata names come out lowercased in the request headers.
  func testSendRequestHeadersNormalizesCustomMetadata() throws {
    // Header name lookup on `HPACKHeaders` is case-insensitive, so individual lookups can't reveal
    // the casing. Instead every custom entry carries a sentinel value; the entries with that value
    // are collected and compared as a whole `HPACKHeaders` (whose equality *is* case sensitive).
    let sentinel = "a-key-for-filtering"
    let customMetadata: HPACKHeaders = [
      "partiallyLower": sentinel,
      "ALLUPPER": sentinel,
    ]

    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)

    let result = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: customMetadata,
      encoding: .disabled
    ))

    result.assertSuccess { requestHeaders in
      // Collect the (name, value) pairs whose value is the sentinel.
      var matching: [(String, String)] = []
      for (name, value, _) in requestHeaders where value == sentinel {
        matching.append((name, value))
      }

      let normalized: HPACKHeaders = [
        "partiallylower": sentinel,
        "allupper": sentinel,
      ]
      XCTAssertEqual(HPACKHeaders(matching), normalized)
    }
  }
  /// Checks that a "user-agent" supplied via custom metadata is the value found in the headers.
  func testSendRequestHeadersWithCustomUserAgent() throws {
    let metadata: HPACKHeaders = ["user-agent": "test-user-agent"]

    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)

    let result = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: metadata,
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    ))

    result.assertSuccess { requestHeaders in
      XCTAssertEqual(requestHeaders["user-agent"], ["test-user-agent"])
    }
  }
  /// With no request encoding and no acceptable response encodings, neither encoding header
  /// should be present.
  func testSendRequestHeadersWithNoCompressionInEitherDirection() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)

    let result = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    ))

    result.assertSuccess { requestHeaders in
      XCTAssertFalse(requestHeaders.contains(name: "grpc-encoding"))
      XCTAssertFalse(requestHeaders.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With no request encoding but some acceptable response encodings, only
  /// "grpc-accept-encoding" should be present.
  func testSendRequestHeadersWithNoCompressionForRequests() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)

    let result = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [.identity, .gzip],
        decompressionLimit: .ratio(10)
      ))
    ))

    result.assertSuccess { requestHeaders in
      XCTAssertFalse(requestHeaders.contains(name: "grpc-encoding"))
      XCTAssertTrue(requestHeaders.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With gzip for requests but no acceptable response encodings, only "grpc-encoding" should
  /// be present.
  func testSendRequestHeadersWithNoCompressionForResponses() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)

    let result = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: .gzip,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    ))

    result.assertSuccess { requestHeaders in
      XCTAssertEqual(requestHeaders["grpc-encoding"], ["gzip"])

      // The asymmetry looks odd but is permitted: a client need not advertise support for the
      // compression it uses; the server may still process the message provided it supports that
      // compression too.
      XCTAssertFalse(requestHeaders.contains(name: "grpc-accept-encoding"))
    }
  }
  /// The default response headers (":status" of "200") should be accepted.
  func testReceiveResponseHeadersWithOkStatus() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var machine = self.makeStateMachine(state)

    let responseHeaders = self.makeResponseHeaders()
    machine.receiveResponseHeaders(responseHeaders).assertSuccess()
  }
  /// A ":status" other than "200" should fail with `.invalidHTTPStatus` carrying that status.
  func testReceiveResponseHeadersWithNotOkStatus() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var machine = self.makeStateMachine(state)

    let statusCode = "\(HTTPResponseStatus.paymentRequired.code)"
    let responseHeaders = self.makeResponseHeaders(status: statusCode)

    machine.receiveResponseHeaders(responseHeaders).assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatus(statusCode))
    }
  }
  /// Response headers lacking "content-type" should fail with `.invalidContentType(nil)`.
  func testReceiveResponseHeadersWithoutContentType() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var machine = self.makeStateMachine(state)

    let responseHeaders = self.makeResponseHeaders(contentType: nil)

    machine.receiveResponseHeaders(responseHeaders).assertFailure { error in
      XCTAssertEqual(error, .invalidContentType(nil))
    }
  }
  /// Response headers with the content type "video/mpeg" are rejected with
  /// `.invalidContentType`, which carries the rejected value.
  func testReceiveResponseHeadersWithInvalidContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let headersWithBadContentType = self.makeResponseHeaders(contentType: "video/mpeg")
    let outcome = stateMachine.receiveResponseHeaders(headersWithBadContentType)

    outcome.assertFailure { error in
      XCTAssertEqual(error, .invalidContentType("video/mpeg"))
    }
  }
  /// When the client accepts `identity` for responses and the server replies with
  /// `grpc-encoding: identity`, the headers are accepted and the resulting reader is
  /// configured with the identity algorithm.
  func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
    let identityOnly = ClientMessageEncoding.Configuration(
      forRequests: .none,
      acceptableForResponses: [.identity],
      decompressionLimit: .ratio(1)
    )
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .enabled(identityOnly))
    )
    var stateMachine = self.makeStateMachine(initialState)

    // Identity should always be supported.
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "identity")
    stateMachine.receiveResponseHeaders(headers).assertSuccess()

    // Any state other than "both active, reading" means the transition went wrong.
    guard case let .clientActiveServerActive(_, .reading(_, reader)) = stateMachine.state else {
      XCTFail("unexpected state \(stateMachine.state)")
      return
    }
    XCTAssertEqual(reader.compression?.algorithm, .identity)
  }
  /// A `grpc-encoding: snappy` response header is rejected with
  /// `.unsupportedMessageEncoding("snappy")`.
  func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "snappy")

    let outcome = stateMachine.receiveResponseHeaders(headers)
    outcome.assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding("snappy"))
    }
  }
  /// A `grpc-encoding` value that names no real algorithm is rejected with
  /// `.unsupportedMessageEncoding`, which carries the unrecognised name.
  func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "not-a-known-compression-(probably)")

    let outcome = stateMachine.receiveResponseHeaders(headers)
    outcome.assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding("not-a-known-compression-(probably)"))
    }
  }
  /// Trailers holding only `grpc-status: 0` produce a status with code 0 and no message.
  func testReceiveEndOfResponseStreamWithStatus() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = ["grpc-status": "0"]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
      XCTAssertNil(status.message)
    }
  }
  /// A numeric `grpc-status` of "999" is still a success, reported with the `.unknown` code.
  func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = ["grpc-status": "999"]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertSuccess { status in
      XCTAssertEqual(status.code, .unknown)
    }
  }
  /// A non-numeric `grpc-status` is still a success, reported with the `.unknown` code.
  func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertSuccess { status in
      XCTAssertEqual(status.code, .unknown)
    }
  }
  /// Trailers with both `grpc-status` and `grpc-message` surface the code and the message
  /// text, including non-ASCII characters, unchanged.
  func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual(status.message, "foo bar 🚀")
    }
  }
  /// With the server still idle, end-of-stream trailers that have an HTTP 200 status but no
  /// content type still yield the gRPC status and message.
  func testReceiveTrailersOnlyEndOfResponseStreamWithoutContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual(status.message, "foo bar 🚀")
    }
  }
  /// With the server still idle, end-of-stream trailers carrying the content type "invalid"
  /// are rejected with `.invalidContentType`, even though a gRPC status is present.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
      "content-type": "invalid",
    ]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertFailure { failure in
      XCTAssertEqual(failure, .invalidContentType("invalid"))
    }
  }
  /// With the server still idle, end-of-stream trailers with HTTP status 418 and a parsable
  /// `grpc-status` fail with `.invalidHTTPStatusWithGRPCStatus`, which carries the gRPC status.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndValidGRPCStatus() throws {
    var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    ))

    let trailers: HPACKHeaders = [
      ":status": "418",
      "grpc-status": "5",
    ]

    // Unwrap with `XCTUnwrap` rather than `!`: an unexpected `nil` then fails this test
    // instead of crashing the whole test process.
    let expectedCode = try XCTUnwrap(GRPCStatus.Code(rawValue: 5))
    let expectedStatus = GRPCStatus(code: expectedCode, message: nil)

    stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatusWithGRPCStatus(expectedStatus))
    }
  }
  /// With the server still idle, end-of-stream trailers with HTTP status 418 and no
  /// `grpc-status` fail with `.invalidHTTPStatus("418")`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndNoGRPCStatus() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [":status": "418"]
    let outcome = stateMachine.receiveEndOfResponseStream(trailers)

    outcome.assertFailure { failure in
      XCTAssertEqual(failure, .invalidHTTPStatus("418"))
    }
  }
  937. }
  /// Tests for `ReadState.readMessages(_:)`, covering both arities and the resulting state.
  class ReadStateTests: GRPCTestCase {
    var allocator = ByteBufferAllocator()

    /// Returns a buffer holding `copies` back-to-back, uncompressed, length-prefixed frames of
    /// `message`. A single frame is always written, even if `copies` is less than one.
    private func makeFramedBuffer(_ message: ByteBuffer, copies: Int = 1) throws -> ByteBuffer {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      var framed = try writer.write(buffer: message, allocator: self.allocator)
      for _ in stride(from: 1, to: copies, by: 1) {
        var next = try writer.write(buffer: message, allocator: self.allocator)
        framed.writeBuffer(&next)
      }
      return framed
    }

    /// Reading in the `.notReading` state fails with `.cardinalityViolation`.
    func testReadWhenNoExpectedMessages() {
      var state: ReadState = .notReading
      var buffer = self.allocator.buffer(capacity: 0)

      state.readMessages(&buffer).assertFailure {
        XCTAssertEqual($0, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// One complete message followed by a stray byte fails with `.leftOverBytes` when exactly
    /// one message is expected.
    func testReadWithLeftOverBytesForOneExpectedMessage() throws {
      // Write a single message into the buffer:
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeFramedBuffer(message)

      // And some extra junk bytes:
      let bytes: [UInt8] = [0x00]
      buffer.writeBytes(bytes)

      var state: ReadState = .one()
      state.readMessages(&buffer).assertFailure {
        XCTAssertEqual($0, .leftOverBytes)
      }
      state.assertNotReading()
    }

    /// Two complete messages fail with `.cardinalityViolation` when exactly one is expected.
    func testReadTooManyMessagesForOneExpectedMessages() throws {
      // Write the same message into the buffer twice:
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeFramedBuffer(message, copies: 2)

      var state: ReadState = .one()
      state.readMessages(&buffer).assertFailure {
        XCTAssertEqual($0, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// A single message is returned when exactly one is expected, after which reading stops.
    func testReadOneMessageForOneExpectedMessages() throws {
      // Write a single message into the buffer:
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeFramedBuffer(message)

      var state: ReadState = .one()
      state.readMessages(&buffer).assertSuccess {
        XCTAssertEqual($0, [message])
      }

      // We shouldn't be able to read anymore.
      state.assertNotReading()
    }

    /// A single message is returned when many are expected, and the state keeps reading.
    func testReadOneMessageForManyExpectedMessages() throws {
      // Write a single message into the buffer:
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeFramedBuffer(message)

      var state: ReadState = .many()
      state.readMessages(&buffer).assertSuccess {
        XCTAssertEqual($0, [message])
      }

      // We should still be able to read.
      state.assertReading()
    }

    /// Three back-to-back messages are all returned when many are expected, and the state
    /// keeps reading.
    func testReadManyMessagesForManyExpectedMessages() throws {
      // Write the same message into the buffer three times:
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeFramedBuffer(message, copies: 3)

      var state: ReadState = .many()
      state.readMessages(&buffer).assertSuccess {
        XCTAssertEqual($0, [message, message, message])
      }

      // We should still be able to read.
      state.assertReading()
    }
  }
  1016. // MARK: Result helpers
  extension Result {
    /// Asserts the `Result` was a success.
    ///
    /// - Parameters:
    ///   - file: File that failures are attributed to; defaults to the caller's file.
    ///   - line: Line that failures are attributed to; defaults to the caller's line.
    ///   - verify: Called with the success value. An error thrown from it is recorded as a
    ///     test failure. Defaults to a no-op.
    func assertSuccess(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Success) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        do {
          try verify(value)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }

      case let .failure(error):
        // Report at the call site so the failing test line is highlighted, not this helper.
        XCTFail("unexpected .failure: \(error)", file: file, line: line)
      }
    }

    /// Asserts the `Result` was a failure.
    ///
    /// - Parameters:
    ///   - file: File that failures are attributed to; defaults to the caller's file.
    ///   - line: Line that failures are attributed to; defaults to the caller's line.
    ///   - verify: Called with the failure value. An error thrown from it is recorded as a
    ///     test failure. Defaults to a no-op.
    func assertFailure(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Failure) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        // Report at the call site so the failing test line is highlighted, not this helper.
        XCTFail("unexpected .success: \(value)", file: file, line: line)

      case let .failure(failure):
        do {
          try verify(failure)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      }
    }
  }
  1045. // MARK: ReadState, PendingWriteState, and WriteState helpers
  extension ReadState {
    /// Returns a `.reading` state with arity `.one` and a fresh `LengthPrefixedMessageReader`.
    static func one() -> ReadState {
      return .reading(.one, LengthPrefixedMessageReader())
    }

    /// Returns a `.reading` state with arity `.many` and a fresh `LengthPrefixedMessageReader`.
    static func many() -> ReadState {
      return .reading(.many, LengthPrefixedMessageReader())
    }

    /// Asserts that the state is `.reading`.
    ///
    /// - Parameters:
    ///   - file: File that a failure is attributed to; defaults to the caller's file.
    ///   - line: Line that a failure is attributed to; defaults to the caller's line.
    func assertReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        ()
      case .notReading:
        // Report at the call site so the failing test line is highlighted, not this helper.
        XCTFail("unexpected state .notReading", file: file, line: line)
      }
    }

    /// Asserts that the state is `.notReading`.
    ///
    /// - Parameters:
    ///   - file: File that a failure is attributed to; defaults to the caller's file.
    ///   - line: Line that a failure is attributed to; defaults to the caller's line.
    func assertNotReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        // Report at the call site so the failing test line is highlighted, not this helper.
        XCTFail("unexpected state .reading", file: file, line: line)
      case .notReading:
        ()
      }
    }
  }
  extension PendingWriteState {
    /// Returns a pending write state with arity `.one` and the `.protobuf` content type.
    static func one() -> PendingWriteState {
      return PendingWriteState(arity: .one, contentType: .protobuf)
    }

    /// Returns a pending write state with arity `.many` and the `.protobuf` content type.
    static func many() -> PendingWriteState {
      return PendingWriteState(arity: .many, contentType: .protobuf)
    }
  }
  extension WriteState {
    /// Returns a `.writing` state with arity `.one`, the `.protobuf` content type and a
    /// writer created with `compression: .none`.
    static func one() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.one, .protobuf, writer)
    }

    /// Returns a `.writing` state with arity `.many`, the `.protobuf` content type and a
    /// writer created with `compression: .none`.
    static func many() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.many, .protobuf, writer)
    }
  }