2
0

GRPCClientStateMachineTests.swift 42 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Foundation
  18. @testable import GRPC
  19. import Logging
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOHTTP1
  23. import SwiftProtobuf
  24. import XCTest
  25. class GRPCClientStateMachineTests: GRPCTestCase {
  26. typealias Request = Echo_EchoRequest
  27. typealias Response = Echo_EchoResponse
  28. typealias StateMachine = GRPCClientStateMachine
  29. var allocator = ByteBufferAllocator()
  30. func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
  31. return StateMachine(state: state)
  32. }
  33. /// Writes a message into a new `ByteBuffer` (with length-prefixing).
  34. func writeMessage(_ message: String) throws -> ByteBuffer {
  35. let buffer = self.allocator.buffer(string: message)
  36. var writer = LengthPrefixedMessageWriter(compression: .none, allocator: .init())
  37. var (buffer1, buffer2) = try writer.write(
  38. buffer: buffer,
  39. compressed: false
  40. )
  41. if var buffer2 = buffer2 {
  42. buffer1.writeBuffer(&buffer2)
  43. }
  44. return buffer1
  45. }
  46. /// Writes a message into the given `buffer`.
  47. func writeMessage(_ message: String, into buffer: inout ByteBuffer) throws {
  48. var other = try self.writeMessage(message)
  49. buffer.writeBuffer(&other)
  50. }
  51. /// Returns a minimally valid `HPACKHeaders` for a response.
  52. func makeResponseHeaders(
  53. status: String? = "200",
  54. contentType: String? = "application/grpc+proto"
  55. ) -> HPACKHeaders {
  56. var headers: HPACKHeaders = [:]
  57. status.map { headers.add(name: ":status", value: $0) }
  58. contentType.map { headers.add(name: "content-type", value: $0) }
  59. return headers
  60. }
  61. }
  62. // MARK: - Send Request Headers
  63. extension GRPCClientStateMachineTests {
  64. func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
  65. var stateMachine = self.makeStateMachine(state)
  66. stateMachine.sendRequestHeaders(requestHead: .init(
  67. method: "POST",
  68. scheme: "http",
  69. path: "/echo/Get",
  70. host: "host",
  71. deadline: .distantFuture,
  72. customMetadata: [:],
  73. encoding: .disabled
  74. ), allocator: .init()).assertFailure {
  75. XCTAssertEqual($0, .invalidState)
  76. }
  77. }
  78. func testSendRequestHeadersFromIdle() {
  79. var stateMachine = self
  80. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  81. stateMachine.sendRequestHeaders(requestHead: .init(
  82. method: "POST",
  83. scheme: "http",
  84. path: "/echo/Get",
  85. host: "host",
  86. deadline: .distantFuture,
  87. customMetadata: [:],
  88. encoding: .disabled
  89. ), allocator: .init()).assertSuccess()
  90. }
  91. func testSendRequestHeadersFromClientActiveServerIdle() {
  92. self.doTestSendRequestHeadersFromInvalidState(.clientActiveServerIdle(
  93. writeState: .one(),
  94. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  95. ))
  96. }
  97. func testSendRequestHeadersFromClientClosedServerIdle() {
  98. self
  99. .doTestSendRequestHeadersFromInvalidState(
  100. .clientClosedServerIdle(pendingReadState: .init(
  101. arity: .one,
  102. messageEncoding: .disabled
  103. ))
  104. )
  105. }
  106. func testSendRequestHeadersFromActive() {
  107. self
  108. .doTestSendRequestHeadersFromInvalidState(.clientActiveServerActive(
  109. writeState: .one(),
  110. readState: .one()
  111. ))
  112. }
  113. func testSendRequestHeadersFromClientClosedServerActive() {
  114. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerActive(readState: .one()))
  115. }
  116. func testSendRequestHeadersFromClosed() {
  117. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerClosed)
  118. }
  119. }
  120. // MARK: - Send Request
  121. extension GRPCClientStateMachineTests {
  122. func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
  123. var stateMachine = self.makeStateMachine(state)
  124. stateMachine.sendRequest(
  125. ByteBuffer(string: "Hello!"),
  126. compressed: false
  127. ).assertFailure {
  128. XCTAssertEqual($0, expected)
  129. }
  130. }
  131. func doTestSendRequestFromValidState(_ state: StateMachine.State) {
  132. var stateMachine = self.makeStateMachine(state)
  133. let request = "Hello!"
  134. stateMachine.sendRequest(
  135. ByteBuffer(string: request),
  136. compressed: false
  137. ).assertSuccess { buffers in
  138. var buffer = buffers.0
  139. XCTAssertNil(buffers.1)
  140. // Remove the length and compression flag prefix.
  141. buffer.moveReaderIndex(forwardBy: 5)
  142. let data = buffer.readString(length: buffer.readableBytes)!
  143. XCTAssertEqual(request, data)
  144. }
  145. }
  146. func testSendRequestFromIdle() {
  147. self.doTestSendRequestFromInvalidState(
  148. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  149. expected: .invalidState
  150. )
  151. }
  152. func testSendRequestFromClientActiveServerIdle() {
  153. self.doTestSendRequestFromValidState(.clientActiveServerIdle(
  154. writeState: .one(),
  155. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  156. ))
  157. }
  158. func testSendRequestFromClientClosedServerIdle() {
  159. self.doTestSendRequestFromInvalidState(
  160. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  161. expected: .cardinalityViolation
  162. )
  163. }
  164. func testSendRequestFromActive() {
  165. self
  166. .doTestSendRequestFromValidState(.clientActiveServerActive(
  167. writeState: .one(),
  168. readState: .one()
  169. ))
  170. }
  171. func testSendRequestFromClientClosedServerActive() {
  172. self.doTestSendRequestFromInvalidState(
  173. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  174. expected: .cardinalityViolation
  175. )
  176. }
  177. func testSendRequestFromClosed() {
  178. self.doTestSendRequestFromInvalidState(
  179. .clientClosedServerClosed,
  180. expected: .cardinalityViolation
  181. )
  182. }
  183. }
  184. // MARK: - Send End of Request Stream
  185. extension GRPCClientStateMachineTests {
  186. func doTestSendEndOfRequestStreamFromInvalidState(
  187. _ state: StateMachine.State,
  188. expected: SendEndOfRequestStreamError
  189. ) {
  190. var stateMachine = self.makeStateMachine(state)
  191. stateMachine.sendEndOfRequestStream().assertFailure {
  192. XCTAssertEqual($0, expected)
  193. }
  194. }
  195. func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
  196. var stateMachine = self.makeStateMachine(state)
  197. stateMachine.sendEndOfRequestStream().assertSuccess()
  198. }
  199. func testSendEndOfRequestStreamFromIdle() {
  200. self.doTestSendEndOfRequestStreamFromInvalidState(
  201. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  202. expected: .invalidState
  203. )
  204. }
  205. func testSendEndOfRequestStreamFromClientActiveServerIdle() {
  206. self.doTestSendEndOfRequestStreamFromValidState(
  207. .clientActiveServerIdle(
  208. writeState: .one(),
  209. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  210. )
  211. )
  212. }
  213. func testSendEndOfRequestStreamFromClientClosedServerIdle() {
  214. self.doTestSendEndOfRequestStreamFromInvalidState(
  215. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  216. expected: .alreadyClosed
  217. )
  218. }
  219. func testSendEndOfRequestStreamFromActive() {
  220. self.doTestSendEndOfRequestStreamFromValidState(
  221. .clientActiveServerActive(writeState: .one(), readState: .one())
  222. )
  223. }
  224. func testSendEndOfRequestStreamFromClientClosedServerActive() {
  225. self.doTestSendEndOfRequestStreamFromInvalidState(
  226. .clientClosedServerActive(readState: .one()),
  227. expected: .alreadyClosed
  228. )
  229. }
  230. func testSendEndOfRequestStreamFromClosed() {
  231. self.doTestSendEndOfRequestStreamFromInvalidState(
  232. .clientClosedServerClosed,
  233. expected: .alreadyClosed
  234. )
  235. }
  236. }
  237. // MARK: - Receive Response Headers
  238. extension GRPCClientStateMachineTests {
  239. func doTestReceiveResponseHeadersFromInvalidState(
  240. _ state: StateMachine.State,
  241. expected: ReceiveResponseHeadError
  242. ) {
  243. var stateMachine = self.makeStateMachine(state)
  244. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertFailure {
  245. XCTAssertEqual($0, expected)
  246. }
  247. }
  248. func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
  249. var stateMachine = self.makeStateMachine(state)
  250. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  251. }
  252. func testReceiveResponseHeadersFromIdle() {
  253. self.doTestReceiveResponseHeadersFromInvalidState(
  254. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  255. expected: .invalidState
  256. )
  257. }
  258. func testReceiveResponseHeadersFromClientActiveServerIdle() {
  259. self.doTestReceiveResponseHeadersFromValidState(
  260. .clientActiveServerIdle(
  261. writeState: .one(),
  262. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  263. )
  264. )
  265. }
  266. func testReceiveResponseHeadersFromClientClosedServerIdle() {
  267. self.doTestReceiveResponseHeadersFromValidState(
  268. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  269. )
  270. }
  271. func testReceiveResponseHeadersFromActive() {
  272. self.doTestReceiveResponseHeadersFromInvalidState(
  273. .clientActiveServerActive(writeState: .one(), readState: .one()),
  274. expected: .invalidState
  275. )
  276. }
  277. func testReceiveResponseHeadersFromClientClosedServerActive() {
  278. self.doTestReceiveResponseHeadersFromInvalidState(
  279. .clientClosedServerActive(readState: .one()),
  280. expected: .invalidState
  281. )
  282. }
  283. func testReceiveResponseHeadersFromClosed() {
  284. self.doTestReceiveResponseHeadersFromInvalidState(
  285. .clientClosedServerClosed,
  286. expected: .invalidState
  287. )
  288. }
  289. }
  290. // MARK: - Receive Response
  291. extension GRPCClientStateMachineTests {
  292. func doTestReceiveResponseFromInvalidState(
  293. _ state: StateMachine.State,
  294. expected: MessageReadError
  295. ) throws {
  296. var stateMachine = self.makeStateMachine(state)
  297. let message = "Hello!"
  298. var buffer = try self.writeMessage(message)
  299. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertFailure {
  300. XCTAssertEqual($0, expected)
  301. }
  302. }
  303. func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
  304. var stateMachine = self.makeStateMachine(state)
  305. let message = "Hello!"
  306. var buffer = try self.writeMessage(message)
  307. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertSuccess { messages in
  308. XCTAssertEqual(messages, [ByteBuffer(string: message)])
  309. }
  310. }
  311. func testReceiveResponseFromIdle() throws {
  312. try self.doTestReceiveResponseFromInvalidState(
  313. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  314. expected: .invalidState
  315. )
  316. }
  317. func testReceiveResponseFromClientActiveServerIdle() throws {
  318. try self.doTestReceiveResponseFromInvalidState(
  319. .clientActiveServerIdle(
  320. writeState: .one(),
  321. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  322. ),
  323. expected: .invalidState
  324. )
  325. }
  326. func testReceiveResponseFromClientClosedServerIdle() throws {
  327. try self.doTestReceiveResponseFromInvalidState(
  328. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  329. expected: .invalidState
  330. )
  331. }
  332. func testReceiveResponseFromActive() throws {
  333. try self.doTestReceiveResponseFromValidState(
  334. .clientActiveServerActive(writeState: .one(), readState: .one())
  335. )
  336. }
  337. func testReceiveResponseFromClientClosedServerActive() throws {
  338. try self.doTestReceiveResponseFromValidState(.clientClosedServerActive(readState: .one()))
  339. }
  340. func testReceiveResponseFromClosed() throws {
  341. try self.doTestReceiveResponseFromInvalidState(
  342. .clientClosedServerClosed,
  343. expected: .invalidState
  344. )
  345. }
  346. }
  347. // MARK: - Receive End of Response Stream
  348. extension GRPCClientStateMachineTests {
  349. func doTestReceiveEndOfResponseStreamFromInvalidState(
  350. _ state: StateMachine.State,
  351. expected: ReceiveEndOfResponseStreamError
  352. ) {
  353. var stateMachine = self.makeStateMachine(state)
  354. stateMachine.receiveEndOfResponseStream(.init()).assertFailure()
  355. }
  356. func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
  357. var stateMachine = self.makeStateMachine(state)
  358. var trailers: HPACKHeaders = [
  359. GRPCHeaderName.statusCode: "0",
  360. GRPCHeaderName.statusMessage: "ok",
  361. ]
  362. // When the server is idle it's a "Trailers-Only" response, we need the :status and
  363. // content-type to make a valid set of trailers.
  364. switch state {
  365. case .clientActiveServerIdle,
  366. .clientClosedServerIdle:
  367. trailers.add(name: ":status", value: "200")
  368. trailers.add(name: "content-type", value: "application/grpc+proto")
  369. default:
  370. break
  371. }
  372. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  373. XCTAssertEqual(status.code, .ok)
  374. XCTAssertEqual(status.message, "ok")
  375. }
  376. }
  377. func testReceiveEndOfResponseStreamFromIdle() {
  378. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  379. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  380. expected: .invalidState
  381. )
  382. }
  383. func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
  384. self.doTestReceiveEndOfResponseStreamFromValidState(
  385. .clientActiveServerIdle(
  386. writeState: .one(),
  387. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  388. )
  389. )
  390. }
  391. func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
  392. self.doTestReceiveEndOfResponseStreamFromValidState(
  393. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  394. )
  395. }
  396. func testReceiveEndOfResponseStreamFromActive() {
  397. self.doTestReceiveEndOfResponseStreamFromValidState(
  398. .clientActiveServerActive(writeState: .one(), readState: .one())
  399. )
  400. }
  401. func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
  402. self.doTestReceiveEndOfResponseStreamFromValidState(
  403. .clientClosedServerActive(readState: .one())
  404. )
  405. }
  406. func testReceiveEndOfResponseStreamFromClosed() {
  407. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  408. .clientClosedServerClosed,
  409. expected: .invalidState
  410. )
  411. }
  412. private func doTestReceiveEndStreamOnDataWhenActive(_ state: StateMachine.State) throws {
  413. var stateMachine = self.makeStateMachine(state)
  414. let status = try assertNotNil(stateMachine.receiveEndOfResponseStream())
  415. XCTAssertEqual(status.code, .internalError)
  416. }
  417. func testReceiveEndStreamOnDataClientActiveServerIdle() throws {
  418. try self.doTestReceiveEndStreamOnDataWhenActive(
  419. .clientActiveServerIdle(
  420. writeState: .one(),
  421. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  422. )
  423. )
  424. }
  425. func testReceiveEndStreamOnDataClientClosedServerIdle() throws {
  426. try self.doTestReceiveEndStreamOnDataWhenActive(
  427. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  428. )
  429. }
  430. func testReceiveEndStreamOnDataClientActiveServerActive() throws {
  431. try self.doTestReceiveEndStreamOnDataWhenActive(
  432. .clientActiveServerActive(writeState: .one(), readState: .one())
  433. )
  434. }
  435. func testReceiveEndStreamOnDataClientClosedServerActive() throws {
  436. try self.doTestReceiveEndStreamOnDataWhenActive(
  437. .clientClosedServerActive(readState: .one())
  438. )
  439. }
  440. func testReceiveEndStreamOnDataWhenClosed() {
  441. var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
  442. // Already closed, end stream is ignored.
  443. XCTAssertNil(stateMachine.receiveEndOfResponseStream())
  444. }
  445. }
  446. // MARK: - Basic RPC flow.
  447. extension GRPCClientStateMachineTests {
  448. func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
  449. var headers = HPACKHeaders()
  450. headers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
  451. if let message = message {
  452. headers.add(name: GRPCHeaderName.statusMessage, value: message)
  453. }
  454. return headers
  455. }
  456. func testSimpleUnaryFlow() throws {
  457. var stateMachine = self
  458. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  459. // Initiate the RPC
  460. stateMachine.sendRequestHeaders(requestHead: .init(
  461. method: "POST",
  462. scheme: "https",
  463. path: "/echo/Get",
  464. host: "foo",
  465. deadline: .distantFuture,
  466. customMetadata: [:],
  467. encoding: .disabled
  468. ), allocator: .init()).assertSuccess()
  469. // Receive acknowledgement.
  470. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  471. // Send a request.
  472. stateMachine.sendRequest(
  473. ByteBuffer(string: "Hello!"),
  474. compressed: false
  475. ).assertSuccess()
  476. // Close the request stream.
  477. stateMachine.sendEndOfRequestStream().assertSuccess()
  478. // Receive a response.
  479. var buffer = try self.writeMessage("Hello!")
  480. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertSuccess()
  481. // Receive the status.
  482. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  483. }
  484. func testSimpleClientActiveFlow() throws {
  485. var stateMachine = self
  486. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .one))
  487. // Initiate the RPC
  488. stateMachine.sendRequestHeaders(requestHead: .init(
  489. method: "POST",
  490. scheme: "https",
  491. path: "/echo/Get",
  492. host: "foo",
  493. deadline: .distantFuture,
  494. customMetadata: [:],
  495. encoding: .disabled
  496. ), allocator: .init()).assertSuccess()
  497. // Receive acknowledgement.
  498. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  499. // Send some requests.
  500. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  501. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertSuccess()
  502. stateMachine.sendRequest(ByteBuffer(string: "3"), compressed: false).assertSuccess()
  503. // Close the request stream.
  504. stateMachine.sendEndOfRequestStream().assertSuccess()
  505. // Receive a response.
  506. var buffer = try self.writeMessage("Hello!")
  507. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertSuccess()
  508. // Receive the status.
  509. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  510. }
  511. func testSimpleServerActiveFlow() throws {
  512. var stateMachine = self
  513. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .many))
  514. // Initiate the RPC
  515. stateMachine.sendRequestHeaders(requestHead: .init(
  516. method: "POST",
  517. scheme: "https",
  518. path: "/echo/Get",
  519. host: "foo",
  520. deadline: .distantFuture,
  521. customMetadata: [:],
  522. encoding: .disabled
  523. ), allocator: .init()).assertSuccess()
  524. // Receive acknowledgement.
  525. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  526. // Send a request.
  527. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  528. // Close the request stream.
  529. stateMachine.sendEndOfRequestStream().assertSuccess()
  530. // Receive a response.
  531. var firstBuffer = try self.writeMessage("1")
  532. stateMachine.receiveResponseBuffer(&firstBuffer, maxMessageLength: .max).assertSuccess()
  533. // Receive two responses in one buffer.
  534. var secondBuffer = try self.writeMessage("2")
  535. try self.writeMessage("3", into: &secondBuffer)
  536. stateMachine.receiveResponseBuffer(&secondBuffer, maxMessageLength: .max).assertSuccess()
  537. // Receive the status.
  538. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  539. }
  540. func testSimpleBidirectionalActiveFlow() throws {
  541. var stateMachine = self
  542. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .many))
  543. // Initiate the RPC
  544. stateMachine.sendRequestHeaders(requestHead: .init(
  545. method: "POST",
  546. scheme: "https",
  547. path: "/echo/Get",
  548. host: "foo",
  549. deadline: .distantFuture,
  550. customMetadata: [:],
  551. encoding: .disabled
  552. ), allocator: .init()).assertSuccess()
  553. // Receive acknowledgement.
  554. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  555. // Interleave requests and responses:
  556. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  557. // Receive a response.
  558. var firstBuffer = try self.writeMessage("1")
  559. stateMachine.receiveResponseBuffer(&firstBuffer, maxMessageLength: .max).assertSuccess()
  560. // Send two more requests.
  561. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertSuccess()
  562. stateMachine.sendRequest(ByteBuffer(string: "3"), compressed: false).assertSuccess()
  563. // Receive two responses in one buffer.
  564. var secondBuffer = try self.writeMessage("2")
  565. try self.writeMessage("3", into: &secondBuffer)
  566. stateMachine.receiveResponseBuffer(&secondBuffer, maxMessageLength: .max).assertSuccess()
  567. // Close the request stream.
  568. stateMachine.sendEndOfRequestStream().assertSuccess()
  569. // Receive the status.
  570. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  571. }
  572. }
  573. // MARK: - Too many requests / responses.
  574. extension GRPCClientStateMachineTests {
  575. func testSendTooManyRequestsFromClientActiveServerIdle() {
  576. for messageCount in [MessageArity.one, MessageArity.many] {
  577. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  578. writeState: .one(),
  579. pendingReadState: .init(arity: messageCount, messageEncoding: .disabled)
  580. ))
  581. // One is fine.
  582. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  583. // Two is not.
  584. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertFailure {
  585. XCTAssertEqual($0, .cardinalityViolation)
  586. }
  587. }
  588. }
  589. func testSendTooManyRequestsFromActive() {
  590. for readState in [ReadState.one(), ReadState.many()] {
  591. var stateMachine = self
  592. .makeStateMachine(.clientActiveServerActive(writeState: .one(), readState: readState))
  593. // One is fine.
  594. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  595. // Two is not.
  596. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertFailure {
  597. XCTAssertEqual($0, .cardinalityViolation)
  598. }
  599. }
  600. }
  601. func testSendTooManyRequestsFromClosed() {
  602. var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
  603. // No requests allowed!
  604. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertFailure {
  605. XCTAssertEqual($0, .cardinalityViolation)
  606. }
  607. }
  608. func testReceiveTooManyRequests() throws {
  609. for writeState in [WriteState.one(), WriteState.many()] {
  610. var stateMachine = self
  611. .makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  612. // One response is fine.
  613. var firstBuffer = try self.writeMessage("foo")
  614. stateMachine.receiveResponseBuffer(&firstBuffer, maxMessageLength: .max).assertSuccess()
  615. var secondBuffer = try self.writeMessage("bar")
  616. stateMachine.receiveResponseBuffer(&secondBuffer, maxMessageLength: .max).assertFailure {
  617. XCTAssertEqual($0, .cardinalityViolation)
  618. }
  619. }
  620. }
  621. func testReceiveTooManyRequestsInOneBuffer() throws {
  622. for writeState in [WriteState.one(), WriteState.many()] {
  623. var stateMachine = self
  624. .makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  625. // Write two responses into a single buffer.
  626. var buffer = try self.writeMessage("foo")
  627. var other = try self.writeMessage("bar")
  628. buffer.writeBuffer(&other)
  629. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertFailure {
  630. XCTAssertEqual($0, .cardinalityViolation)
  631. }
  632. }
  633. }
  634. }
  635. // MARK: - Send Request Headers
  636. extension GRPCClientStateMachineTests {
  637. func testSendRequestHeaders() throws {
  638. var stateMachine = self
  639. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  640. stateMachine.sendRequestHeaders(requestHead: .init(
  641. method: "POST",
  642. scheme: "http",
  643. path: "/echo/Get",
  644. host: "localhost",
  645. deadline: .now() + .hours(1),
  646. customMetadata: ["x-grpc-id": "request-id"],
  647. encoding: .enabled(.init(
  648. forRequests: .identity,
  649. acceptableForResponses: [.identity],
  650. decompressionLimit: .ratio(10)
  651. ))
  652. ), allocator: .init()).assertSuccess { headers in
  653. XCTAssertEqual(headers[":method"], ["POST"])
  654. XCTAssertEqual(headers[":path"], ["/echo/Get"])
  655. XCTAssertEqual(headers[":authority"], ["localhost"])
  656. XCTAssertEqual(headers[":scheme"], ["http"])
  657. XCTAssertEqual(headers["content-type"], ["application/grpc"])
  658. XCTAssertEqual(headers["te"], ["trailers"])
  659. // We convert the deadline into a timeout, we can't be exactly sure what that timeout is.
  660. XCTAssertTrue(headers.contains(name: "grpc-timeout"))
  661. XCTAssertEqual(headers["x-grpc-id"], ["request-id"])
  662. XCTAssertEqual(headers["grpc-encoding"], ["identity"])
  663. XCTAssertTrue(headers["grpc-accept-encoding"].contains("identity"))
  664. XCTAssertTrue(headers["user-agent"].first?.starts(with: "grpc-swift") ?? false)
  665. }
  666. }
  667. func testSendRequestHeadersNormalizesCustomMetadata() throws {
  668. // `HPACKHeaders` uses case-insensitive lookup for header names so we can't check the equality
  669. // for individual headers. We'll pull out the entries we care about by matching a sentinel value
  670. // and then compare `HPACKHeaders` instances (since the equality check *is* case sensitive).
  671. let filterKey = "a-key-for-filtering"
  672. let customMetadata: HPACKHeaders = [
  673. "partiallyLower": filterKey,
  674. "ALLUPPER": filterKey,
  675. ]
  676. var stateMachine = self
  677. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  678. stateMachine.sendRequestHeaders(requestHead: .init(
  679. method: "POST",
  680. scheme: "http",
  681. path: "/echo/Get",
  682. host: "localhost",
  683. deadline: .distantFuture,
  684. customMetadata: customMetadata,
  685. encoding: .disabled
  686. ), allocator: .init()).assertSuccess { headers in
  687. // Pull out the entries we care about by matching values
  688. let filtered = headers.filter { _, value, _ in
  689. value == filterKey
  690. }.map { name, value, _ in
  691. (name, value)
  692. }
  693. let justCustomMetadata = HPACKHeaders(filtered)
  694. let expected: HPACKHeaders = [
  695. "partiallylower": filterKey,
  696. "allupper": filterKey,
  697. ]
  698. XCTAssertEqual(justCustomMetadata, expected)
  699. }
  700. }
  701. func testSendRequestHeadersWithCustomUserAgent() throws {
  702. let customMetadata: HPACKHeaders = [
  703. "user-agent": "test-user-agent",
  704. ]
  705. var stateMachine = self
  706. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  707. stateMachine.sendRequestHeaders(requestHead: .init(
  708. method: "POST",
  709. scheme: "http",
  710. path: "/echo/Get",
  711. host: "localhost",
  712. deadline: .distantFuture,
  713. customMetadata: customMetadata,
  714. encoding: .enabled(.init(
  715. forRequests: nil,
  716. acceptableForResponses: [],
  717. decompressionLimit: .ratio(10)
  718. ))
  719. ), allocator: .init()).assertSuccess { headers in
  720. XCTAssertEqual(headers["user-agent"], ["test-user-agent"])
  721. }
  722. }
  723. func testSendRequestHeadersWithNoCompressionInEitherDirection() throws {
  724. var stateMachine = self
  725. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  726. stateMachine.sendRequestHeaders(requestHead: .init(
  727. method: "POST",
  728. scheme: "http",
  729. path: "/echo/Get",
  730. host: "localhost",
  731. deadline: .distantFuture,
  732. customMetadata: ["x-grpc-id": "request-id"],
  733. encoding: .enabled(.init(
  734. forRequests: nil,
  735. acceptableForResponses: [],
  736. decompressionLimit: .ratio(10)
  737. ))
  738. ), allocator: .init()).assertSuccess { headers in
  739. XCTAssertFalse(headers.contains(name: "grpc-encoding"))
  740. XCTAssertFalse(headers.contains(name: "grpc-accept-encoding"))
  741. }
  742. }
  743. func testSendRequestHeadersWithNoCompressionForRequests() throws {
  744. var stateMachine = self
  745. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  746. stateMachine.sendRequestHeaders(requestHead: .init(
  747. method: "POST",
  748. scheme: "http",
  749. path: "/echo/Get",
  750. host: "localhost",
  751. deadline: .distantFuture,
  752. customMetadata: ["x-grpc-id": "request-id"],
  753. encoding: .enabled(.init(
  754. forRequests: nil,
  755. acceptableForResponses: [.identity, .gzip],
  756. decompressionLimit: .ratio(10)
  757. ))
  758. ), allocator: .init()).assertSuccess { headers in
  759. XCTAssertFalse(headers.contains(name: "grpc-encoding"))
  760. XCTAssertTrue(headers.contains(name: "grpc-accept-encoding"))
  761. }
  762. }
  763. func testSendRequestHeadersWithNoCompressionForResponses() throws {
  764. var stateMachine = self
  765. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  766. stateMachine.sendRequestHeaders(requestHead: .init(
  767. method: "POST",
  768. scheme: "http",
  769. path: "/echo/Get",
  770. host: "localhost",
  771. deadline: .distantFuture,
  772. customMetadata: ["x-grpc-id": "request-id"],
  773. encoding: .enabled(.init(
  774. forRequests: .gzip,
  775. acceptableForResponses: [],
  776. decompressionLimit: .ratio(10)
  777. ))
  778. ), allocator: .init()).assertSuccess { headers in
  779. XCTAssertEqual(headers["grpc-encoding"], ["gzip"])
  780. // This asymmetry is strange but allowed: if a client does not advertise support of the
  781. // compression it is using, the server may still process the message so long as it too
  782. // supports the compression.
  783. XCTAssertFalse(headers.contains(name: "grpc-accept-encoding"))
  784. }
  785. }
  786. func testReceiveResponseHeadersWithOkStatus() throws {
  787. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  788. writeState: .one(),
  789. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  790. ))
  791. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  792. }
  793. func testReceiveResponseHeadersWithNotOkStatus() throws {
  794. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  795. writeState: .one(),
  796. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  797. ))
  798. let code = "\(HTTPResponseStatus.paymentRequired.code)"
  799. let headers = self.makeResponseHeaders(status: code)
  800. stateMachine.receiveResponseHeaders(headers).assertFailure {
  801. XCTAssertEqual($0, .invalidHTTPStatus(code))
  802. }
  803. }
  804. func testReceiveResponseHeadersWithoutContentType() throws {
  805. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  806. writeState: .one(),
  807. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  808. ))
  809. let headers = self.makeResponseHeaders(contentType: nil)
  810. stateMachine.receiveResponseHeaders(headers).assertFailure {
  811. XCTAssertEqual($0, .invalidContentType(nil))
  812. }
  813. }
  814. func testReceiveResponseHeadersWithInvalidContentType() throws {
  815. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  816. writeState: .one(),
  817. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  818. ))
  819. let headers = self.makeResponseHeaders(contentType: "video/mpeg")
  820. stateMachine.receiveResponseHeaders(headers).assertFailure {
  821. XCTAssertEqual($0, .invalidContentType("video/mpeg"))
  822. }
  823. }
  824. func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
  825. let configuration = ClientMessageEncoding.Configuration(
  826. forRequests: .none,
  827. acceptableForResponses: [.identity],
  828. decompressionLimit: .ratio(1)
  829. )
  830. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  831. writeState: .one(),
  832. pendingReadState: .init(arity: .one, messageEncoding: .enabled(configuration))
  833. ))
  834. var headers = self.makeResponseHeaders()
  835. // Identity should always be supported.
  836. headers.add(name: "grpc-encoding", value: "identity")
  837. stateMachine.receiveResponseHeaders(headers).assertSuccess()
  838. switch stateMachine.state {
  839. case let .clientActiveServerActive(_, .reading(_, reader)):
  840. XCTAssertEqual(reader.compression?.algorithm, .identity)
  841. default:
  842. XCTFail("unexpected state \(stateMachine.state)")
  843. }
  844. }
  845. func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
  846. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  847. writeState: .one(),
  848. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  849. ))
  850. var headers = self.makeResponseHeaders()
  851. headers.add(name: "grpc-encoding", value: "snappy")
  852. stateMachine.receiveResponseHeaders(headers).assertFailure {
  853. XCTAssertEqual($0, .unsupportedMessageEncoding("snappy"))
  854. }
  855. }
  856. func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
  857. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  858. writeState: .one(),
  859. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  860. ))
  861. var headers = self.makeResponseHeaders()
  862. headers.add(name: "grpc-encoding", value: "not-a-known-compression-(probably)")
  863. stateMachine.receiveResponseHeaders(headers).assertFailure {
  864. XCTAssertEqual($0, .unsupportedMessageEncoding("not-a-known-compression-(probably)"))
  865. }
  866. }
  867. func testReceiveEndOfResponseStreamWithStatus() throws {
  868. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  869. let trailers: HPACKHeaders = ["grpc-status": "0"]
  870. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  871. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
  872. XCTAssertEqual(status.message, nil)
  873. }
  874. }
  875. func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
  876. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  877. let trailers: HPACKHeaders = ["grpc-status": "999"]
  878. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  879. XCTAssertEqual(status.code, .unknown)
  880. }
  881. }
  882. func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
  883. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  884. let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
  885. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  886. XCTAssertEqual(status.code, .unknown)
  887. }
  888. }
  889. func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
  890. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  891. let trailers: HPACKHeaders = [
  892. "grpc-status": "5",
  893. "grpc-message": "foo bar 🚀",
  894. ]
  895. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  896. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
  897. XCTAssertEqual(status.message, "foo bar 🚀")
  898. }
  899. }
  900. func testReceiveTrailersOnlyEndOfResponseStreamWithoutContentType() throws {
  901. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  902. writeState: .one(),
  903. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  904. ))
  905. let trailers: HPACKHeaders = [
  906. ":status": "200",
  907. "grpc-status": "5",
  908. "grpc-message": "foo bar 🚀",
  909. ]
  910. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  911. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
  912. XCTAssertEqual(status.message, "foo bar 🚀")
  913. }
  914. }
  915. func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidContentType() throws {
  916. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  917. writeState: .one(),
  918. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  919. ))
  920. let trailers: HPACKHeaders = [
  921. ":status": "200",
  922. "grpc-status": "5",
  923. "grpc-message": "foo bar 🚀",
  924. "content-type": "invalid",
  925. ]
  926. stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
  927. XCTAssertEqual(error, .invalidContentType("invalid"))
  928. }
  929. }
  930. func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndValidGRPCStatus() throws {
  931. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  932. writeState: .one(),
  933. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  934. ))
  935. let trailers: HPACKHeaders = [
  936. ":status": "418",
  937. "grpc-status": "5",
  938. ]
  939. stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
  940. XCTAssertEqual(
  941. error,
  942. .invalidHTTPStatusWithGRPCStatus(GRPCStatus(
  943. code: GRPCStatus.Code(rawValue: 5)!,
  944. message: nil
  945. ))
  946. )
  947. }
  948. }
  949. func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndNoGRPCStatus() throws {
  950. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  951. writeState: .one(),
  952. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  953. ))
  954. let trailers: HPACKHeaders = [":status": "418"]
  955. stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
  956. XCTAssertEqual(error, .invalidHTTPStatus("418"))
  957. }
  958. }
  959. }
  960. class ReadStateTests: GRPCTestCase {
  961. var allocator = ByteBufferAllocator()
  962. func testReadWhenNoExpectedMessages() {
  963. var state: ReadState = .notReading
  964. var buffer = self.allocator.buffer(capacity: 0)
  965. state.readMessages(&buffer, maxLength: .max).assertFailure {
  966. XCTAssertEqual($0, .cardinalityViolation)
  967. }
  968. state.assertNotReading()
  969. }
  970. func testReadWithLeftOverBytesForOneExpectedMessage() throws {
  971. // Write a message into the buffer:
  972. let message = ByteBuffer(string: "Hello!")
  973. var writer = LengthPrefixedMessageWriter(compression: .none)
  974. var buffers = try writer.write(buffer: message)
  975. XCTAssertNil(buffers.1)
  976. // And some extra junk bytes:
  977. let bytes: [UInt8] = [0x00]
  978. buffers.0.writeBytes(bytes)
  979. var state: ReadState = .one()
  980. state.readMessages(&buffers.0, maxLength: .max).assertFailure {
  981. XCTAssertEqual($0, .leftOverBytes)
  982. }
  983. state.assertNotReading()
  984. }
  985. func testReadTooManyMessagesForOneExpectedMessages() throws {
  986. // Write a message into the buffer twice:
  987. let message = ByteBuffer(string: "Hello!")
  988. var writer = LengthPrefixedMessageWriter(compression: .none)
  989. var buffers1 = try writer.write(buffer: message)
  990. var buffers2 = try writer.write(buffer: message)
  991. XCTAssertNil(buffers1.1)
  992. XCTAssertNil(buffers2.1)
  993. buffers1.0.writeBuffer(&buffers2.0)
  994. var state: ReadState = .one()
  995. state.readMessages(&buffers1.0, maxLength: .max).assertFailure {
  996. XCTAssertEqual($0, .cardinalityViolation)
  997. }
  998. state.assertNotReading()
  999. }
  1000. func testReadOneMessageForOneExpectedMessages() throws {
  1001. // Write a message into the buffer twice:
  1002. let message = ByteBuffer(string: "Hello!")
  1003. var writer = LengthPrefixedMessageWriter(compression: .none)
  1004. var (buffer, other) = try writer.write(buffer: message)
  1005. XCTAssertNil(other)
  1006. var state: ReadState = .one()
  1007. state.readMessages(&buffer, maxLength: .max).assertSuccess {
  1008. XCTAssertEqual($0, [message])
  1009. }
  1010. // We shouldn't be able to read anymore.
  1011. state.assertNotReading()
  1012. }
  1013. func testReadOneMessageForManyExpectedMessages() throws {
  1014. // Write a message into the buffer twice:
  1015. let message = ByteBuffer(string: "Hello!")
  1016. var writer = LengthPrefixedMessageWriter(compression: .none)
  1017. var (buffer, other) = try writer.write(buffer: message)
  1018. XCTAssertNil(other)
  1019. var state: ReadState = .many()
  1020. state.readMessages(&buffer, maxLength: .max).assertSuccess {
  1021. XCTAssertEqual($0, [message])
  1022. }
  1023. // We should still be able to read.
  1024. state.assertReading()
  1025. }
  1026. func testReadManyMessagesForManyExpectedMessages() throws {
  1027. // Write a message into the buffer twice:
  1028. let message = ByteBuffer(string: "Hello!")
  1029. var writer = LengthPrefixedMessageWriter(compression: .none)
  1030. var (first, _) = try writer.write(buffer: message)
  1031. var (second, _) = try writer.write(buffer: message)
  1032. var (third, _) = try writer.write(buffer: message)
  1033. first.writeBuffer(&second)
  1034. first.writeBuffer(&third)
  1035. var state: ReadState = .many()
  1036. state.readMessages(&first, maxLength: .max).assertSuccess {
  1037. XCTAssertEqual($0, [message, message, message])
  1038. }
  1039. // We should still be able to read.
  1040. state.assertReading()
  1041. }
  1042. }
  1043. // MARK: Result helpers
  1044. extension Result {
  1045. /// Asserts the `Result` was a success.
  1046. func assertSuccess(verify: (Success) throws -> Void = { _ in }) {
  1047. switch self {
  1048. case let .success(success):
  1049. do {
  1050. try verify(success)
  1051. } catch {
  1052. XCTFail("verify threw: \(error)")
  1053. }
  1054. case let .failure(error):
  1055. XCTFail("unexpected .failure: \(error)")
  1056. }
  1057. }
  1058. /// Asserts the `Result` was a failure.
  1059. func assertFailure(verify: (Failure) throws -> Void = { _ in }) {
  1060. switch self {
  1061. case let .success(success):
  1062. XCTFail("unexpected .success: \(success)")
  1063. case let .failure(error):
  1064. do {
  1065. try verify(error)
  1066. } catch {
  1067. XCTFail("verify threw: \(error)")
  1068. }
  1069. }
  1070. }
  1071. }
  1072. // MARK: ReadState, PendingWriteState, and WriteState helpers
  1073. extension ReadState {
  1074. static func one() -> ReadState {
  1075. let reader = LengthPrefixedMessageReader()
  1076. return .reading(.one, reader)
  1077. }
  1078. static func many() -> ReadState {
  1079. let reader = LengthPrefixedMessageReader()
  1080. return .reading(.many, reader)
  1081. }
  1082. func assertReading() {
  1083. switch self {
  1084. case .reading:
  1085. ()
  1086. case .notReading:
  1087. XCTFail("unexpected state .notReading")
  1088. }
  1089. }
  1090. func assertNotReading() {
  1091. switch self {
  1092. case .reading:
  1093. XCTFail("unexpected state .reading")
  1094. case .notReading:
  1095. ()
  1096. }
  1097. }
  1098. }
  1099. extension PendingWriteState {
  1100. static func one() -> PendingWriteState {
  1101. return .init(arity: .one, contentType: .protobuf)
  1102. }
  1103. static func many() -> PendingWriteState {
  1104. return .init(arity: .many, contentType: .protobuf)
  1105. }
  1106. }
  1107. extension WriteState {
  1108. static func one() -> WriteState {
  1109. return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none))
  1110. }
  1111. static func many() -> WriteState {
  1112. return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none))
  1113. }
  1114. }