GRPCClientStateMachineTests.swift 40 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. @testable import GRPC
  18. import EchoModel
  19. import Logging
  20. import NIOHTTP1
  21. import NIOHPACK
  22. import NIO
  23. import SwiftProtobuf
  24. import XCTest
  25. class GRPCClientStateMachineTests: GRPCTestCase {
  26. typealias Request = Echo_EchoRequest
  27. typealias Response = Echo_EchoResponse
  28. typealias StateMachine = GRPCClientStateMachine
  29. var allocator = ByteBufferAllocator()
  30. func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
  31. return StateMachine(state: state)
  32. }
  33. /// Writes a message into a new `ByteBuffer` (with length-prefixing).
  34. func writeMessage(_ message: String) throws -> ByteBuffer {
  35. let buffer = self.allocator.buffer(string: message)
  36. let writer = LengthPrefixedMessageWriter(compression: .none)
  37. return try writer.write(buffer: buffer, allocator: self.allocator, compressed: false)
  38. }
  39. /// Writes a message into the given `buffer`.
  40. func writeMessage(_ message: String, into buffer: inout ByteBuffer) throws {
  41. var other = try self.writeMessage(message)
  42. buffer.writeBuffer(&other)
  43. }
  44. /// Returns a minimally valid `HPACKHeaders` for a response.
  45. func makeResponseHeaders(
  46. status: String? = "200",
  47. contentType: String? = "application/grpc+proto"
  48. ) -> HPACKHeaders {
  49. var headers: HPACKHeaders = [:]
  50. status.map { headers.add(name: ":status", value: $0) }
  51. contentType.map { headers.add(name: "content-type", value: $0) }
  52. return headers
  53. }
  54. }
  55. // MARK: - Send Request Headers
  56. extension GRPCClientStateMachineTests {
  57. func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
  58. var stateMachine = self.makeStateMachine(state)
  59. stateMachine.sendRequestHeaders(requestHead: .init(
  60. method: "POST",
  61. scheme: "http",
  62. path: "/echo/Get",
  63. host: "host",
  64. deadline: .distantFuture,
  65. customMetadata: [:],
  66. encoding: .disabled
  67. )).assertFailure {
  68. XCTAssertEqual($0, .invalidState)
  69. }
  70. }
  71. func testSendRequestHeadersFromIdle() {
  72. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  73. stateMachine.sendRequestHeaders(requestHead: .init(
  74. method: "POST",
  75. scheme: "http",
  76. path: "/echo/Get",
  77. host: "host",
  78. deadline: .distantFuture,
  79. customMetadata: [:],
  80. encoding: .disabled
  81. )).assertSuccess()
  82. }
  83. func testSendRequestHeadersFromClientActiveServerIdle() {
  84. self.doTestSendRequestHeadersFromInvalidState(.clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: .one, messageEncoding: .disabled)))
  85. }
  86. func testSendRequestHeadersFromClientClosedServerIdle() {
  87. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)))
  88. }
  89. func testSendRequestHeadersFromActive() {
  90. self.doTestSendRequestHeadersFromInvalidState(.clientActiveServerActive(writeState: .one(), readState: .one()))
  91. }
  92. func testSendRequestHeadersFromClientClosedServerActive() {
  93. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerActive(readState: .one()))
  94. }
  95. func testSendRequestHeadersFromClosed() {
  96. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerClosed)
  97. }
  98. }
  99. // MARK: - Send Request
  100. extension GRPCClientStateMachineTests {
  101. func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
  102. var stateMachine = self.makeStateMachine(state)
  103. stateMachine.sendRequest(ByteBuffer(string: "Hello!"), compressed: false, allocator: self.allocator).assertFailure {
  104. XCTAssertEqual($0, expected)
  105. }
  106. }
  107. func doTestSendRequestFromValidState(_ state: StateMachine.State) {
  108. var stateMachine = self.makeStateMachine(state)
  109. let request = "Hello!"
  110. stateMachine.sendRequest(ByteBuffer(string: request), compressed: false, allocator: self.allocator).assertSuccess() { buffer in
  111. var buffer = buffer
  112. // Remove the length and compression flag prefix.
  113. buffer.moveReaderIndex(forwardBy: 5)
  114. let data = buffer.readString(length: buffer.readableBytes)!
  115. XCTAssertEqual(request, data)
  116. }
  117. }
  118. func testSendRequestFromIdle() {
  119. self.doTestSendRequestFromInvalidState(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one), expected: .invalidState)
  120. }
  121. func testSendRequestFromClientActiveServerIdle() {
  122. self.doTestSendRequestFromValidState(.clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: .one, messageEncoding: .disabled)))
  123. }
  124. func testSendRequestFromClientClosedServerIdle() {
  125. self.doTestSendRequestFromInvalidState(.clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)), expected: .cardinalityViolation)
  126. }
  127. func testSendRequestFromActive() {
  128. self.doTestSendRequestFromValidState(.clientActiveServerActive(writeState: .one(), readState: .one()))
  129. }
  130. func testSendRequestFromClientClosedServerActive() {
  131. self.doTestSendRequestFromInvalidState(.clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)), expected: .cardinalityViolation)
  132. }
  133. func testSendRequestFromClosed() {
  134. self.doTestSendRequestFromInvalidState(.clientClosedServerClosed, expected: .cardinalityViolation)
  135. }
  136. }
  137. // MARK: - Send End of Request Stream
  138. extension GRPCClientStateMachineTests {
  139. func doTestSendEndOfRequestStreamFromInvalidState(
  140. _ state: StateMachine.State,
  141. expected: SendEndOfRequestStreamError
  142. ) {
  143. var stateMachine = self.makeStateMachine(state)
  144. stateMachine.sendEndOfRequestStream().assertFailure {
  145. XCTAssertEqual($0, expected)
  146. }
  147. }
  148. func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
  149. var stateMachine = self.makeStateMachine(state)
  150. stateMachine.sendEndOfRequestStream().assertSuccess()
  151. }
  152. func testSendEndOfRequestStreamFromIdle() {
  153. self.doTestSendEndOfRequestStreamFromInvalidState(
  154. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  155. expected: .invalidState
  156. )
  157. }
  158. func testSendEndOfRequestStreamFromClientActiveServerIdle() {
  159. self.doTestSendEndOfRequestStreamFromValidState(
  160. .clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  161. )
  162. }
  163. func testSendEndOfRequestStreamFromClientClosedServerIdle() {
  164. self.doTestSendEndOfRequestStreamFromInvalidState(
  165. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  166. expected: .alreadyClosed
  167. )
  168. }
  169. func testSendEndOfRequestStreamFromActive() {
  170. self.doTestSendEndOfRequestStreamFromValidState(
  171. .clientActiveServerActive(writeState: .one(), readState: .one())
  172. )
  173. }
  174. func testSendEndOfRequestStreamFromClientClosedServerActive() {
  175. self.doTestSendEndOfRequestStreamFromInvalidState(
  176. .clientClosedServerActive(readState: .one()),
  177. expected: .alreadyClosed
  178. )
  179. }
  180. func testSendEndOfRequestStreamFromClosed() {
  181. self.doTestSendEndOfRequestStreamFromInvalidState(
  182. .clientClosedServerClosed,
  183. expected: .alreadyClosed
  184. )
  185. }
  186. }
  187. // MARK: - Receive Response Headers
  188. extension GRPCClientStateMachineTests {
  189. func doTestReceiveResponseHeadersFromInvalidState(
  190. _ state: StateMachine.State,
  191. expected: ReceiveResponseHeadError
  192. ) {
  193. var stateMachine = self.makeStateMachine(state)
  194. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertFailure {
  195. XCTAssertEqual($0, expected)
  196. }
  197. }
  198. func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
  199. var stateMachine = self.makeStateMachine(state)
  200. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  201. }
  202. func testReceiveResponseHeadersFromIdle() {
  203. self.doTestReceiveResponseHeadersFromInvalidState(
  204. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  205. expected: .invalidState
  206. )
  207. }
  208. func testReceiveResponseHeadersFromClientActiveServerIdle() {
  209. self.doTestReceiveResponseHeadersFromValidState(
  210. .clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  211. )
  212. }
  213. func testReceiveResponseHeadersFromClientClosedServerIdle() {
  214. self.doTestReceiveResponseHeadersFromValidState(
  215. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  216. )
  217. }
  218. func testReceiveResponseHeadersFromActive() {
  219. self.doTestReceiveResponseHeadersFromInvalidState(
  220. .clientActiveServerActive(writeState: .one(), readState: .one()),
  221. expected: .invalidState
  222. )
  223. }
  224. func testReceiveResponseHeadersFromClientClosedServerActive() {
  225. self.doTestReceiveResponseHeadersFromInvalidState(
  226. .clientClosedServerActive(readState: .one()),
  227. expected: .invalidState
  228. )
  229. }
  230. func testReceiveResponseHeadersFromClosed() {
  231. self.doTestReceiveResponseHeadersFromInvalidState(
  232. .clientClosedServerClosed,
  233. expected: .invalidState
  234. )
  235. }
  236. }
  237. // MARK: - Receive Response
  238. extension GRPCClientStateMachineTests {
  239. func doTestReceiveResponseFromInvalidState(
  240. _ state: StateMachine.State,
  241. expected: MessageReadError
  242. ) throws {
  243. var stateMachine = self.makeStateMachine(state)
  244. let message = "Hello!"
  245. var buffer = try self.writeMessage(message)
  246. stateMachine.receiveResponseBuffer(&buffer).assertFailure {
  247. XCTAssertEqual($0, expected)
  248. }
  249. }
  250. func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
  251. var stateMachine = self.makeStateMachine(state)
  252. let message = "Hello!"
  253. var buffer = try self.writeMessage(message)
  254. stateMachine.receiveResponseBuffer(&buffer).assertSuccess { messages in
  255. XCTAssertEqual(messages, [ByteBuffer(string: message)])
  256. }
  257. }
  258. func testReceiveResponseFromIdle() throws {
  259. try self.doTestReceiveResponseFromInvalidState(
  260. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  261. expected: .invalidState
  262. )
  263. }
  264. func testReceiveResponseFromClientActiveServerIdle() throws {
  265. try self.doTestReceiveResponseFromInvalidState(
  266. .clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  267. expected: .invalidState
  268. )
  269. }
  270. func testReceiveResponseFromClientClosedServerIdle() throws {
  271. try self.doTestReceiveResponseFromInvalidState(
  272. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  273. expected: .invalidState
  274. )
  275. }
  276. func testReceiveResponseFromActive() throws {
  277. try self.doTestReceiveResponseFromValidState(
  278. .clientActiveServerActive(writeState: .one(), readState: .one())
  279. )
  280. }
  281. func testReceiveResponseFromClientClosedServerActive() throws {
  282. try self.doTestReceiveResponseFromValidState(.clientClosedServerActive(readState: .one()))
  283. }
  284. func testReceiveResponseFromClosed() throws {
  285. try self.doTestReceiveResponseFromInvalidState(
  286. .clientClosedServerClosed,
  287. expected: .invalidState
  288. )
  289. }
  290. }
  291. // MARK: - Receive End of Response Stream
  292. extension GRPCClientStateMachineTests {
  293. func doTestReceiveEndOfResponseStreamFromInvalidState(
  294. _ state: StateMachine.State,
  295. expected: ReceiveEndOfResponseStreamError
  296. ) {
  297. var stateMachine = self.makeStateMachine(state)
  298. stateMachine.receiveEndOfResponseStream(.init()).assertFailure()
  299. }
  300. func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
  301. var stateMachine = self.makeStateMachine(state)
  302. var trailers: HPACKHeaders = [
  303. GRPCHeaderName.statusCode: "0",
  304. GRPCHeaderName.statusMessage: "ok"
  305. ]
  306. // When the server is idle it's a "Trailers-Only" response, we need the :status and
  307. // content-type to make a valid set of trailers.
  308. switch state {
  309. case .clientActiveServerIdle,
  310. .clientClosedServerIdle:
  311. trailers.add(name: ":status", value: "200")
  312. trailers.add(name: "content-type", value: "application/grpc+proto")
  313. default:
  314. break
  315. }
  316. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  317. XCTAssertEqual(status.code, .ok)
  318. XCTAssertEqual(status.message, "ok")
  319. }
  320. }
  321. func testReceiveEndOfResponseStreamFromIdle() {
  322. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  323. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  324. expected: .invalidState
  325. )
  326. }
  327. func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
  328. self.doTestReceiveEndOfResponseStreamFromValidState(
  329. .clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  330. )
  331. }
  332. func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
  333. self.doTestReceiveEndOfResponseStreamFromValidState(
  334. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  335. )
  336. }
  337. func testReceiveEndOfResponseStreamFromActive() {
  338. self.doTestReceiveEndOfResponseStreamFromValidState(
  339. .clientActiveServerActive(writeState: .one(), readState: .one())
  340. )
  341. }
  342. func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
  343. self.doTestReceiveEndOfResponseStreamFromValidState(
  344. .clientClosedServerActive(readState: .one())
  345. )
  346. }
  347. func testReceiveEndOfResponseStreamFromClosed() {
  348. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  349. .clientClosedServerClosed,
  350. expected: .invalidState
  351. )
  352. }
  353. }
  354. // MARK: - Basic RPC flow.
  355. extension GRPCClientStateMachineTests {
  356. func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
  357. var headers = HPACKHeaders()
  358. headers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
  359. if let message = message {
  360. headers.add(name: GRPCHeaderName.statusMessage, value: message)
  361. }
  362. return headers
  363. }
  364. func testSimpleUnaryFlow() throws {
  365. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  366. // Initiate the RPC
  367. stateMachine.sendRequestHeaders(requestHead: .init(
  368. method: "POST",
  369. scheme: "https",
  370. path: "/echo/Get",
  371. host: "foo",
  372. deadline: .distantFuture,
  373. customMetadata: [:],
  374. encoding: .disabled
  375. )).assertSuccess()
  376. // Receive acknowledgement.
  377. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  378. // Send a request.
  379. stateMachine.sendRequest(ByteBuffer(string: "Hello!"), compressed: false, allocator: self.allocator).assertSuccess()
  380. // Close the request stream.
  381. stateMachine.sendEndOfRequestStream().assertSuccess()
  382. // Receive a response.
  383. var buffer = try self.writeMessage("Hello!")
  384. stateMachine.receiveResponseBuffer(&buffer).assertSuccess()
  385. // Receive the status.
  386. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  387. }
  388. func testSimpleClientActiveFlow() throws {
  389. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .one))
  390. // Initiate the RPC
  391. stateMachine.sendRequestHeaders(requestHead: .init(
  392. method: "POST",
  393. scheme: "https",
  394. path: "/echo/Get",
  395. host: "foo",
  396. deadline: .distantFuture,
  397. customMetadata: [:],
  398. encoding: .disabled
  399. )).assertSuccess()
  400. // Receive acknowledgement.
  401. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  402. // Send some requests.
  403. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator).assertSuccess()
  404. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false, allocator: self.allocator).assertSuccess()
  405. stateMachine.sendRequest(ByteBuffer(string: "3"), compressed: false, allocator: self.allocator).assertSuccess()
  406. // Close the request stream.
  407. stateMachine.sendEndOfRequestStream().assertSuccess()
  408. // Receive a response.
  409. var buffer = try self.writeMessage("Hello!")
  410. stateMachine.receiveResponseBuffer(&buffer).assertSuccess()
  411. // Receive the status.
  412. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  413. }
  414. func testSimpleServerActiveFlow() throws {
  415. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .many))
  416. // Initiate the RPC
  417. stateMachine.sendRequestHeaders(requestHead: .init(
  418. method: "POST",
  419. scheme: "https",
  420. path: "/echo/Get",
  421. host: "foo",
  422. deadline: .distantFuture,
  423. customMetadata: [:],
  424. encoding: .disabled
  425. )).assertSuccess()
  426. // Receive acknowledgement.
  427. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  428. // Send a request.
  429. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator).assertSuccess()
  430. // Close the request stream.
  431. stateMachine.sendEndOfRequestStream().assertSuccess()
  432. // Receive a response.
  433. var firstBuffer = try self.writeMessage("1")
  434. stateMachine.receiveResponseBuffer(&firstBuffer).assertSuccess()
  435. // Receive two responses in one buffer.
  436. var secondBuffer = try self.writeMessage("2")
  437. try self.writeMessage("3", into: &secondBuffer)
  438. stateMachine.receiveResponseBuffer(&secondBuffer).assertSuccess()
  439. // Receive the status.
  440. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  441. }
  442. func testSimpleBidirectionalActiveFlow() throws {
  443. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .many))
  444. // Initiate the RPC
  445. stateMachine.sendRequestHeaders(requestHead: .init(
  446. method: "POST",
  447. scheme: "https",
  448. path: "/echo/Get",
  449. host: "foo",
  450. deadline: .distantFuture,
  451. customMetadata: [:],
  452. encoding: .disabled
  453. )).assertSuccess()
  454. // Receive acknowledgement.
  455. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  456. // Interleave requests and responses:
  457. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator).assertSuccess()
  458. // Receive a response.
  459. var firstBuffer = try self.writeMessage("1")
  460. stateMachine.receiveResponseBuffer(&firstBuffer).assertSuccess()
  461. // Send two more requests.
  462. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false, allocator: self.allocator).assertSuccess()
  463. stateMachine.sendRequest(ByteBuffer(string: "3"), compressed: false, allocator: self.allocator).assertSuccess()
  464. // Receive two responses in one buffer.
  465. var secondBuffer = try self.writeMessage("2")
  466. try self.writeMessage("3", into: &secondBuffer)
  467. stateMachine.receiveResponseBuffer(&secondBuffer).assertSuccess()
  468. // Close the request stream.
  469. stateMachine.sendEndOfRequestStream().assertSuccess()
  470. // Receive the status.
  471. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  472. }
  473. }
  474. // MARK: - Too many requests / responses.
  475. extension GRPCClientStateMachineTests {
  476. func testSendTooManyRequestsFromClientActiveServerIdle() {
  477. for messageCount in [MessageArity.one, MessageArity.many] {
  478. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), pendingReadState: .init(arity: messageCount, messageEncoding: .disabled)))
  479. // One is fine.
  480. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator).assertSuccess()
  481. // Two is not.
  482. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false, allocator: self.allocator).assertFailure {
  483. XCTAssertEqual($0, .cardinalityViolation)
  484. }
  485. }
  486. }
  487. func testSendTooManyRequestsFromActive() {
  488. for readState in [ReadState.one(), ReadState.many()] {
  489. var stateMachine = self.makeStateMachine(.clientActiveServerActive(writeState: .one(), readState: readState))
  490. // One is fine.
  491. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator).assertSuccess()
  492. // Two is not.
  493. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false, allocator: self.allocator).assertFailure {
  494. XCTAssertEqual($0, .cardinalityViolation)
  495. }
  496. }
  497. }
  498. func testSendTooManyRequestsFromClosed() {
  499. var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
  500. // No requests allowed!
  501. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator).assertFailure {
  502. XCTAssertEqual($0, .cardinalityViolation)
  503. }
  504. }
  505. func testReceiveTooManyRequests() throws {
  506. for writeState in [WriteState.one(), WriteState.many()] {
  507. var stateMachine = self.makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  508. // One response is fine.
  509. var firstBuffer = try self.writeMessage("foo")
  510. stateMachine.receiveResponseBuffer(&firstBuffer).assertSuccess()
  511. var secondBuffer = try self.writeMessage("bar")
  512. stateMachine.receiveResponseBuffer(&secondBuffer).assertFailure {
  513. XCTAssertEqual($0, .cardinalityViolation)
  514. }
  515. }
  516. }
  517. func testReceiveTooManyRequestsInOneBuffer() throws {
  518. for writeState in [WriteState.one(), WriteState.many()] {
  519. var stateMachine = self.makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  520. // Write two responses into a single buffer.
  521. var buffer = try self.writeMessage("foo")
  522. var other = try self.writeMessage("bar")
  523. buffer.writeBuffer(&other)
  524. stateMachine.receiveResponseBuffer(&buffer).assertFailure {
  525. XCTAssertEqual($0, .cardinalityViolation)
  526. }
  527. }
  528. }
  529. }
  530. // MARK: - Send Request Headers
  531. extension GRPCClientStateMachineTests {
  /// Checks the headers produced for a request with a deadline, custom metadata and identity
  /// encoding enabled in both directions.
  func testSendRequestHeaders() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var stateMachine = self.makeStateMachine(idle)

    stateMachine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .now() + .hours(1),
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: .identity,
        acceptableForResponses: [.identity],
        decompressionLimit: .ratio(10)
      ))
    )).assertSuccess { sent in
      XCTAssertEqual(sent[":method"], ["POST"])
      XCTAssertEqual(sent[":path"], ["/echo/Get"])
      XCTAssertEqual(sent[":authority"], ["localhost"])
      XCTAssertEqual(sent[":scheme"], ["http"])
      XCTAssertEqual(sent["content-type"], ["application/grpc"])
      XCTAssertEqual(sent["te"], ["trailers"])
      // We convert the deadline into a timeout, we can't be exactly sure what that timeout is.
      XCTAssertTrue(sent.contains(name: "grpc-timeout"))
      XCTAssertEqual(sent["x-grpc-id"], ["request-id"])
      XCTAssertEqual(sent["grpc-encoding"], ["identity"])
      XCTAssertTrue(sent["grpc-accept-encoding"].contains("identity"))
      // A missing user-agent becomes the empty string, which fails the prefix check below.
      let userAgent = sent["user-agent"].first ?? ""
      XCTAssertTrue(userAgent.starts(with: "grpc-swift"))
    }
  }
  /// Checks that the names of custom metadata entries are lowercased in the sent headers.
  func testSendRequestHeadersNormalizesCustomMetadata() throws {
    // `HPACKHeaders` uses case-insensitive lookup for header names so we can't check the equality
    // for individual headers. We'll pull out the entries we care about by matching a sentinel value
    // and then compare `HPACKHeaders` instances (since the equality check *is* case sensitive).
    let filterKey = "a-key-for-filtering"
    let customMetadata: HPACKHeaders = [
      "partiallyLower": filterKey,
      "ALLUPPER": filterKey
    ]
    let expected: HPACKHeaders = [
      "partiallylower": filterKey,
      "allupper": filterKey
    ]

    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var stateMachine = self.makeStateMachine(idle)

    stateMachine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: customMetadata,
      encoding: .disabled
    )).assertSuccess { headers in
      // Pull out the entries we care about by matching values
      var matching: [(String, String)] = []
      for (name, value, _) in headers where value == filterKey {
        matching.append((name, value))
      }
      XCTAssertEqual(HPACKHeaders(matching), expected)
    }
  }
  /// Checks that a `user-agent` supplied as custom metadata is the only `user-agent` sent.
  func testSendRequestHeadersWithCustomUserAgent() throws {
    let metadata: HPACKHeaders = ["user-agent": "test-user-agent"]
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var stateMachine = self.makeStateMachine(idle)

    stateMachine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: metadata,
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    )).assertSuccess { sent in
      XCTAssertEqual(sent["user-agent"], ["test-user-agent"])
    }
  }
  /// With no request encoding and no acceptable response encodings, neither `grpc-encoding` nor
  /// `grpc-accept-encoding` is sent.
  func testSendRequestHeadersWithNoCompressionInEitherDirection() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var stateMachine = self.makeStateMachine(idle)

    stateMachine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    )).assertSuccess { sent in
      XCTAssertFalse(sent.contains(name: "grpc-encoding"))
      XCTAssertFalse(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With no request encoding but some acceptable response encodings, only
  /// `grpc-accept-encoding` is sent.
  func testSendRequestHeadersWithNoCompressionForRequests() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var stateMachine = self.makeStateMachine(idle)

    stateMachine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [.identity, .gzip],
        decompressionLimit: .ratio(10)
      ))
    )).assertSuccess { sent in
      XCTAssertFalse(sent.contains(name: "grpc-encoding"))
      XCTAssertTrue(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With gzip for requests but no acceptable response encodings, only `grpc-encoding` is sent.
  func testSendRequestHeadersWithNoCompressionForResponses() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var stateMachine = self.makeStateMachine(idle)

    stateMachine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: .gzip,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    )).assertSuccess { sent in
      XCTAssertEqual(sent["grpc-encoding"], ["gzip"])
      // This asymmetry is strange but allowed: if a client does not advertise support of the
      // compression it is using, the server may still process the message so long as it too
      // supports the compression.
      XCTAssertFalse(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// The default response headers (`:status` of "200") are accepted.
  func testReceiveResponseHeadersWithOkStatus() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(state)
    let headers = self.makeResponseHeaders()
    stateMachine.receiveResponseHeaders(headers).assertSuccess()
  }
  /// Response headers carrying the "payment required" HTTP status are rejected with
  /// `.invalidHTTPStatus`.
  func testReceiveResponseHeadersWithNotOkStatus() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(state)

    let code = String(HTTPResponseStatus.paymentRequired.code)
    let headers = self.makeResponseHeaders(status: code)
    stateMachine.receiveResponseHeaders(headers).assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatus(code))
    }
  }
  /// Response headers with no `content-type` are rejected with `.invalidContentType(nil)`.
  func testReceiveResponseHeadersWithoutContentType() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(state)

    let headers = self.makeResponseHeaders(contentType: nil)
    stateMachine.receiveResponseHeaders(headers).assertFailure { error in
      XCTAssertEqual(error, .invalidContentType(nil))
    }
  }
  /// Response headers with a `content-type` of "video/mpeg" are rejected with
  /// `.invalidContentType`.
  func testReceiveResponseHeadersWithInvalidContentType() throws {
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(state)

    let headers = self.makeResponseHeaders(contentType: "video/mpeg")
    stateMachine.receiveResponseHeaders(headers).assertFailure { error in
      XCTAssertEqual(error, .invalidContentType("video/mpeg"))
    }
  }
  /// Response headers naming an acceptable encoding are accepted, and the message reader held
  /// by the resulting state uses that encoding.
  func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
    let configuration = ClientMessageEncoding.Configuration(
      forRequests: .none,
      acceptableForResponses: [.identity],
      decompressionLimit: .ratio(1)
    )
    let state: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .enabled(configuration))
    )
    var stateMachine = self.makeStateMachine(state)

    var headers = self.makeResponseHeaders()
    // Identity should always be supported.
    headers.add(name: "grpc-encoding", value: "identity")
    stateMachine.receiveResponseHeaders(headers).assertSuccess()

    guard case let .clientActiveServerActive(_, .reading(_, reader)) = stateMachine.state else {
      XCTFail("unexpected state \(stateMachine.state)")
      return
    }
    XCTAssertEqual(reader.compression?.algorithm, .identity)
  }
  /// With message encoding disabled, a `grpc-encoding: snappy` header is
  /// rejected with `.unsupportedMessageEncoding("snappy")`.
  func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "snappy")
    let result = stateMachine.receiveResponseHeaders(headers)

    result.assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding("snappy"))
    }
  }
  /// A `grpc-encoding` value that does not name any known compression
  /// algorithm is rejected with `.unsupportedMessageEncoding`, which carries
  /// the value verbatim.
  func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "not-a-known-compression-(probably)")
    let result = stateMachine.receiveResponseHeaders(headers)

    result.assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding("not-a-known-compression-(probably)"))
    }
  }
  /// Trailers containing only `grpc-status: 0` produce a status with the code
  /// whose raw value is `0` and no message.
  func testReceiveEndOfResponseStreamWithStatus() throws {
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let trailers: HPACKHeaders = ["grpc-status": "0"]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
      XCTAssertNil(status.message)
    }
  }
  /// A numeric `grpc-status` of `999` is reported as the `.unknown` status
  /// code.
  func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let trailers: HPACKHeaders = ["grpc-status": "999"]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess { status in
      XCTAssertEqual(status.code, .unknown)
    }
  }
  /// A `grpc-status` that is not an integer is reported as the `.unknown`
  /// status code.
  func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess { status in
      XCTAssertEqual(status.code, .unknown)
    }
  }
  /// Trailers carrying both `grpc-status` and `grpc-message` surface both the
  /// code and the (non-ASCII) message in the resulting status.
  func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let trailers: HPACKHeaders = [
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual(status.message, "foo bar 🚀")
    }
  }
  /// Starting from `.clientActiveServerIdle`, an end-of-stream with a `200`
  /// `:status` and no `content-type` still yields the gRPC status and message
  /// from the trailers.
  func testReceiveTrailersOnlyEndOfResponseStreamWithoutContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual(status.message, "foo bar 🚀")
    }
  }
  /// Starting from `.clientActiveServerIdle`, an end-of-stream whose
  /// `content-type` is `invalid` fails with `.invalidContentType("invalid")`,
  /// even though a gRPC status is present.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
      "content-type": "invalid",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertFailure { failure in
      XCTAssertEqual(failure, .invalidContentType("invalid"))
    }
  }
  /// Starting from `.clientActiveServerIdle`, an end-of-stream with a `418`
  /// `:status` and a `grpc-status` of `5` fails with
  /// `.invalidHTTPStatusWithGRPCStatus`, which carries the parsed gRPC status.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndValidGRPCStatus() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "418",
      "grpc-status": "5",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    // The status the error is expected to wrap: code 5, no message.
    let expectedStatus = GRPCStatus(code: GRPCStatus.Code(rawValue: 5)!, message: nil)
    result.assertFailure { failure in
      XCTAssertEqual(failure, .invalidHTTPStatusWithGRPCStatus(expectedStatus))
    }
  }
  /// Starting from `.clientActiveServerIdle`, an end-of-stream with a `418`
  /// `:status` and no `grpc-status` fails with `.invalidHTTPStatus("418")`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndNoGRPCStatus() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [":status": "418"]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertFailure { failure in
      XCTAssertEqual(failure, .invalidHTTPStatus("418"))
    }
  }
  785. }
  /// Tests for `ReadState.readMessages(_:)`, covering the `.notReading` state
  /// and the `.one` / `.many` reading arities.
  class ReadStateTests: GRPCTestCase {
    var allocator = ByteBufferAllocator()

    /// Reading while in `.notReading` fails with `.cardinalityViolation` and
    /// leaves the state as `.notReading`.
    func testReadWhenNoExpectedMessages() {
      var state: ReadState = .notReading
      var buffer = self.allocator.buffer(capacity: 0)

      state.readMessages(&buffer).assertFailure {
        XCTAssertEqual($0, .cardinalityViolation)
      }

      state.assertNotReading()
    }

    /// One complete message followed by a stray byte fails with
    /// `.leftOverBytes` when exactly one message is expected.
    func testReadWithLeftOverBytesForOneExpectedMessage() throws {
      // Write a single message into the buffer:
      let message = ByteBuffer(string: "Hello!")
      let writer = LengthPrefixedMessageWriter(compression: .none)
      var buffer = try writer.write(buffer: message, allocator: self.allocator)

      // And one extra junk byte:
      let bytes: [UInt8] = [0x00]
      buffer.writeBytes(bytes)

      var state: ReadState = .one()
      state.readMessages(&buffer).assertFailure {
        XCTAssertEqual($0, .leftOverBytes)
      }

      state.assertNotReading()
    }

    /// Two complete messages fail with `.cardinalityViolation` when exactly one
    /// message is expected.
    func testReadTooManyMessagesForOneExpectedMessages() throws {
      // Write the same message into the buffer twice:
      let message = ByteBuffer(string: "Hello!")
      let writer = LengthPrefixedMessageWriter(compression: .none)
      var buffer = try writer.write(buffer: message, allocator: self.allocator)
      var second = try writer.write(buffer: message, allocator: self.allocator)
      buffer.writeBuffer(&second)

      var state: ReadState = .one()
      state.readMessages(&buffer).assertFailure {
        XCTAssertEqual($0, .cardinalityViolation)
      }

      state.assertNotReading()
    }

    /// A single message is returned when exactly one is expected, after which
    /// the state is `.notReading`.
    func testReadOneMessageForOneExpectedMessages() throws {
      // Write a single message into the buffer:
      let message = ByteBuffer(string: "Hello!")
      let writer = LengthPrefixedMessageWriter(compression: .none)
      var buffer = try writer.write(buffer: message, allocator: self.allocator)

      var state: ReadState = .one()
      state.readMessages(&buffer).assertSuccess {
        XCTAssertEqual($0, [message])
      }

      // We shouldn't be able to read anymore.
      state.assertNotReading()
    }

    /// A single message is returned when many are expected, and the state
    /// stays `.reading`.
    func testReadOneMessageForManyExpectedMessages() throws {
      // Write a single message into the buffer:
      let message = ByteBuffer(string: "Hello!")
      let writer = LengthPrefixedMessageWriter(compression: .none)
      var buffer = try writer.write(buffer: message, allocator: self.allocator)

      var state: ReadState = .many()
      state.readMessages(&buffer).assertSuccess {
        XCTAssertEqual($0, [message])
      }

      // We should still be able to read.
      state.assertReading()
    }

    /// Three back-to-back messages are all returned, in order, when many are
    /// expected, and the state stays `.reading`.
    func testReadManyMessagesForManyExpectedMessages() throws {
      // Write the same message into the buffer three times:
      let message = ByteBuffer(string: "Hello!")
      let writer = LengthPrefixedMessageWriter(compression: .none)
      var first = try writer.write(buffer: message, allocator: self.allocator)
      var second = try writer.write(buffer: message, allocator: self.allocator)
      var third = try writer.write(buffer: message, allocator: self.allocator)
      first.writeBuffer(&second)
      first.writeBuffer(&third)

      var state: ReadState = .many()
      state.readMessages(&first).assertSuccess {
        XCTAssertEqual($0, [message, message, message])
      }

      // We should still be able to read.
      state.assertReading()
    }
  }
  864. // MARK: Result helpers
  extension Result {
    /// Asserts the `Result` was a success.
    ///
    /// - Parameters:
    ///   - file: The file failures are attributed to; defaults to the caller's file.
    ///   - line: The line failures are attributed to; defaults to the caller's line.
    ///   - verify: Extra checks to run against the success value. Any error it
    ///     throws is recorded as a test failure.
    ///
    /// `file` and `line` come before `verify` so that the closure remains the
    /// final parameter and existing trailing-closure call sites keep working.
    func assertSuccess(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Success) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        do {
          try verify(value)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      case let .failure(error):
        XCTFail("unexpected .failure: \(error)", file: file, line: line)
      }
    }

    /// Asserts the `Result` was a failure.
    ///
    /// - Parameters:
    ///   - file: The file failures are attributed to; defaults to the caller's file.
    ///   - line: The line failures are attributed to; defaults to the caller's line.
    ///   - verify: Extra checks to run against the failure value. Any error it
    ///     throws is recorded as a test failure.
    ///
    /// `file` and `line` come before `verify` so that the closure remains the
    /// final parameter and existing trailing-closure call sites keep working.
    func assertFailure(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Failure) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        XCTFail("unexpected .success: \(value)", file: file, line: line)
      case let .failure(failure):
        do {
          try verify(failure)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      }
    }
  }
  893. // MARK: ReadState, PendingWriteState, and WriteState helpers
  extension ReadState {
    /// Returns a `.reading` state with arity `.one` and a fresh
    /// `LengthPrefixedMessageReader`.
    static func one() -> ReadState {
      let reader = LengthPrefixedMessageReader()
      return .reading(.one, reader)
    }

    /// Returns a `.reading` state with arity `.many` and a fresh
    /// `LengthPrefixedMessageReader`.
    static func many() -> ReadState {
      let reader = LengthPrefixedMessageReader()
      return .reading(.many, reader)
    }

    /// Records a test failure unless the state is `.reading`.
    ///
    /// - Parameters:
    ///   - file: The file the failure is attributed to; defaults to the caller's file.
    ///   - line: The line the failure is attributed to; defaults to the caller's line.
    func assertReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        ()
      case .notReading:
        XCTFail("unexpected state .notReading", file: file, line: line)
      }
    }

    /// Records a test failure unless the state is `.notReading`.
    ///
    /// - Parameters:
    ///   - file: The file the failure is attributed to; defaults to the caller's file.
    ///   - line: The line the failure is attributed to; defaults to the caller's line.
    func assertNotReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        XCTFail("unexpected state .reading", file: file, line: line)
      case .notReading:
        ()
      }
    }
  }
  extension PendingWriteState {
    /// A pending write state with arity `.one` and the `.protobuf` content type.
    static func one() -> PendingWriteState {
      let pending = PendingWriteState(arity: .one, contentType: .protobuf)
      return pending
    }

    /// A pending write state with arity `.many` and the `.protobuf` content type.
    static func many() -> PendingWriteState {
      let pending = PendingWriteState(arity: .many, contentType: .protobuf)
      return pending
    }
  }
  extension WriteState {
    /// A `.writing` state with arity `.one`, the `.protobuf` content type, and a
    /// `LengthPrefixedMessageWriter` created with `compression: .none`.
    static func one() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.one, .protobuf, writer)
    }

    /// A `.writing` state with arity `.many`, the `.protobuf` content type, and a
    /// `LengthPrefixedMessageWriter` created with `compression: .none`.
    static func many() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.many, .protobuf, writer)
    }
  }