GRPCClientStateMachineTests.swift 33 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. @testable import GRPC
  18. import EchoModel
  19. import Logging
  20. import NIOHTTP1
  21. import NIOHPACK
  22. import NIO
  23. import SwiftProtobuf
  24. import XCTest
  /// Tests for `GRPCClientStateMachine`, specialised on the Echo request and response messages.
  ///
  /// The class body only holds shared fixtures and helpers; the test cases themselves are grouped
  /// into extensions of this class.
  class GRPCClientStateMachineTests: GRPCTestCase {
    typealias Request = Echo_EchoRequest
    typealias Response = Echo_EchoResponse
    typealias StateMachine = GRPCClientStateMachine<Request, Response>

    /// Allocator used for every buffer created by these tests.
    var allocator = ByteBufferAllocator()

    /// Creates a state machine which starts out in the given `state`.
    func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
      return StateMachine(state: state)
    }

    /// Returns a fresh `ByteBuffer` holding the length-prefixed encoding of `message`.
    func writeMessage<T: Message>(_ message: T) throws -> ByteBuffer {
      var framed = self.allocator.buffer(capacity: 0)
      try self.writeMessage(message, into: &framed)
      return framed
    }

    /// Returns a fresh `ByteBuffer` holding the length-prefixed encoding of each of `messages`,
    /// written back-to-back in the order given.
    func writeMessages<T: Message>(_ messages: T...) throws -> ByteBuffer {
      var framed = self.allocator.buffer(capacity: 0)
      for message in messages {
        try self.writeMessage(message, into: &framed)
      }
      return framed
    }

    /// Appends the length-prefixed (uncompressed) encoding of `message` to `buffer`.
    ///
    /// - Throws: Any error raised while serializing `message`.
    func writeMessage<T: Message>(_ message: T, into buffer: inout ByteBuffer) throws {
      let payload = try message.serializedData()
      LengthPrefixedMessageWriter(compression: .none).write(payload, into: &buffer)
    }

    /// Builds response headers from the given pseudo-header and content type.
    ///
    /// - Parameters:
    ///   - status: Value for `:status`; the header is left out when `nil`.
    ///   - contentType: Value for `content-type`; the header is left out when `nil`.
    func makeResponseHeaders(
      status: String? = "200",
      contentType: String? = "application/grpc+proto"
    ) -> HPACKHeaders {
      var headers: HPACKHeaders = [:]
      if let status = status {
        headers.add(name: ":status", value: status)
      }
      if let contentType = contentType {
        headers.add(name: "content-type", value: contentType)
      }
      return headers
    }
  }
  64. // MARK: - Send Request Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that sending request headers while in `state` fails with `.invalidState`.
    func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "host",
        timeout: .infinite,
        customMetadata: [:]
      ))
      outcome.assertFailure { error in
        XCTAssertEqual(error, .invalidState)
      }
    }

    /// Idle is the only state exercised here from which sending request headers succeeds.
    func testSendRequestHeadersFromIdle() {
      let idle: StateMachine.State = .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      var machine = self.makeStateMachine(idle)
      let outcome = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "host",
        timeout: .infinite,
        customMetadata: [:]
      ))
      outcome.assertSuccess()
    }

    func testSendRequestHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(writeState: .one(), readArity: .one)
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(readArity: .one)
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(writeState: .one(), readState: .one())
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendRequestHeadersFromInvalidState(state)
    }
  }
  106. // MARK: - Send Request
  extension GRPCClientStateMachineTests {
    /// Asserts that sending a request message while in `state` fails with `expected`.
    ///
    /// - Parameters:
    ///   - state: The state the state machine starts in.
    ///   - expected: The error `sendRequest` should fail with.
    func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
      var stateMachine = self.makeStateMachine(state)
      stateMachine.sendRequest(.init(text: "Hello!"), allocator: self.allocator).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that sending a request message while in `state` succeeds and that the produced
    /// buffer, once its prefix is skipped, deserializes back into the request that was sent.
    func doTestSendRequestFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)
      let request: Request = .with { $0.text = "Hello!" }

      stateMachine.sendRequest(request, allocator: self.allocator).assertSuccess { buffer in
        var buffer = buffer
        // Skip the 5-byte prefix (1-byte compression flag followed by a 4-byte message length).
        buffer.moveReaderIndex(forwardBy: 5)
        guard let data = buffer.readData(length: buffer.readableBytes) else {
          XCTFail("could not read the serialized request from the buffer")
          return
        }
        XCTAssertEqual(request, try Request(serializedData: data))
      }
    }

    func testSendRequestFromIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testSendRequestFromClientActiveServerIdle() {
      self.doTestSendRequestFromValidState(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
    }

    func testSendRequestFromClientClosedServerIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerIdle(readArity: .one),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromActive() {
      self.doTestSendRequestFromValidState(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testSendRequestFromClientClosedServerActive() {
      // This test must start from the client-closed/server-ACTIVE state; it previously reused the
      // server-idle state and so duplicated `testSendRequestFromClientClosedServerIdle`.
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerActive(readState: .one()),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromClosed() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerClosed,
        expected: .cardinalityViolation
      )
    }
  }
  144. // MARK: - Send End of Request Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that closing the request stream while in `state` fails with `expected`.
    func doTestSendEndOfRequestStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: SendEndOfRequestStreamError
    ) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendEndOfRequestStream()
      outcome.assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that closing the request stream while in `state` succeeds.
    func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendEndOfRequestStream()
      outcome.assertSuccess()
    }

    func testSendEndOfRequestStreamFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .invalidState)
    }

    func testSendEndOfRequestStreamFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(writeState: .one(), readArity: .one)
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(readArity: .one)
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(writeState: .one(), readState: .one())
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }
  }
  194. // MARK: - Receive Response Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving (minimally valid) response headers while in `state` fails with
    /// `expected`.
    func doTestReceiveResponseHeadersFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveResponseHeadError
    ) {
      var machine = self.makeStateMachine(state)
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving (minimally valid) response headers while in `state` succeeds.
    func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()
    }

    func testReceiveResponseHeadersFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(writeState: .one(), readArity: .one)
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(readArity: .one)
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(writeState: .one(), readState: .one())
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }
  }
  244. // MARK: - Receive Response
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving a buffer holding one length-prefixed response while in `state`
    /// fails with `expected`.
    ///
    /// - Throws: Any error raised while encoding the response into the buffer.
    func doTestReceiveResponseFromInvalidState(
      _ state: StateMachine.State,
      expected: MessageReadError
    ) throws {
      var machine = self.makeStateMachine(state)
      let response = Response.with { $0.text = "Hello!" }
      var framed = try self.writeMessage(response)

      machine.receiveResponseBuffer(&framed).assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving a buffer holding one length-prefixed response while in `state`
    /// succeeds and yields exactly that response.
    ///
    /// - Throws: Any error raised while encoding the response into the buffer.
    func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
      var machine = self.makeStateMachine(state)
      let response = Response.with { $0.text = "Hello!" }
      var framed = try self.writeMessage(response)

      machine.receiveResponseBuffer(&framed).assertSuccess { received in
        XCTAssertEqual(received, [response])
      }
    }

    func testReceiveResponseFromIdle() throws {
      let state: StateMachine.State = .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientActiveServerIdle() throws {
      let state: StateMachine.State = .clientActiveServerIdle(writeState: .one(), readArity: .one)
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientClosedServerIdle() throws {
      let state: StateMachine.State = .clientClosedServerIdle(readArity: .one)
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromActive() throws {
      let state: StateMachine.State = .clientActiveServerActive(writeState: .one(), readState: .one())
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClientClosedServerActive() throws {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClosed() throws {
      let state: StateMachine.State = .clientClosedServerClosed
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }
  }
  298. // MARK: - Receive End of Response Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving (empty) trailers while in `state` fails with `expected`.
    ///
    /// - Parameters:
    ///   - state: The state the state machine starts in.
    ///   - expected: The error `receiveEndOfResponseStream` should fail with.
    func doTestReceiveEndOfResponseStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveEndOfResponseStreamError
    ) {
      var stateMachine = self.makeStateMachine(state)
      stateMachine.receiveEndOfResponseStream(.init()).assertFailure { error in
        // `expected` used to be ignored, so any failure passed. The textual descriptions are
        // compared so the check does not depend on the error type being `Equatable`.
        XCTAssertEqual(String(describing: error), String(describing: expected))
      }
    }

    /// Asserts that receiving trailers carrying an "ok" status while in `state` succeeds and
    /// yields a status with code `.ok` and message "ok".
    func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)

      var trailers: HPACKHeaders = [
        GRPCHeaderName.statusCode: "0",
        GRPCHeaderName.statusMessage: "ok"
      ]

      // When the server is idle it's a "Trailers-Only" response, we need the :status and
      // content-type to make a valid set of trailers.
      switch state {
      case .clientActiveServerIdle,
           .clientClosedServerIdle:
        trailers.add(name: ":status", value: "200")
        trailers.add(name: "content-type", value: "application/grpc+proto")
      default:
        break
      }

      stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, .ok)
        XCTAssertEqual(status.message, "ok")
      }
    }

    func testReceiveEndOfResponseStreamFromIdle() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerIdle(readArity: .one)
      )
    }

    func testReceiveEndOfResponseStreamFromActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerActive(readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClosed() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientClosedServerClosed,
        expected: .invalidState
      )
    }
  }
  361. // MARK: - Basic RPC flow.
  extension GRPCClientStateMachineTests {
    /// Builds trailers carrying the given status code and, when provided, a status message.
    func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
      var trailers = HPACKHeaders()
      trailers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
      if let statusMessage = message {
        trailers.add(name: GRPCHeaderName.statusMessage, value: statusMessage)
      }
      return trailers
    }

    /// Performs the opening handshake shared by every flow test: sends the request headers and
    /// then receives a minimally valid set of response headers, asserting that both succeed.
    private func sendRequestHeadersAndReceiveResponseHeaders(on stateMachine: inout StateMachine) {
      // Initiate the RPC.
      stateMachine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        timeout: .infinite,
        customMetadata: [:]
      )).assertSuccess()

      // Receive acknowledgement.
      stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
    }

    /// One request, one response.
    func testSimpleUnaryFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      )
      self.sendRequestHeadersAndReceiveResponseHeaders(on: &machine)

      // Send the single request, then close the request stream.
      machine.sendRequest(.with { $0.text = "Hello!" }, allocator: self.allocator).assertSuccess()
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var responseBuffer = try self.writeMessage(Response.with { $0.text = "Hello!" })
      machine.receiveResponseBuffer(&responseBuffer).assertSuccess()

      // Receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }

    /// Many requests, one response.
    func testSimpleClientActiveFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .many(), readArity: .one)
      )
      self.sendRequestHeadersAndReceiveResponseHeaders(on: &machine)

      // Send some requests, then close the request stream.
      machine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
      machine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertSuccess()
      machine.sendRequest(.with { $0.text = "3" }, allocator: self.allocator).assertSuccess()
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var responseBuffer = try self.writeMessage(Response.with { $0.text = "Hello!" })
      machine.receiveResponseBuffer(&responseBuffer).assertSuccess()

      // Receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }

    /// One request, many responses.
    func testSimpleServerActiveFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .many)
      )
      self.sendRequestHeadersAndReceiveResponseHeaders(on: &machine)

      // Send the single request, then close the request stream.
      machine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive one response on its own...
      var singleResponse = try self.writeMessage(Response.with { $0.text = "1" })
      machine.receiveResponseBuffer(&singleResponse).assertSuccess()

      // ...followed by two responses sharing one buffer.
      var pairOfResponses = try self.writeMessages(
        Response.with { $0.text = "2" },
        Response.with { $0.text = "3" }
      )
      machine.receiveResponseBuffer(&pairOfResponses).assertSuccess()

      // Receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }

    /// Many requests interleaved with many responses.
    func testSimpleBidirectionalActiveFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .many(), readArity: .many)
      )
      self.sendRequestHeadersAndReceiveResponseHeaders(on: &machine)

      // Interleave requests and responses: one request, then one response.
      machine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
      var singleResponse = try self.writeMessage(Response.with { $0.text = "1" })
      machine.receiveResponseBuffer(&singleResponse).assertSuccess()

      // Two more requests...
      machine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertSuccess()
      machine.sendRequest(.with { $0.text = "3" }, allocator: self.allocator).assertSuccess()

      // ...answered by two responses sharing one buffer.
      var pairOfResponses = try self.writeMessages(
        Response.with { $0.text = "2" },
        Response.with { $0.text = "3" }
      )
      machine.receiveResponseBuffer(&pairOfResponses).assertSuccess()

      // Close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }
  }
  477. // MARK: - Too many requests / responses.
  extension GRPCClientStateMachineTests {
    /// With a write arity of one, a second request must fail regardless of the read arity.
    func testSendTooManyRequestsFromClientActiveServerIdle() {
      let readArities: [MessageArity] = [.one, .many]
      for readArity in readArities {
        var machine = self.makeStateMachine(
          .clientActiveServerIdle(writeState: .one(), readArity: readArity)
        )

        // The first request is allowed.
        machine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()

        // The second one is not.
        machine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a write arity of one, a second request must fail regardless of the read state.
    func testSendTooManyRequestsFromActive() {
      let readStates: [ReadState] = [.one(), .many()]
      for readState in readStates {
        var machine = self.makeStateMachine(
          .clientActiveServerActive(writeState: .one(), readState: readState)
        )

        // The first request is allowed.
        machine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()

        // The second one is not.
        machine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// Once everything is closed not even a single request may be sent.
    func testSendTooManyRequestsFromClosed() {
      var machine = self.makeStateMachine(.clientClosedServerClosed)

      machine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
    }

    /// With a read state of one, a second response delivered in its own buffer must fail.
    /// (Despite its name this test exercises received *responses*.)
    func testReceiveTooManyRequests() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writeState in writeStates {
        var machine = self.makeStateMachine(
          .clientActiveServerActive(writeState: writeState, readState: .one())
        )
        let response = Response.with { $0.text = "foo" }

        // The first response is allowed.
        var allowed = try self.writeMessage(response)
        machine.receiveResponseBuffer(&allowed).assertSuccess()

        // The second one is not.
        var surplus = try self.writeMessage(response)
        machine.receiveResponseBuffer(&surplus).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a read state of one, two responses delivered in a single buffer must fail.
    /// (Despite its name this test exercises received *responses*.)
    func testReceiveTooManyRequestsInOneBuffer() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writeState in writeStates {
        var machine = self.makeStateMachine(
          .clientActiveServerActive(writeState: writeState, readState: .one())
        )

        // Two responses packed into the same buffer.
        let response = Response.with { $0.text = "foo" }
        var packed = try self.writeMessages(response, response)

        machine.receiveResponseBuffer(&packed).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }
  }
  // MARK: - Request Headers, Response Headers and Trailers
  extension GRPCClientStateMachineTests {
    /// Checks the individual headers produced when sending request headers.
    func testSendRequestHeaders() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      )
      let outcome = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        timeout: .hours(rounding: 1),
        customMetadata: ["x-grpc-id": "request-id"]
      ))

      outcome.assertSuccess { headers in
        XCTAssertEqual(headers[":method"], ["POST"])
        XCTAssertEqual(headers[":path"], ["/echo/Get"])
        XCTAssertEqual(headers[":authority"], ["localhost"])
        XCTAssertEqual(headers[":scheme"], ["http"])
        XCTAssertEqual(headers["content-type"], ["application/grpc"])
        XCTAssertEqual(headers["te"], ["trailers"])
        XCTAssertEqual(headers["grpc-timeout"], ["1H"])
        XCTAssertEqual(headers["x-grpc-id"], ["request-id"])
        XCTAssertTrue(headers["user-agent"].first?.starts(with: "grpc-swift") ?? false)
      }
    }

    func testReceiveResponseHeadersWithOkStatus() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
      let headers = self.makeResponseHeaders()
      machine.receiveResponseHeaders(headers).assertSuccess()
    }

    func testReceiveResponseHeadersWithNotOkStatus() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
      let headers = self.makeResponseHeaders(status: "\(HTTPResponseStatus.paymentRequired.code)")
      machine.receiveResponseHeaders(headers).assertFailure { error in
        XCTAssertEqual(error, .invalidHTTPStatus(.paymentRequired))
      }
    }

    func testReceiveResponseHeadersWithoutContentType() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
      let headers = self.makeResponseHeaders(contentType: nil)
      machine.receiveResponseHeaders(headers).assertFailure { error in
        XCTAssertEqual(error, .invalidContentType)
      }
    }

    func testReceiveResponseHeadersWithInvalidContentType() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
      let headers = self.makeResponseHeaders(contentType: "video/mpeg")
      machine.receiveResponseHeaders(headers).assertFailure { error in
        XCTAssertEqual(error, .invalidContentType)
      }
    }

    /// After accepting headers that advertise "identity" encoding, the state machine should be
    /// reading with a reader whose compression mechanism is `.identity`.
    func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )

      // Identity should always be supported.
      var headers = self.makeResponseHeaders()
      headers.add(name: "grpc-encoding", value: "identity")
      machine.receiveResponseHeaders(headers).assertSuccess()

      guard case let .clientActiveServerActive(_, .reading(_, reader)) = machine.state else {
        XCTFail("unexpected state \(machine.state)")
        return
      }
      XCTAssertEqual(reader.compressionMechanism, .identity)
    }

    func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
      var headers = self.makeResponseHeaders()
      headers.add(name: "grpc-encoding", value: "snappy")
      machine.receiveResponseHeaders(headers).assertFailure { error in
        XCTAssertEqual(error, .unsupportedMessageEncoding("snappy"))
      }
    }

    func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
      var machine = self.makeStateMachine(
        .clientActiveServerIdle(writeState: .one(), readArity: .one)
      )
      var headers = self.makeResponseHeaders()
      headers.add(name: "grpc-encoding", value: "not-a-known-compression-(probably)")
      machine.receiveResponseHeaders(headers).assertFailure { error in
        XCTAssertEqual(error, .unsupportedMessageEncoding("unknown"))
      }
    }

    /// Trailers carrying only a status code yield that code and no message.
    func testReceiveEndOfResponseStreamWithStatus() throws {
      var machine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
      let trailers: HPACKHeaders = ["grpc-status": "0"]
      machine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
        XCTAssertEqual(status.message, nil)
      }
    }

    /// A numeric status code outside the known range is expected to surface as `.unknown`.
    func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
      var machine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
      let trailers: HPACKHeaders = ["grpc-status": "999"]
      machine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, .unknown)
      }
    }

    /// A status code that is not an integer is expected to surface as `.unknown`.
    func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
      var machine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
      let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
      machine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, .unknown)
      }
    }

    /// Both the status code and the (non-ASCII) status message are expected to be passed through.
    func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
      var machine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
      let trailers: HPACKHeaders = [
        "grpc-status": "5",
        "grpc-message": "foo bar 🚀"
      ]
      machine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
        XCTAssertEqual(status.message, "foo bar 🚀")
      }
    }
  }
  /// Tests for `ReadState.readMessages(_:as:)` in isolation from the full state machine.
  class ReadStateTests: GRPCTestCase {
    /// Allocator used for every buffer created by these tests.
    var allocator = ByteBufferAllocator()

    /// Reading while `.notReading` is a cardinality violation.
    func testReadWhenNoExpectedMessages() {
      var state: ReadState = .notReading
      var emptyBuffer = self.allocator.buffer(capacity: 0)

      state.readMessages(&emptyBuffer, as: Echo_EchoRequest.self).assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// A correctly framed message whose payload is junk fails to deserialize.
    func testReadWhenBufferContainsLengthPrefixedJunk() {
      var state: ReadState = .many()

      let framedJunk: [UInt8] = [
        0x00,                    // compression flag
        0x00, 0x00, 0x00, 0x04,  // message length
        0xaa, 0xbb, 0xcc, 0xdd   // message
      ]
      var buffer = self.allocator.buffer(capacity: 9)
      buffer.writeBytes(framedJunk)

      state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure { error in
        XCTAssertEqual(error, .deserializationFailed)
      }
      state.assertNotReading()
    }

    /// One expected message followed by a stray byte is reported as left-over bytes.
    func testReadWithLeftOverBytesForOneExpectedMessage() throws {
      let message = Echo_EchoRequest.with { $0.text = "Hello!" }
      let payload = try message.serializedData()
      let writer = LengthPrefixedMessageWriter(compression: .none)

      // One complete message...
      var buffer = self.allocator.buffer(capacity: 0)
      writer.write(payload, into: &buffer)
      // ...followed by a single extra junk byte.
      let junk: [UInt8] = [0x00]
      buffer.writeBytes(junk)

      var state: ReadState = .one()
      state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure { error in
        XCTAssertEqual(error, .leftOverBytes)
      }
      state.assertNotReading()
    }

    /// Two messages when only one is expected is a cardinality violation.
    func testReadTooManyMessagesForOneExpectedMessages() throws {
      let message = Echo_EchoRequest.with { $0.text = "Hello!" }
      let payload = try message.serializedData()
      let writer = LengthPrefixedMessageWriter(compression: .none)

      // Write the message into the buffer twice.
      var buffer = self.allocator.buffer(capacity: 0)
      for _ in 0..<2 {
        writer.write(payload, into: &buffer)
      }

      var state: ReadState = .one()
      state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// Exactly one message when one is expected is returned, after which reading stops.
    func testReadOneMessageForOneExpectedMessages() throws {
      let message = Echo_EchoRequest.with { $0.text = "Hello!" }
      let payload = try message.serializedData()
      let writer = LengthPrefixedMessageWriter(compression: .none)

      // Write the message into the buffer once.
      var buffer = self.allocator.buffer(capacity: 0)
      writer.write(payload, into: &buffer)

      var state: ReadState = .one()
      state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess { messages in
        XCTAssertEqual(messages, [message])
      }

      // We shouldn't be able to read anymore.
      state.assertNotReading()
    }

    /// One message when many are expected is returned, and reading may continue.
    func testReadOneMessageForManyExpectedMessages() throws {
      let message = Echo_EchoRequest.with { $0.text = "Hello!" }
      let payload = try message.serializedData()
      let writer = LengthPrefixedMessageWriter(compression: .none)

      // Write the message into the buffer once.
      var buffer = self.allocator.buffer(capacity: 0)
      writer.write(payload, into: &buffer)

      var state: ReadState = .many()
      state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess { messages in
        XCTAssertEqual(messages, [message])
      }

      // We should still be able to read.
      state.assertReading()
    }

    /// Several messages in one buffer are all returned when many are expected.
    func testReadManyMessagesForManyExpectedMessages() throws {
      let message = Echo_EchoRequest.with { $0.text = "Hello!" }
      let payload = try message.serializedData()
      let writer = LengthPrefixedMessageWriter(compression: .none)

      // Write the message into the buffer three times.
      var buffer = self.allocator.buffer(capacity: 0)
      for _ in 0..<3 {
        writer.write(payload, into: &buffer)
      }

      var state: ReadState = .many()
      state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess { messages in
        XCTAssertEqual(messages, [message, message, message])
      }

      // We should still be able to read.
      state.assertReading()
    }
  }
  738. // MARK: Result helpers
  extension Result {
    /// Asserts the `Result` was a success and hands the success value to `verify`.
    ///
    /// A test failure is recorded when the result is a `.failure`, or when `verify` throws.
    ///
    /// - Parameters:
    ///   - file: The file failures are attributed to; defaults to the call site.
    ///   - line: The line failures are attributed to; defaults to the call site.
    ///   - verify: Extra checks to run against the success value; does nothing by default.
    func assertSuccess(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Success) throws -> Void = { _ in }
    ) {
      switch self {
      case .success(let success):
        do {
          try verify(success)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      case .failure(let error):
        XCTFail("unexpected .failure: \(error)", file: file, line: line)
      }
    }

    /// Asserts the `Result` was a failure and hands the error to `verify`.
    ///
    /// A test failure is recorded when the result is a `.success`, or when `verify` throws.
    ///
    /// - Parameters:
    ///   - file: The file failures are attributed to; defaults to the call site.
    ///   - line: The line failures are attributed to; defaults to the call site.
    ///   - verify: Extra checks to run against the error; does nothing by default.
    func assertFailure(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Failure) throws -> Void = { _ in }
    ) {
      switch self {
      case .success(let success):
        XCTFail("unexpected .success: \(success)", file: file, line: line)
      case .failure(let error):
        do {
          try verify(error)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      }
    }
  }
  767. // MARK: ReadState, PendingWriteState, and WriteState helpers
  extension ReadState {
    /// A `.reading` state with arity `.one` and a client-mode reader using no compression.
    static func one() -> ReadState {
      let reader = LengthPrefixedMessageReader(
        mode: .client,
        compressionMechanism: .none
      )
      return .reading(.one, reader)
    }

    /// A `.reading` state with arity `.many` and a client-mode reader using no compression.
    static func many() -> ReadState {
      let reader = LengthPrefixedMessageReader(
        mode: .client,
        compressionMechanism: .none
      )
      return .reading(.many, reader)
    }

    /// Records a test failure unless this state is `.reading`.
    ///
    /// - Parameters:
    ///   - file: The file the failure is attributed to; defaults to the call site.
    ///   - line: The line the failure is attributed to; defaults to the call site.
    func assertReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        ()
      case .notReading:
        XCTFail("unexpected state .notReading", file: file, line: line)
      }
    }

    /// Records a test failure unless this state is `.notReading`.
    ///
    /// - Parameters:
    ///   - file: The file the failure is attributed to; defaults to the call site.
    ///   - line: The line the failure is attributed to; defaults to the call site.
    func assertNotReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        XCTFail("unexpected state .reading", file: file, line: line)
      case .notReading:
        ()
      }
    }
  }
  extension PendingWriteState {
    /// A pending write state with arity `.one`, no compression and the protobuf content type.
    static func one() -> PendingWriteState {
      return PendingWriteState(arity: .one, compression: .none, contentType: .protobuf)
    }

    /// A pending write state with arity `.many`, no compression and the protobuf content type.
    static func many() -> PendingWriteState {
      return PendingWriteState(arity: .many, compression: .none, contentType: .protobuf)
    }
  }
  extension WriteState {
    /// A `.writing` state with arity `.one`, the protobuf content type and an uncompressed writer.
    static func one() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.one, .protobuf, writer)
    }

    /// A `.writing` state with arity `.many`, the protobuf content type and an uncompressed writer.
    static func many() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.many, .protobuf, writer)
    }
  }