GRPCClientStateMachineTests.swift 33 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. @testable import GRPC
  18. import EchoModel
  19. import Logging
  20. import NIOHTTP1
  21. import NIO
  22. import SwiftProtobuf
  23. import XCTest
  24. class GRPCClientStateMachineTests: GRPCTestCase {
  25. typealias Request = Echo_EchoRequest
  26. typealias Response = Echo_EchoResponse
  27. typealias StateMachine = GRPCClientStateMachine<Request, Response>
  28. var allocator = ByteBufferAllocator()
  29. func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
  30. return StateMachine(
  31. state: state,
  32. logger: Logger(label: "io.grpc.testing")
  33. )
  34. }
  35. /// Writes a message into a new `ByteBuffer` (with length-prefixing).
  36. func writeMessage<T: Message>(_ message: T) throws -> ByteBuffer {
  37. var buffer = self.allocator.buffer(capacity: 0)
  38. try self.writeMessage(message, into: &buffer)
  39. return buffer
  40. }
  41. /// Writes the given messages into a new `ByteBuffer` (with length-prefixing).
  42. func writeMessages<T: Message>(_ messages: T...) throws -> ByteBuffer {
  43. var buffer = self.allocator.buffer(capacity: 0)
  44. try messages.forEach {
  45. try self.writeMessage($0, into: &buffer)
  46. }
  47. return buffer
  48. }
  49. /// Writes a message into the given `buffer`.
  50. func writeMessage<T: Message>(_ message: T, into buffer: inout ByteBuffer) throws {
  51. let messageData = try message.serializedData()
  52. let writer = LengthPrefixedMessageWriter(compression: .none)
  53. writer.write(messageData, into: &buffer)
  54. }
  55. /// Returns a minimally valid `HTTPResponseHead`.
  56. func makeResponseHead() -> HTTPResponseHead {
  57. var headers = HTTPHeaders()
  58. headers.add(name: "content-type", value: "application/grpc")
  59. return .init(version: .init(major: 2, minor: 0), status: .ok, headers: headers)
  60. }
  61. }
  62. // MARK: - Send Request Headers
  63. extension GRPCClientStateMachineTests {
  64. func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
  65. var stateMachine = self.makeStateMachine(state)
  66. stateMachine.sendRequestHead(
  67. host: "host",
  68. path: "/echo/Get",
  69. options: .init(),
  70. requestID: "bar"
  71. ).assertFailure {
  72. XCTAssertEqual($0, .invalidState)
  73. }
  74. }
  75. func testSendRequestHeadersFromIdle() {
  76. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  77. stateMachine.sendRequestHead(
  78. host: "host",
  79. path: "/echo/Get",
  80. options: .init(),
  81. requestID: "bar"
  82. ).assertSuccess()
  83. }
  84. func testSendRequestHeadersFromClientActiveServerIdle() {
  85. self.doTestSendRequestHeadersFromInvalidState(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  86. }
  87. func testSendRequestHeadersFromClientClosedServerIdle() {
  88. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerIdle(readArity: .one))
  89. }
  90. func testSendRequestHeadersFromActive() {
  91. self.doTestSendRequestHeadersFromInvalidState(.clientActiveServerActive(writeState: .one(), readState: .one()))
  92. }
  93. func testSendRequestHeadersFromClientClosedServerActive() {
  94. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerActive(readState: .one()))
  95. }
  96. func testSendRequestHeadersFromClosed() {
  97. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerClosed)
  98. }
  99. }
  100. // MARK: - Send Request
  101. extension GRPCClientStateMachineTests {
  102. func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
  103. var stateMachine = self.makeStateMachine(state)
  104. stateMachine.sendRequest(.init(text: "Hello!"), allocator: self.allocator).assertFailure {
  105. XCTAssertEqual($0, expected)
  106. }
  107. }
  108. func doTestSendRequestFromValidState(_ state: StateMachine.State) {
  109. var stateMachine = self.makeStateMachine(state)
  110. let request: Request = .with { $0.text = "Hello!" }
  111. stateMachine.sendRequest(request, allocator: self.allocator).assertSuccess() { buffer in
  112. var buffer = buffer
  113. // Remove the length and compression flag prefix.
  114. buffer.moveReaderIndex(forwardBy: 5)
  115. let data = buffer.readData(length: buffer.readableBytes)!
  116. XCTAssertEqual(request, try Request(serializedData: data))
  117. }
  118. }
  119. func testSendRequestFromIdle() {
  120. self.doTestSendRequestFromInvalidState(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one), expected: .invalidState)
  121. }
  122. func testSendRequestFromClientActiveServerIdle() {
  123. self.doTestSendRequestFromValidState(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  124. }
  125. func testSendRequestFromClientClosedServerIdle() {
  126. self.doTestSendRequestFromInvalidState(.clientClosedServerIdle(readArity: .one), expected: .cardinalityViolation)
  127. }
  128. func testSendRequestFromActive() {
  129. self.doTestSendRequestFromValidState(.clientActiveServerActive(writeState: .one(), readState: .one()))
  130. }
  131. func testSendRequestFromClientClosedServerActive() {
  132. self.doTestSendRequestFromInvalidState(.clientClosedServerIdle(readArity: .one), expected: .cardinalityViolation)
  133. }
  134. func testSendRequestFromClosed() {
  135. self.doTestSendRequestFromInvalidState(.clientClosedServerClosed, expected: .cardinalityViolation)
  136. }
  137. }
  138. // MARK: - Send End of Request Stream
  139. extension GRPCClientStateMachineTests {
  140. func doTestSendEndOfRequestStreamFromInvalidState(
  141. _ state: StateMachine.State,
  142. expected: SendEndOfRequestStreamError
  143. ) {
  144. var stateMachine = self.makeStateMachine(state)
  145. stateMachine.sendEndOfRequestStream().assertFailure {
  146. XCTAssertEqual($0, expected)
  147. }
  148. }
  149. func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
  150. var stateMachine = self.makeStateMachine(state)
  151. stateMachine.sendEndOfRequestStream().assertSuccess()
  152. }
  153. func testSendEndOfRequestStreamFromIdle() {
  154. self.doTestSendEndOfRequestStreamFromInvalidState(
  155. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  156. expected: .invalidState
  157. )
  158. }
  159. func testSendEndOfRequestStreamFromClientActiveServerIdle() {
  160. self.doTestSendEndOfRequestStreamFromValidState(
  161. .clientActiveServerIdle(writeState: .one(), readArity: .one)
  162. )
  163. }
  164. func testSendEndOfRequestStreamFromClientClosedServerIdle() {
  165. self.doTestSendEndOfRequestStreamFromInvalidState(
  166. .clientClosedServerIdle(readArity: .one),
  167. expected: .alreadyClosed
  168. )
  169. }
  170. func testSendEndOfRequestStreamFromActive() {
  171. self.doTestSendEndOfRequestStreamFromValidState(
  172. .clientActiveServerActive(writeState: .one(), readState: .one())
  173. )
  174. }
  175. func testSendEndOfRequestStreamFromClientClosedServerActive() {
  176. self.doTestSendEndOfRequestStreamFromInvalidState(
  177. .clientClosedServerActive(readState: .one()),
  178. expected: .alreadyClosed
  179. )
  180. }
  181. func testSendEndOfRequestStreamFromClosed() {
  182. self.doTestSendEndOfRequestStreamFromInvalidState(
  183. .clientClosedServerClosed,
  184. expected: .alreadyClosed
  185. )
  186. }
  187. }
  188. // MARK: - Receive Response Headers
  189. extension GRPCClientStateMachineTests {
  190. func doTestReceiveResponseHeadersFromInvalidState(
  191. _ state: StateMachine.State,
  192. expected: ReceiveResponseHeadError
  193. ) {
  194. var stateMachine = self.makeStateMachine(state)
  195. stateMachine.receiveResponseHead(self.makeResponseHead()).assertFailure {
  196. XCTAssertEqual($0, expected)
  197. }
  198. }
  199. func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
  200. var stateMachine = self.makeStateMachine(state)
  201. stateMachine.receiveResponseHead(self.makeResponseHead()).assertSuccess()
  202. }
  203. func testReceiveResponseHeadersFromIdle() {
  204. self.doTestReceiveResponseHeadersFromInvalidState(
  205. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  206. expected: .invalidState
  207. )
  208. }
  209. func testReceiveResponseHeadersFromClientActiveServerIdle() {
  210. self.doTestReceiveResponseHeadersFromValidState(
  211. .clientActiveServerIdle(writeState: .one(), readArity: .one)
  212. )
  213. }
  214. func testReceiveResponseHeadersFromClientClosedServerIdle() {
  215. self.doTestReceiveResponseHeadersFromValidState(
  216. .clientClosedServerIdle(readArity: .one)
  217. )
  218. }
  219. func testReceiveResponseHeadersFromActive() {
  220. self.doTestReceiveResponseHeadersFromInvalidState(
  221. .clientActiveServerActive(writeState: .one(), readState: .one()),
  222. expected: .invalidState
  223. )
  224. }
  225. func testReceiveResponseHeadersFromClientClosedServerActive() {
  226. self.doTestReceiveResponseHeadersFromInvalidState(
  227. .clientClosedServerActive(readState: .one()),
  228. expected: .invalidState
  229. )
  230. }
  231. func testReceiveResponseHeadersFromClosed() {
  232. self.doTestReceiveResponseHeadersFromInvalidState(
  233. .clientClosedServerClosed,
  234. expected: .invalidState
  235. )
  236. }
  237. }
  238. // MARK: - Receive Response
  239. extension GRPCClientStateMachineTests {
  240. func doTestReceiveResponseFromInvalidState(
  241. _ state: StateMachine.State,
  242. expected: MessageReadError
  243. ) throws {
  244. var stateMachine = self.makeStateMachine(state)
  245. let message = Response.with { $0.text = "Hello!" }
  246. var buffer = try self.writeMessage(message)
  247. stateMachine.receiveResponseBuffer(&buffer).assertFailure {
  248. XCTAssertEqual($0, expected)
  249. }
  250. }
  251. func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
  252. var stateMachine = self.makeStateMachine(state)
  253. let message = Response.with { $0.text = "Hello!" }
  254. var buffer = try self.writeMessage(message)
  255. stateMachine.receiveResponseBuffer(&buffer).assertSuccess { messages in
  256. XCTAssertEqual(messages, [message])
  257. }
  258. }
  259. func testReceiveResponseFromIdle() throws {
  260. try self.doTestReceiveResponseFromInvalidState(
  261. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  262. expected: .invalidState
  263. )
  264. }
  265. func testReceiveResponseFromClientActiveServerIdle() throws {
  266. try self.doTestReceiveResponseFromInvalidState(
  267. .clientActiveServerIdle(writeState: .one(), readArity: .one),
  268. expected: .invalidState
  269. )
  270. }
  271. func testReceiveResponseFromClientClosedServerIdle() throws {
  272. try self.doTestReceiveResponseFromInvalidState(
  273. .clientClosedServerIdle(readArity: .one),
  274. expected: .invalidState
  275. )
  276. }
  277. func testReceiveResponseFromActive() throws {
  278. try self.doTestReceiveResponseFromValidState(
  279. .clientActiveServerActive(writeState: .one(), readState: .one())
  280. )
  281. }
  282. func testReceiveResponseFromClientClosedServerActive() throws {
  283. try self.doTestReceiveResponseFromValidState(.clientClosedServerActive(readState: .one()))
  284. }
  285. func testReceiveResponseFromClosed() throws {
  286. try self.doTestReceiveResponseFromInvalidState(
  287. .clientClosedServerClosed,
  288. expected: .invalidState
  289. )
  290. }
  291. }
  292. // MARK: - Receive End of Response Stream
  293. extension GRPCClientStateMachineTests {
  294. func doTestReceiveEndOfResponseStreamFromInvalidState(
  295. _ state: StateMachine.State,
  296. expected: ReceiveEndOfResponseStreamError
  297. ) {
  298. var stateMachine = self.makeStateMachine(state)
  299. stateMachine.receiveEndOfResponseStream(HTTPHeaders()).assertFailure {
  300. XCTAssertEqual($0, expected)
  301. }
  302. }
  303. func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
  304. var stateMachine = self.makeStateMachine(state)
  305. var trailers = HTTPHeaders()
  306. trailers.add(name: GRPCHeaderName.statusCode, value: "\(GRPCStatus.Code.ok.rawValue)")
  307. trailers.add(name: GRPCHeaderName.statusMessage, value: "ok")
  308. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  309. XCTAssertEqual(status.code, .ok)
  310. XCTAssertEqual(status.message, "ok")
  311. }
  312. }
  313. func testReceiveEndOfResponseStreamFromIdle() {
  314. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  315. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  316. expected: .invalidState
  317. )
  318. }
  319. func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
  320. self.doTestReceiveEndOfResponseStreamFromValidState(
  321. .clientActiveServerIdle(writeState: .one(), readArity: .one)
  322. )
  323. }
  324. func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
  325. self.doTestReceiveEndOfResponseStreamFromValidState(
  326. .clientClosedServerIdle(readArity: .one)
  327. )
  328. }
  329. func testReceiveEndOfResponseStreamFromActive() {
  330. self.doTestReceiveEndOfResponseStreamFromValidState(
  331. .clientActiveServerActive(writeState: .one(), readState: .one())
  332. )
  333. }
  334. func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
  335. self.doTestReceiveEndOfResponseStreamFromValidState(
  336. .clientClosedServerActive(readState: .one())
  337. )
  338. }
  339. func testReceiveEndOfResponseStreamFromClosed() {
  340. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  341. .clientClosedServerClosed,
  342. expected: .invalidState
  343. )
  344. }
  345. }
  346. // MARK: - Basic RPC flow.
  347. extension GRPCClientStateMachineTests {
  348. func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HTTPHeaders {
  349. var headers = HTTPHeaders()
  350. headers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
  351. if let message = message {
  352. headers.add(name: GRPCHeaderName.statusMessage, value: message)
  353. }
  354. return headers
  355. }
  356. func testSimpleUnaryFlow() throws {
  357. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  358. // Initiate the RPC
  359. stateMachine.sendRequestHead(host: "foo", path: "/echo/Get", options: .init(), requestID: "bar").assertSuccess()
  360. // Receive acknowledgement.
  361. stateMachine.receiveResponseHead(self.makeResponseHead()).assertSuccess()
  362. // Send a request.
  363. stateMachine.sendRequest(.with { $0.text = "Hello!" }, allocator: self.allocator).assertSuccess()
  364. // Close the request stream.
  365. stateMachine.sendEndOfRequestStream().assertSuccess()
  366. // Receive a response.
  367. var buffer = try self.writeMessage(Response.with { $0.text = "Hello!" })
  368. stateMachine.receiveResponseBuffer(&buffer).assertSuccess()
  369. // Receive the status.
  370. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  371. }
  372. func testSimpleClientActiveFlow() throws {
  373. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .one))
  374. // Initiate the RPC
  375. stateMachine.sendRequestHead(host: "foo", path: "/echo/Get", options: .init(), requestID: "bar").assertSuccess()
  376. // Receive acknowledgement.
  377. stateMachine.receiveResponseHead(self.makeResponseHead()).assertSuccess()
  378. // Send some requests.
  379. stateMachine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
  380. stateMachine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertSuccess()
  381. stateMachine.sendRequest(.with { $0.text = "3" }, allocator: self.allocator).assertSuccess()
  382. // Close the request stream.
  383. stateMachine.sendEndOfRequestStream().assertSuccess()
  384. // Receive a response.
  385. var buffer = try self.writeMessage(Response.with { $0.text = "Hello!" })
  386. stateMachine.receiveResponseBuffer(&buffer).assertSuccess()
  387. // Receive the status.
  388. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  389. }
  390. func testSimpleServerActiveFlow() throws {
  391. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .many))
  392. // Initiate the RPC
  393. stateMachine.sendRequestHead(host: "foo", path: "/echo/Get", options: .init(), requestID: "bar").assertSuccess()
  394. // Receive acknowledgement.
  395. stateMachine.receiveResponseHead(self.makeResponseHead()).assertSuccess()
  396. // Send a request.
  397. stateMachine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
  398. // Close the request stream.
  399. stateMachine.sendEndOfRequestStream().assertSuccess()
  400. // Receive a response.
  401. var firstBuffer = try self.writeMessage(Response.with { $0.text = "1" })
  402. stateMachine.receiveResponseBuffer(&firstBuffer).assertSuccess()
  403. // Receive two responses in one buffer.
  404. var secondBuffer = try self.writeMessage(Response.with { $0.text = "2" })
  405. try self.writeMessage(Response.with { $0.text = "3" }, into: &secondBuffer)
  406. stateMachine.receiveResponseBuffer(&secondBuffer).assertSuccess()
  407. // Receive the status.
  408. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  409. }
  410. func testSimpleBidirectionalActiveFlow() throws {
  411. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .many))
  412. // Initiate the RPC
  413. stateMachine.sendRequestHead(host: "foo", path: "/echo/Get", options: .init(), requestID: "bar").assertSuccess()
  414. // Receive acknowledgement.
  415. stateMachine.receiveResponseHead(self.makeResponseHead()).assertSuccess()
  416. // Interleave requests and responses:
  417. stateMachine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
  418. // Receive a response.
  419. var firstBuffer = try self.writeMessage(Response.with { $0.text = "1" })
  420. stateMachine.receiveResponseBuffer(&firstBuffer).assertSuccess()
  421. // Send two more requests.
  422. stateMachine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertSuccess()
  423. stateMachine.sendRequest(.with { $0.text = "3" }, allocator: self.allocator).assertSuccess()
  424. // Receive two responses in one buffer.
  425. var secondBuffer = try self.writeMessage(Response.with { $0.text = "2" })
  426. try self.writeMessage(Response.with { $0.text = "3" }, into: &secondBuffer)
  427. stateMachine.receiveResponseBuffer(&secondBuffer).assertSuccess()
  428. // Close the request stream.
  429. stateMachine.sendEndOfRequestStream().assertSuccess()
  430. // Receive the status.
  431. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  432. }
  433. }
  434. // MARK: - Too many requests / responses.
  435. extension GRPCClientStateMachineTests {
  436. func testSendTooManyRequestsFromClientActiveServerIdle() {
  437. for messageCount in [MessageArity.one, MessageArity.many] {
  438. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: messageCount))
  439. // One is fine.
  440. stateMachine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
  441. // Two is not.
  442. stateMachine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertFailure {
  443. XCTAssertEqual($0, .cardinalityViolation)
  444. }
  445. }
  446. }
  447. func testSendTooManyRequestsFromActive() {
  448. for readState in [ReadState.one(), ReadState.many()] {
  449. var stateMachine = self.makeStateMachine(.clientActiveServerActive(writeState: .one(), readState: readState))
  450. // One is fine.
  451. stateMachine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertSuccess()
  452. // Two is not.
  453. stateMachine.sendRequest(.with { $0.text = "2" }, allocator: self.allocator).assertFailure {
  454. XCTAssertEqual($0, .cardinalityViolation)
  455. }
  456. }
  457. }
  458. func testSendTooManyRequestsFromClosed() {
  459. var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
  460. // No requests allowed!
  461. stateMachine.sendRequest(.with { $0.text = "1" }, allocator: self.allocator).assertFailure {
  462. XCTAssertEqual($0, .cardinalityViolation)
  463. }
  464. }
  465. func testReceiveTooManyRequests() throws {
  466. for writeState in [WriteState.one(), WriteState.many()] {
  467. var stateMachine = self.makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  468. let response = Response.with { $0.text = "foo" }
  469. // One response is fine.
  470. var firstBuffer = try self.writeMessage(response)
  471. stateMachine.receiveResponseBuffer(&firstBuffer).assertSuccess()
  472. var secondBuffer = try self.writeMessage(response)
  473. stateMachine.receiveResponseBuffer(&secondBuffer).assertFailure {
  474. XCTAssertEqual($0, .cardinalityViolation)
  475. }
  476. }
  477. }
  478. func testReceiveTooManyRequestsInOneBuffer() throws {
  479. for writeState in [WriteState.one(), WriteState.many()] {
  480. var stateMachine = self.makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  481. // Write two responses into a single buffer.
  482. let response = Response.with { $0.text = "foo" }
  483. var buffer = try self.writeMessages(response, response)
  484. stateMachine.receiveResponseBuffer(&buffer).assertFailure {
  485. XCTAssertEqual($0, .cardinalityViolation)
  486. }
  487. }
  488. }
  489. }
  490. // MARK: - Send Request Headers
  491. extension GRPCClientStateMachineTests {
  492. func testSendRequestHeaders() throws {
  493. var stateMachine = self.makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  494. stateMachine.sendRequestHead(
  495. host: "localhost",
  496. path: "/echo/Get",
  497. options: CallOptions(timeout: .hours(rounding: 1), requestIDHeader: "x-grpc-id"),
  498. requestID: "request-id"
  499. ).assertSuccess { requestHead in
  500. XCTAssertEqual(requestHead.method, .POST)
  501. XCTAssertEqual(requestHead.uri, "/echo/Get")
  502. XCTAssertEqual(requestHead.headers["content-type"], ["application/grpc"])
  503. XCTAssertEqual(requestHead.headers["host"], ["localhost"])
  504. XCTAssertEqual(requestHead.headers["te"], ["trailers"])
  505. XCTAssertEqual(requestHead.headers["grpc-timeout"], ["1H"])
  506. XCTAssertEqual(requestHead.headers["x-grpc-id"], ["request-id"])
  507. XCTAssertTrue(requestHead.headers["user-agent"].first?.starts(with: "grpc-swift") ?? false)
  508. }
  509. }
  510. func testReceiveResponseHeadersWithOkStatus() throws {
  511. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  512. var headers = HTTPHeaders()
  513. headers.add(name: "content-type", value: "application/grpc")
  514. let responseHead = HTTPResponseHead(
  515. version: .init(major: 2, minor: 0),
  516. status: .ok,
  517. headers: headers
  518. )
  519. stateMachine.receiveResponseHead(responseHead).assertSuccess()
  520. }
  521. func testReceiveResponseHeadersWithNotOkStatus() throws {
  522. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  523. let responseHead = HTTPResponseHead(
  524. version: .init(major: 2, minor: 0),
  525. status: .imATeapot,
  526. headers: HTTPHeaders()
  527. )
  528. stateMachine.receiveResponseHead(responseHead).assertFailure {
  529. XCTAssertEqual($0, .invalidHTTPStatus(.imATeapot))
  530. }
  531. }
  532. func testReceiveResponseHeadersWithoutContentType() throws {
  533. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  534. let responseHead = HTTPResponseHead(
  535. version: .init(major: 2, minor: 0),
  536. status: .ok
  537. )
  538. stateMachine.receiveResponseHead(responseHead).assertFailure {
  539. XCTAssertEqual($0, .invalidContentType)
  540. }
  541. }
  542. func testReceiveResponseHeadersWithInvalidContentType() throws {
  543. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  544. var headers = HTTPHeaders()
  545. headers.add(name: "content-type", value: "video/mpeg")
  546. let responseHead = HTTPResponseHead(
  547. version: .init(major: 2, minor: 0),
  548. status: .ok,
  549. headers: headers
  550. )
  551. stateMachine.receiveResponseHead(responseHead).assertFailure {
  552. XCTAssertEqual($0, .invalidContentType)
  553. }
  554. }
  555. func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
  556. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  557. var headers = HTTPHeaders()
  558. headers.add(name: "content-type", value: "application/grpc")
  559. // Identity should always be supported.
  560. headers.add(name: "grpc-encoding", value: "identity")
  561. let responseHead = HTTPResponseHead(
  562. version: .init(major: 2, minor: 0),
  563. status: .ok,
  564. headers: headers
  565. )
  566. stateMachine.receiveResponseHead(responseHead).assertSuccess()
  567. switch stateMachine.state {
  568. case let .clientActiveServerActive(_, .reading(_, reader)):
  569. XCTAssertEqual(reader.compressionMechanism, .identity)
  570. default:
  571. XCTFail("unexpected state \(stateMachine.state)")
  572. }
  573. }
  574. func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
  575. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  576. var headers = HTTPHeaders()
  577. headers.add(name: "content-type", value: "application/grpc")
  578. headers.add(name: "grpc-encoding", value: "snappy")
  579. let responseHead = HTTPResponseHead(
  580. version: .init(major: 2, minor: 0),
  581. status: .ok,
  582. headers: headers
  583. )
  584. stateMachine.receiveResponseHead(responseHead).assertFailure {
  585. XCTAssertEqual($0, .unsupportedMessageEncoding)
  586. }
  587. }
  588. func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
  589. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(writeState: .one(), readArity: .one))
  590. var headers = HTTPHeaders()
  591. headers.add(name: "content-type", value: "application/grpc")
  592. headers.add(name: "grpc-encoding", value: "not-a-known-compression-(probably)")
  593. let responseHead = HTTPResponseHead(
  594. version: .init(major: 2, minor: 0),
  595. status: .ok,
  596. headers: headers
  597. )
  598. stateMachine.receiveResponseHead(responseHead).assertFailure {
  599. XCTAssertEqual($0, .unsupportedMessageEncoding)
  600. }
  601. }
  602. func testReceiveEndOfResponseStreamWithStatus() throws {
  603. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  604. var trailers = HTTPHeaders()
  605. trailers.add(name: "grpc-status", value: "0")
  606. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  607. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
  608. XCTAssertEqual(status.message, nil)
  609. }
  610. }
  611. func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
  612. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  613. var trailers = HTTPHeaders()
  614. trailers.add(name: "grpc-status", value: "999")
  615. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  616. XCTAssertEqual(status.code, .unknown)
  617. }
  618. }
  619. func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
  620. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  621. var trailers = HTTPHeaders()
  622. trailers.add(name: "grpc-status", value: "not-a-real-status-code")
  623. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  624. XCTAssertEqual(status.code, .unknown)
  625. }
  626. }
  627. func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
  628. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  629. var trailers = HTTPHeaders()
  630. trailers.add(name: "grpc-status", value: "5")
  631. trailers.add(name: "grpc-message", value: "foo bar 🚀")
  632. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  633. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
  634. XCTAssertEqual(status.message, "foo bar 🚀")
  635. }
  636. }
  637. }
  638. class ReadStateTests: GRPCTestCase {
  639. var allocator = ByteBufferAllocator()
  640. func testReadWhenNoExpectedMessages() {
  641. var state: ReadState = .notReading
  642. var buffer = self.allocator.buffer(capacity: 0)
  643. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure {
  644. XCTAssertEqual($0, .cardinalityViolation)
  645. }
  646. state.assertNotReading()
  647. }
  648. func testReadWhenBufferContainsLengthPrefixedJunk() {
  649. var state: ReadState = .many()
  650. var buffer = self.allocator.buffer(capacity: 9)
  651. let bytes: [UInt8] = [
  652. 0x00, // compression flag
  653. 0x00, 0x00, 0x00, 0x04, // message length
  654. 0xaa, 0xbb, 0xcc, 0xdd // message
  655. ]
  656. buffer.writeBytes(bytes)
  657. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure {
  658. XCTAssertEqual($0, .deserializationFailed)
  659. }
  660. state.assertNotReading()
  661. }
  662. func testReadWithLeftOverBytesForOneExpectedMessage() throws {
  663. // Write a message into the buffer:
  664. let message = Echo_EchoRequest.with { $0.text = "Hello!" }
  665. let writer = LengthPrefixedMessageWriter(compression: .none)
  666. var buffer = self.allocator.buffer(capacity: 0)
  667. writer.write(try message.serializedData(), into: &buffer)
  668. // And some extra junk bytes:
  669. let bytes: [UInt8] = [0x00]
  670. buffer.writeBytes(bytes)
  671. var state: ReadState = .one()
  672. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure {
  673. XCTAssertEqual($0, .leftOverBytes)
  674. }
  675. state.assertNotReading()
  676. }
  677. func testReadTooManyMessagesForOneExpectedMessages() throws {
  678. // Write a message into the buffer twice:
  679. let message = Echo_EchoRequest.with { $0.text = "Hello!" }
  680. let writer = LengthPrefixedMessageWriter(compression: .none)
  681. var buffer = self.allocator.buffer(capacity: 0)
  682. writer.write(try message.serializedData(), into: &buffer)
  683. writer.write(try message.serializedData(), into: &buffer)
  684. var state: ReadState = .one()
  685. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertFailure {
  686. XCTAssertEqual($0, .cardinalityViolation)
  687. }
  688. state.assertNotReading()
  689. }
  690. func testReadOneMessageForOneExpectedMessages() throws {
  691. // Write a message into the buffer twice:
  692. let message = Echo_EchoRequest.with { $0.text = "Hello!" }
  693. let writer = LengthPrefixedMessageWriter(compression: .none)
  694. var buffer = self.allocator.buffer(capacity: 0)
  695. writer.write(try message.serializedData(), into: &buffer)
  696. var state: ReadState = .one()
  697. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess {
  698. XCTAssertEqual($0, [message])
  699. }
  700. // We shouldn't be able to read anymore.
  701. state.assertNotReading()
  702. }
  703. func testReadOneMessageForManyExpectedMessages() throws {
  704. // Write a message into the buffer twice:
  705. let message = Echo_EchoRequest.with { $0.text = "Hello!" }
  706. let writer = LengthPrefixedMessageWriter(compression: .none)
  707. var buffer = self.allocator.buffer(capacity: 0)
  708. writer.write(try message.serializedData(), into: &buffer)
  709. var state: ReadState = .many()
  710. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess {
  711. XCTAssertEqual($0, [message])
  712. }
  713. // We should still be able to read.
  714. state.assertReading()
  715. }
  716. func testReadManyMessagesForManyExpectedMessages() throws {
  717. // Write a message into the buffer twice:
  718. let message = Echo_EchoRequest.with { $0.text = "Hello!" }
  719. let writer = LengthPrefixedMessageWriter(compression: .none)
  720. var buffer = self.allocator.buffer(capacity: 0)
  721. writer.write(try message.serializedData(), into: &buffer)
  722. writer.write(try message.serializedData(), into: &buffer)
  723. writer.write(try message.serializedData(), into: &buffer)
  724. var state: ReadState = .many()
  725. state.readMessages(&buffer, as: Echo_EchoRequest.self).assertSuccess {
  726. XCTAssertEqual($0, [message, message, message])
  727. }
  728. // We should still be able to read.
  729. state.assertReading()
  730. }
  731. }
  732. // MARK: Result helpers
  733. extension Result {
  734. /// Asserts the `Result` was a success.
  735. func assertSuccess(verify: (Success) throws -> Void = { _ in }) {
  736. switch self {
  737. case .success(let success):
  738. do {
  739. try verify(success)
  740. } catch {
  741. XCTFail("verify threw: \(error)")
  742. }
  743. case .failure(let error):
  744. XCTFail("unexpected .failure: \(error)")
  745. }
  746. }
  747. /// Asserts the `Result` was a failure.
  748. func assertFailure(verify: (Failure) throws -> Void = { _ in }) {
  749. switch self {
  750. case .success(let success):
  751. XCTFail("unexpected .success: \(success)")
  752. case .failure(let error):
  753. do {
  754. try verify(error)
  755. } catch {
  756. XCTFail("verify threw: \(error)")
  757. }
  758. }
  759. }
  760. }
  761. // MARK: ReadState, PendingWriteState, and WriteState helpers
  762. extension ReadState {
  763. static func one() -> ReadState {
  764. let reader = LengthPrefixedMessageReader(
  765. mode: .client,
  766. compressionMechanism: .none,
  767. logger: Logger(label: "io.grpc.reader")
  768. )
  769. return .reading(.one, reader)
  770. }
  771. static func many() -> ReadState {
  772. let reader = LengthPrefixedMessageReader(
  773. mode: .client,
  774. compressionMechanism: .none,
  775. logger: Logger(label: "io.grpc.reader")
  776. )
  777. return .reading(.many, reader)
  778. }
  779. func assertReading() {
  780. switch self {
  781. case .reading:
  782. ()
  783. case .notReading:
  784. XCTFail("unexpected state .notReading")
  785. }
  786. }
  787. func assertNotReading() {
  788. switch self {
  789. case .reading:
  790. XCTFail("unexpected state .reading")
  791. case .notReading:
  792. ()
  793. }
  794. }
  795. }
  796. extension PendingWriteState {
  797. static func one() -> PendingWriteState {
  798. return .init(arity: .one, compression: .none, contentType: .protobuf)
  799. }
  800. static func many() -> PendingWriteState {
  801. return .init(arity: .many, compression: .none, contentType: .protobuf)
  802. }
  803. }
  804. extension WriteState {
  805. static func one() -> WriteState {
  806. return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none))
  807. }
  808. static func many() -> WriteState {
  809. return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none))
  810. }
  811. }