GRPCClientStateMachineTests.swift 42 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Foundation
  18. @testable import GRPC
  19. import Logging
  20. import NIO
  21. import NIOHPACK
  22. import NIOHTTP1
  23. import SwiftProtobuf
  24. import XCTest
  /// Tests for `GRPCClientStateMachine`. This declaration holds the shared fixtures and helper
  /// functions; the test cases themselves live in the extensions of this class.
  class GRPCClientStateMachineTests: GRPCTestCase {
    typealias Request = Echo_EchoRequest
    typealias Response = Echo_EchoResponse
    typealias StateMachine = GRPCClientStateMachine

    /// Allocator backing every `ByteBuffer` created by the helpers below.
    var allocator = ByteBufferAllocator()

    /// Returns a state machine initialised in the given `state`.
    func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
      let machine = StateMachine(state: state)
      return machine
    }

    /// Length-prefixes `message` (uncompressed) and returns the framed bytes in a new `ByteBuffer`.
    func writeMessage(_ message: String) throws -> ByteBuffer {
      let payload = self.allocator.buffer(string: message)
      return try LengthPrefixedMessageWriter(compression: .none).write(
        buffer: payload,
        allocator: self.allocator,
        compressed: false
      )
    }

    /// Length-prefixes `message` and appends the framed bytes to `buffer`.
    func writeMessage(_ message: String, into buffer: inout ByteBuffer) throws {
      var framed = try self.writeMessage(message)
      buffer.writeBuffer(&framed)
    }

    /// Returns a minimally valid `HPACKHeaders` for a response.
    ///
    /// - Parameters:
    ///   - status: Value for the `:status` header; the header is omitted when `nil`.
    ///   - contentType: Value for the `content-type` header; the header is omitted when `nil`.
    func makeResponseHeaders(
      status: String? = "200",
      contentType: String? = "application/grpc+proto"
    ) -> HPACKHeaders {
      var headers: HPACKHeaders = [:]
      if let status = status {
        headers.add(name: ":status", value: status)
      }
      if let contentType = contentType {
        headers.add(name: "content-type", value: contentType)
      }
      return headers
    }
  }
  55. // MARK: - Send Request Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that sending request headers from `state` fails with `.invalidState`.
    func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "host",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      ))
      outcome.assertFailure { error in
        XCTAssertEqual(error, .invalidState)
      }
    }

    /// Sending request headers succeeds when both client and server are idle.
    func testSendRequestHeadersFromIdle() {
      let idle: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      var machine = self.makeStateMachine(idle)
      let outcome = machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "host",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      ))
      outcome.assertSuccess()
    }

    func testSendRequestHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendRequestHeadersFromInvalidState(state)
    }
  }
  113. // MARK: - Send Request
  extension GRPCClientStateMachineTests {
    /// Asserts that sending a request from `state` fails with the `expected` error.
    ///
    /// - Parameters:
    ///   - state: The state to start the state machine in.
    ///   - expected: The error `sendRequest` must fail with.
    func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
      var stateMachine = self.makeStateMachine(state)
      stateMachine.sendRequest(
        ByteBuffer(string: "Hello!"),
        compressed: false,
        allocator: self.allocator
      ).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that sending a request from `state` succeeds and that the bytes following the
    /// 5-byte prefix of the returned buffer are the request payload.
    func doTestSendRequestFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)
      let request = "Hello!"
      stateMachine.sendRequest(
        ByteBuffer(string: request),
        compressed: false,
        allocator: self.allocator
      ).assertSuccess { buffer in
        var buffer = buffer
        // Remove the length and compression flag prefix.
        buffer.moveReaderIndex(forwardBy: 5)
        // Compare the optional directly: an unreadable payload becomes a test failure instead of
        // a crash from force-unwrapping.
        let data = buffer.readString(length: buffer.readableBytes)
        XCTAssertEqual(data, request)
      }
    }

    func testSendRequestFromIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testSendRequestFromClientActiveServerIdle() {
      self.doTestSendRequestFromValidState(.clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      ))
    }

    func testSendRequestFromClientClosedServerIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromActive() {
      self.doTestSendRequestFromValidState(.clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      ))
    }

    func testSendRequestFromClientClosedServerActive() {
      // Use the state this test is named after. It previously constructed
      // `.clientClosedServerIdle`, duplicating `testSendRequestFromClientClosedServerIdle` and
      // leaving the client-closed/server-active state untested.
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerActive(readState: .one()),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromClosed() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerClosed,
        expected: .cardinalityViolation
      )
    }
  }
  178. // MARK: - Send End of Request Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that closing the request stream from `state` fails with the `expected` error.
    func doTestSendEndOfRequestStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: SendEndOfRequestStreamError
    ) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendEndOfRequestStream()
      outcome.assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that closing the request stream from `state` succeeds.
    func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendEndOfRequestStream()
      outcome.assertSuccess()
    }

    func testSendEndOfRequestStreamFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .invalidState)
    }

    func testSendEndOfRequestStreamFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }
  }
  231. // MARK: - Receive Response Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving the default response headers in `state` fails with `expected`.
    func doTestReceiveResponseHeadersFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveResponseHeadError
    ) {
      var machine = self.makeStateMachine(state)
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving the default response headers in `state` succeeds.
    func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()
    }

    func testReceiveResponseHeadersFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }
  }
  284. // MARK: - Receive Response
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving a length-prefixed message in `state` fails with `expected`.
    func doTestReceiveResponseFromInvalidState(
      _ state: StateMachine.State,
      expected: MessageReadError
    ) throws {
      var machine = self.makeStateMachine(state)
      var framed = try self.writeMessage("Hello!")
      let outcome = machine.receiveResponseBuffer(&framed, maxMessageLength: .max)
      outcome.assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving a length-prefixed message in `state` succeeds and yields exactly
    /// that one message.
    func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
      var machine = self.makeStateMachine(state)
      let text = "Hello!"
      var framed = try self.writeMessage(text)
      let outcome = machine.receiveResponseBuffer(&framed, maxMessageLength: .max)
      outcome.assertSuccess { received in
        XCTAssertEqual(received, [ByteBuffer(string: text)])
      }
    }

    func testReceiveResponseFromIdle() throws {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientActiveServerIdle() throws {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientClosedServerIdle() throws {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromActive() throws {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClientClosedServerActive() throws {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClosed() throws {
      let state: StateMachine.State = .clientClosedServerClosed
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }
  }
  341. // MARK: - Receive End of Response Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving empty trailers in `state` fails with the `expected` error.
    ///
    /// - Parameters:
    ///   - state: The state to start the state machine in.
    ///   - expected: The error `receiveEndOfResponseStream(_:)` must fail with.
    func doTestReceiveEndOfResponseStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveEndOfResponseStreamError
    ) {
      var stateMachine = self.makeStateMachine(state)
      // Check the error itself: `expected` was previously ignored, so any failure passed.
      // NOTE(review): relies on `ReceiveEndOfResponseStreamError` being `Equatable` — confirm.
      stateMachine.receiveEndOfResponseStream(.init()).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that receiving trailers with status code "0" and message "ok" in `state` succeeds
    /// and produces an `.ok` status with that message.
    func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)
      var trailers: HPACKHeaders = [
        GRPCHeaderName.statusCode: "0",
        GRPCHeaderName.statusMessage: "ok",
      ]
      // When the server is idle it's a "Trailers-Only" response, we need the :status and
      // content-type to make a valid set of trailers.
      switch state {
      case .clientActiveServerIdle,
           .clientClosedServerIdle:
        trailers.add(name: ":status", value: "200")
        trailers.add(name: "content-type", value: "application/grpc+proto")
      default:
        break
      }
      stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, .ok)
        XCTAssertEqual(status.message, "ok")
      }
    }

    func testReceiveEndOfResponseStreamFromIdle() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: .one, messageEncoding: .disabled)
        )
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
      )
    }

    func testReceiveEndOfResponseStreamFromActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerActive(readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClosed() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientClosedServerClosed,
        expected: .invalidState
      )
    }

    /// Asserts that an end of stream received without trailers in `state` produces a non-nil
    /// status whose code is `.internalError`.
    private func doTestReceiveEndStreamOnDataWhenActive(_ state: StateMachine.State) throws {
      var stateMachine = self.makeStateMachine(state)
      let status = try assertNotNil(stateMachine.receiveEndOfResponseStream())
      XCTAssertEqual(status.code, .internalError)
    }

    func testReceiveEndStreamOnDataClientActiveServerIdle() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: .one, messageEncoding: .disabled)
        )
      )
    }

    func testReceiveEndStreamOnDataClientClosedServerIdle() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
      )
    }

    func testReceiveEndStreamOnDataClientActiveServerActive() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testReceiveEndStreamOnDataClientClosedServerActive() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientClosedServerActive(readState: .one())
      )
    }

    func testReceiveEndStreamOnDataWhenClosed() {
      var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
      // Already closed, end stream is ignored.
      XCTAssertNil(stateMachine.receiveEndOfResponseStream())
    }
  }
  440. // MARK: - Basic RPC flow.
  extension GRPCClientStateMachineTests {
    /// Builds trailers carrying the given status code and, when provided, a status message.
    func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
      var trailers = HPACKHeaders()
      trailers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
      if let statusMessage = message {
        trailers.add(name: GRPCHeaderName.statusMessage, value: statusMessage)
      }
      return trailers
    }

    /// Walks a one-request, one-response RPC from idle through to the final status.
    func testSimpleUnaryFlow() throws {
      let initial: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      var machine = self.makeStateMachine(initial)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Send the single request.
      let request = ByteBuffer(string: "Hello!")
      machine.sendRequest(request, compressed: false, allocator: self.allocator).assertSuccess()

      // Close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var response = try self.writeMessage("Hello!")
      machine.receiveResponseBuffer(&response, maxMessageLength: .max).assertSuccess()

      // Receive the status.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }

    /// Walks an RPC with many requests and one response through to the final status.
    func testSimpleClientActiveFlow() throws {
      let initial: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .many(),
        readArity: .one
      )
      var machine = self.makeStateMachine(initial)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Send three requests.
      for payload in ["1", "2", "3"] {
        machine
          .sendRequest(ByteBuffer(string: payload), compressed: false, allocator: self.allocator)
          .assertSuccess()
      }

      // Close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var response = try self.writeMessage("Hello!")
      machine.receiveResponseBuffer(&response, maxMessageLength: .max).assertSuccess()

      // Receive the status.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }

    /// Walks an RPC with one request and many responses through to the final status.
    func testSimpleServerActiveFlow() throws {
      let initial: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .many
      )
      var machine = self.makeStateMachine(initial)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Send the single request.
      machine
        .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
        .assertSuccess()

      // Close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive one response on its own.
      var single = try self.writeMessage("1")
      machine.receiveResponseBuffer(&single, maxMessageLength: .max).assertSuccess()

      // Receive two further responses packed into one buffer.
      var pair = try self.writeMessage("2")
      try self.writeMessage("3", into: &pair)
      machine.receiveResponseBuffer(&pair, maxMessageLength: .max).assertSuccess()

      // Receive the status.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }

    /// Walks an RPC with many requests and many responses, interleaved, through to the final
    /// status.
    func testSimpleBidirectionalActiveFlow() throws {
      let initial: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .many(),
        readArity: .many
      )
      var machine = self.makeStateMachine(initial)

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(requestHead: .init(
        method: "POST",
        scheme: "https",
        path: "/echo/Get",
        host: "foo",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )).assertSuccess()

      // The server acknowledges with its response headers.
      let responseHeaders = self.makeResponseHeaders()
      machine.receiveResponseHeaders(responseHeaders).assertSuccess()

      // Interleave requests and responses, starting with one request.
      machine
        .sendRequest(ByteBuffer(string: "1"), compressed: false, allocator: self.allocator)
        .assertSuccess()

      // Receive one response on its own.
      var single = try self.writeMessage("1")
      machine.receiveResponseBuffer(&single, maxMessageLength: .max).assertSuccess()

      // Send two more requests.
      for payload in ["2", "3"] {
        machine
          .sendRequest(ByteBuffer(string: payload), compressed: false, allocator: self.allocator)
          .assertSuccess()
      }

      // Receive two further responses packed into one buffer.
      var pair = try self.writeMessage("2")
      try self.writeMessage("3", into: &pair)
      machine.receiveResponseBuffer(&pair, maxMessageLength: .max).assertSuccess()

      // Close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the status.
      let trailers = self.makeTrailers(status: .ok)
      machine.receiveEndOfResponseStream(trailers).assertSuccess()
    }
  }
  575. // MARK: - Too many requests / responses.
  extension GRPCClientStateMachineTests {
    /// With a single-message write state and the server idle, a second request is rejected with
    /// `.cardinalityViolation` for either pending read arity.
    func testSendTooManyRequestsFromClientActiveServerIdle() {
      let arities: [MessageArity] = [.one, .many]
      for arity in arities {
        let state: StateMachine.State = .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: arity, messageEncoding: .disabled)
        )
        var machine = self.makeStateMachine(state)

        // The first request is accepted.
        let first = machine.sendRequest(
          ByteBuffer(string: "1"),
          compressed: false,
          allocator: self.allocator
        )
        first.assertSuccess()

        // The second request is rejected.
        let second = machine.sendRequest(
          ByteBuffer(string: "2"),
          compressed: false,
          allocator: self.allocator
        )
        second.assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a single-message write state and the server active, a second request is rejected with
    /// `.cardinalityViolation` for either read state.
    func testSendTooManyRequestsFromActive() {
      let readStates: [ReadState] = [.one(), .many()]
      for reading in readStates {
        let state: StateMachine.State = .clientActiveServerActive(
          writeState: .one(),
          readState: reading
        )
        var machine = self.makeStateMachine(state)

        // The first request is accepted.
        let first = machine.sendRequest(
          ByteBuffer(string: "1"),
          compressed: false,
          allocator: self.allocator
        )
        first.assertSuccess()

        // The second request is rejected.
        let second = machine.sendRequest(
          ByteBuffer(string: "2"),
          compressed: false,
          allocator: self.allocator
        )
        second.assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// Once both sides are closed, even a first request is rejected with `.cardinalityViolation`.
    func testSendTooManyRequestsFromClosed() {
      var machine = self.makeStateMachine(.clientClosedServerClosed)
      // No requests allowed!
      let attempt = machine.sendRequest(
        ByteBuffer(string: "1"),
        compressed: false,
        allocator: self.allocator
      )
      attempt.assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
    }

    /// With a single-message read state, a second response arriving in its own buffer is rejected
    /// with `.cardinalityViolation` for either write state.
    func testReceiveTooManyRequests() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writing in writeStates {
        let state: StateMachine.State = .clientActiveServerActive(
          writeState: writing,
          readState: .one()
        )
        var machine = self.makeStateMachine(state)

        // The first response is accepted.
        var accepted = try self.writeMessage("foo")
        machine.receiveResponseBuffer(&accepted, maxMessageLength: .max).assertSuccess()

        // The second response is rejected.
        var rejected = try self.writeMessage("bar")
        machine.receiveResponseBuffer(&rejected, maxMessageLength: .max).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a single-message read state, two responses arriving in one buffer are rejected with
    /// `.cardinalityViolation` for either write state.
    func testReceiveTooManyRequestsInOneBuffer() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writing in writeStates {
        let state: StateMachine.State = .clientActiveServerActive(
          writeState: writing,
          readState: .one()
        )
        var machine = self.makeStateMachine(state)

        // Pack two responses into a single buffer.
        var combined = try self.writeMessage("foo")
        var extra = try self.writeMessage("bar")
        combined.writeBuffer(&extra)

        machine.receiveResponseBuffer(&combined, maxMessageLength: .max).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }
  }
  646. // MARK: - Send Request Headers
  647. extension GRPCClientStateMachineTests {
  /// Sends request headers with identity compression, custom metadata and a one-hour deadline,
  /// then checks each header in the result.
  func testSendRequestHeaders() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)
    let outcome = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .now() + .hours(1),
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: .identity,
        acceptableForResponses: [.identity],
        decompressionLimit: .ratio(10)
      ))
    ))
    outcome.assertSuccess { sent in
      XCTAssertEqual(sent[":method"], ["POST"])
      XCTAssertEqual(sent[":path"], ["/echo/Get"])
      XCTAssertEqual(sent[":authority"], ["localhost"])
      XCTAssertEqual(sent[":scheme"], ["http"])
      XCTAssertEqual(sent["content-type"], ["application/grpc"])
      XCTAssertEqual(sent["te"], ["trailers"])
      // The deadline is converted into a timeout whose exact value can't be known here, so only
      // check that the header is present.
      XCTAssertTrue(sent.contains(name: "grpc-timeout"))
      XCTAssertEqual(sent["x-grpc-id"], ["request-id"])
      XCTAssertEqual(sent["grpc-encoding"], ["identity"])
      XCTAssertTrue(sent["grpc-accept-encoding"].contains("identity"))
      XCTAssertTrue(sent["user-agent"].first?.starts(with: "grpc-swift") ?? false)
    }
  }
  /// Checks that custom metadata names come back lowercased in the request headers.
  func testSendRequestHeadersNormalizesCustomMetadata() throws {
    // `HPACKHeaders` uses case-insensitive lookup for header names so we can't check the equality
    // for individual headers. Instead, tag the entries of interest with a sentinel value, pull
    // them back out by that value and compare whole `HPACKHeaders` instances (since the equality
    // check *is* case sensitive).
    let sentinel = "a-key-for-filtering"
    let mixedCaseMetadata: HPACKHeaders = [
      "partiallyLower": sentinel,
      "ALLUPPER": sentinel,
    ]
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)
    let outcome = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: mixedCaseMetadata,
      encoding: .disabled
    ))
    outcome.assertSuccess { sent in
      // Keep only the entries tagged with the sentinel value.
      let tagged = sent.filter { _, headerValue, _ in
        headerValue == sentinel
      }
      let pairs = tagged.map { headerName, headerValue, _ in
        (headerName, headerValue)
      }
      let normalized: HPACKHeaders = [
        "partiallylower": sentinel,
        "allupper": sentinel,
      ]
      XCTAssertEqual(HPACKHeaders(pairs), normalized)
    }
  }
  /// Checks that a `user-agent` supplied in custom metadata is the only `user-agent` value in the
  /// request headers.
  func testSendRequestHeadersWithCustomUserAgent() throws {
    let metadataWithUserAgent: HPACKHeaders = [
      "user-agent": "test-user-agent",
    ]
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)
    let outcome = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: metadataWithUserAgent,
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    ))
    outcome.assertSuccess { sent in
      XCTAssertEqual(sent["user-agent"], ["test-user-agent"])
    }
  }
  /// With no request encoding and no acceptable response encodings, neither `grpc-encoding` nor
  /// `grpc-accept-encoding` is sent.
  func testSendRequestHeadersWithNoCompressionInEitherDirection() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)
    let outcome = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    ))
    outcome.assertSuccess { sent in
      XCTAssertFalse(sent.contains(name: "grpc-encoding"))
      XCTAssertFalse(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With no request encoding but acceptable response encodings, only `grpc-accept-encoding` is
  /// sent.
  func testSendRequestHeadersWithNoCompressionForRequests() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)
    let outcome = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: nil,
        acceptableForResponses: [.identity, .gzip],
        decompressionLimit: .ratio(10)
      ))
    ))
    outcome.assertSuccess { sent in
      XCTAssertFalse(sent.contains(name: "grpc-encoding"))
      XCTAssertTrue(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With gzip for requests but no acceptable response encodings, `grpc-encoding` is sent and
  /// `grpc-accept-encoding` is not.
  func testSendRequestHeadersWithNoCompressionForResponses() throws {
    let idle: StateMachine.State = .clientIdleServerIdle(
      pendingWriteState: .one(),
      readArity: .one
    )
    var machine = self.makeStateMachine(idle)
    let outcome = machine.sendRequestHeaders(requestHead: .init(
      method: "POST",
      scheme: "http",
      path: "/echo/Get",
      host: "localhost",
      deadline: .distantFuture,
      customMetadata: ["x-grpc-id": "request-id"],
      encoding: .enabled(.init(
        forRequests: .gzip,
        acceptableForResponses: [],
        decompressionLimit: .ratio(10)
      ))
    ))
    outcome.assertSuccess { sent in
      XCTAssertEqual(sent["grpc-encoding"], ["gzip"])
      // This asymmetry is strange but allowed: if a client does not advertise support of the
      // compression it is using, the server may still process the message so long as it too
      // supports the compression.
      XCTAssertFalse(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// Verifies that the default response headers from `makeResponseHeaders()` are accepted
  /// while the client is active and the server is idle.
  func testReceiveResponseHeadersWithOkStatus() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let headers = self.makeResponseHeaders()
    stateMachine.receiveResponseHeaders(headers).assertSuccess()
  }
  /// Verifies that response headers carrying the `paymentRequired` HTTP status are rejected
  /// with `.invalidHTTPStatus` holding that status code.
  func testReceiveResponseHeadersWithNotOkStatus() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let statusCode = String(HTTPResponseStatus.paymentRequired.code)
    let result = stateMachine.receiveResponseHeaders(self.makeResponseHeaders(status: statusCode))

    result.assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatus(statusCode))
    }
  }
  /// Verifies that response headers built with a `nil` content type are rejected with
  /// `.invalidContentType(nil)`.
  func testReceiveResponseHeadersWithoutContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let result = stateMachine.receiveResponseHeaders(self.makeResponseHeaders(contentType: nil))

    result.assertFailure { error in
      XCTAssertEqual(error, .invalidContentType(nil))
    }
  }
  /// Verifies that response headers with the content type `video/mpeg` are rejected with
  /// `.invalidContentType` holding that value.
  func testReceiveResponseHeadersWithInvalidContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let result = stateMachine.receiveResponseHeaders(
      self.makeResponseHeaders(contentType: "video/mpeg")
    )

    result.assertFailure { error in
      XCTAssertEqual(error, .invalidContentType("video/mpeg"))
    }
  }
  /// Verifies that response headers advertising `grpc-encoding: identity` are accepted when
  /// `identity` is acceptable for responses, and that the resulting reader uses the
  /// `identity` algorithm.
  func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
    let configuration = ClientMessageEncoding.Configuration(
      forRequests: nil,
      acceptableForResponses: [.identity],
      decompressionLimit: .ratio(1)
    )
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .enabled(configuration))
    )
    var stateMachine = self.makeStateMachine(initialState)

    // Identity should always be supported.
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "identity")
    stateMachine.receiveResponseHeaders(headers).assertSuccess()

    // Any state other than "both sides active, still reading" is a failure.
    guard case let .clientActiveServerActive(_, .reading(_, reader)) = stateMachine.state else {
      XCTFail("unexpected state \(stateMachine.state)")
      return
    }
    XCTAssertEqual(reader.compression?.algorithm, .identity)
  }
  /// Verifies that response headers advertising `grpc-encoding: snappy` are rejected with
  /// `.unsupportedMessageEncoding` when message encoding is disabled.
  func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let encoding = "snappy"
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: encoding)

    stateMachine.receiveResponseHeaders(headers).assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding(encoding))
    }
  }
  /// Verifies that response headers advertising an unrecognised `grpc-encoding` value are
  /// rejected with `.unsupportedMessageEncoding` holding that value.
  func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let encoding = "not-a-known-compression-(probably)"
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: encoding)

    stateMachine.receiveResponseHeaders(headers).assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding(encoding))
    }
  }
  /// Verifies that trailers containing only `grpc-status: 0` end the response stream
  /// successfully with the status code for raw value `0` and no message.
  func testReceiveEndOfResponseStreamWithStatus() throws {
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
    let trailers: HPACKHeaders = ["grpc-status": "0"]

    stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
      // No "grpc-message" trailer was supplied, so no message is expected.
      XCTAssertNil(status.message)
    }
  }
  /// Verifies that a `grpc-status` trailer of `999` ends the response stream successfully
  /// with the `.unknown` status code.
  func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = ["grpc-status": "999"]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess {
      XCTAssertEqual($0.code, .unknown)
    }
  }
  /// Verifies that a non-numeric `grpc-status` trailer ends the response stream successfully
  /// with the `.unknown` status code.
  func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess {
      XCTAssertEqual($0.code, .unknown)
    }
  }
  /// Verifies that `grpc-status` and `grpc-message` trailers are both surfaced in the status
  /// returned when the response stream ends, including a non-ASCII message.
  func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
    let initialState: StateMachine.State = .clientClosedServerActive(readState: .one())
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess {
      XCTAssertEqual($0.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual($0.message, "foo bar 🚀")
    }
  }
  /// Verifies that ending the response stream before any response headers were received,
  /// with trailers that have a `200` status but no `content-type`, still yields the gRPC
  /// status and message from those trailers.
  func testReceiveTrailersOnlyEndOfResponseStreamWithoutContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertSuccess {
      XCTAssertEqual($0.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual($0.message, "foo bar 🚀")
    }
  }
  /// Verifies that ending the response stream before any response headers were received,
  /// with trailers whose `content-type` is `invalid`, fails with `.invalidContentType`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidContentType() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
      "content-type": "invalid",
    ]
    let result = stateMachine.receiveEndOfResponseStream(trailers)

    result.assertFailure {
      XCTAssertEqual($0, .invalidContentType("invalid"))
    }
  }
  /// Verifies that ending the response stream before any response headers were received,
  /// with trailers carrying HTTP status `418` and `grpc-status: 5`, fails with
  /// `.invalidHTTPStatusWithGRPCStatus` wrapping the gRPC status from the trailers.
  ///
  /// - Throws: If raw value `5` does not map to a `GRPCStatus.Code`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndValidGRPCStatus() throws {
    var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    ))

    // Unwrap up front so an unmapped raw value fails this test instead of trapping the process.
    let expectedCode = try XCTUnwrap(GRPCStatus.Code(rawValue: 5))
    let expectedStatus = GRPCStatus(code: expectedCode, message: nil)

    let trailers: HPACKHeaders = [
      ":status": "418",
      "grpc-status": "5",
    ]

    stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatusWithGRPCStatus(expectedStatus))
    }
  }
  /// Verifies that ending the response stream before any response headers were received,
  /// with trailers carrying only HTTP status `418`, fails with `.invalidHTTPStatus`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndNoGRPCStatus() throws {
    let initialState: StateMachine.State = .clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    )
    var stateMachine = self.makeStateMachine(initialState)

    let httpStatus = "418"
    let trailers: HPACKHeaders = [":status": httpStatus]

    stateMachine.receiveEndOfResponseStream(trailers).assertFailure {
      XCTAssertEqual($0, .invalidHTTPStatus(httpStatus))
    }
  }
  970. }
  /// Tests for `ReadState.readMessages(_:maxLength:)`, covering each combination of expected
  /// arity (none, one, many) and the number of length-prefixed messages in the buffer.
  class ReadStateTests: GRPCTestCase {
    var allocator = ByteBufferAllocator()

    /// Returns a new buffer holding `message` as written by a `LengthPrefixedMessageWriter`
    /// configured without compression.
    private func makeLengthPrefixedBuffer(_ message: ByteBuffer) throws -> ByteBuffer {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return try writer.write(buffer: message, allocator: self.allocator)
    }

    /// Reading while in `.notReading` fails with a cardinality violation.
    func testReadWhenNoExpectedMessages() {
      var state: ReadState = .notReading
      var buffer = self.allocator.buffer(capacity: 0)

      state.readMessages(&buffer, maxLength: .max).assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// One message followed by a stray byte fails with `.leftOverBytes` when one message is
    /// expected.
    func testReadWithLeftOverBytesForOneExpectedMessage() throws {
      // Write a single message into the buffer...
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeLengthPrefixedBuffer(message)
      // ...followed by one junk byte.
      let junk: [UInt8] = [0x00]
      buffer.writeBytes(junk)

      var state: ReadState = .one()
      state.readMessages(&buffer, maxLength: .max).assertFailure { error in
        XCTAssertEqual(error, .leftOverBytes)
      }
      state.assertNotReading()
    }

    /// Two messages fail with a cardinality violation when one message is expected.
    func testReadTooManyMessagesForOneExpectedMessages() throws {
      // Write the message into the buffer twice.
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeLengthPrefixedBuffer(message)
      var second = try self.makeLengthPrefixedBuffer(message)
      buffer.writeBuffer(&second)

      var state: ReadState = .one()
      state.readMessages(&buffer, maxLength: .max).assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// One message is read successfully when one message is expected; the state then stops
    /// reading.
    func testReadOneMessageForOneExpectedMessages() throws {
      // Write the message into the buffer once.
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeLengthPrefixedBuffer(message)

      var state: ReadState = .one()
      state.readMessages(&buffer, maxLength: .max).assertSuccess { messages in
        XCTAssertEqual(messages, [message])
      }
      // We shouldn't be able to read anymore.
      state.assertNotReading()
    }

    /// One message is read successfully when many messages are expected; the state keeps
    /// reading.
    func testReadOneMessageForManyExpectedMessages() throws {
      // Write the message into the buffer once.
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeLengthPrefixedBuffer(message)

      var state: ReadState = .many()
      state.readMessages(&buffer, maxLength: .max).assertSuccess { messages in
        XCTAssertEqual(messages, [message])
      }
      // We should still be able to read.
      state.assertReading()
    }

    /// Three messages are read successfully, in order, when many messages are expected; the
    /// state keeps reading.
    func testReadManyMessagesForManyExpectedMessages() throws {
      // Write the message into the buffer three times.
      let message = ByteBuffer(string: "Hello!")
      var buffer = try self.makeLengthPrefixedBuffer(message)
      var second = try self.makeLengthPrefixedBuffer(message)
      var third = try self.makeLengthPrefixedBuffer(message)
      buffer.writeBuffer(&second)
      buffer.writeBuffer(&third)

      var state: ReadState = .many()
      state.readMessages(&buffer, maxLength: .max).assertSuccess { messages in
        XCTAssertEqual(messages, [message, message, message])
      }
      // We should still be able to read.
      state.assertReading()
    }
  }
  1049. // MARK: Result helpers
  extension Result {
    /// Asserts the `Result` was a success.
    ///
    /// Any failure is recorded against `file` and `line`, which default to the call site, so
    /// it is attributed to the calling test rather than to this helper.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to.
    ///   - line: The line to attribute a failure to.
    ///   - verify: A closure called with the success value. An error thrown from it is recorded
    ///     as a test failure. Defaults to a closure which does nothing.
    func assertSuccess(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Success) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        do {
          try verify(value)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      case let .failure(error):
        XCTFail("unexpected .failure: \(error)", file: file, line: line)
      }
    }

    /// Asserts the `Result` was a failure.
    ///
    /// Any failure is recorded against `file` and `line`, which default to the call site, so
    /// it is attributed to the calling test rather than to this helper.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to.
    ///   - line: The line to attribute a failure to.
    ///   - verify: A closure called with the failure value. An error thrown from it is recorded
    ///     as a test failure. Defaults to a closure which does nothing.
    func assertFailure(
      file: StaticString = #file,
      line: UInt = #line,
      verify: (Failure) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(value):
        XCTFail("unexpected .success: \(value)", file: file, line: line)
      case let .failure(failure):
        do {
          try verify(failure)
        } catch {
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      }
    }
  }
  1078. // MARK: ReadState, PendingWriteState, and WriteState helpers
  extension ReadState {
    /// Returns a `.reading` state with arity `.one` and a newly created
    /// `LengthPrefixedMessageReader`.
    static func one() -> ReadState {
      return .reading(.one, LengthPrefixedMessageReader())
    }

    /// Returns a `.reading` state with arity `.many` and a newly created
    /// `LengthPrefixedMessageReader`.
    static func many() -> ReadState {
      return .reading(.many, LengthPrefixedMessageReader())
    }

    /// Records a test failure unless the state is `.reading`.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the call site.
    ///   - line: The line to attribute a failure to; defaults to the call site.
    func assertReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        break
      case .notReading:
        XCTFail("unexpected state .notReading", file: file, line: line)
      }
    }

    /// Records a test failure unless the state is `.notReading`.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a failure to; defaults to the call site.
    ///   - line: The line to attribute a failure to; defaults to the call site.
    func assertNotReading(file: StaticString = #file, line: UInt = #line) {
      switch self {
      case .reading:
        XCTFail("unexpected state .reading", file: file, line: line)
      case .notReading:
        break
      }
    }
  }
  extension PendingWriteState {
    /// Returns a pending write state with arity `.one` and the `.protobuf` content type.
    static func one() -> PendingWriteState {
      return PendingWriteState(arity: .one, contentType: .protobuf)
    }

    /// Returns a pending write state with arity `.many` and the `.protobuf` content type.
    static func many() -> PendingWriteState {
      return PendingWriteState(arity: .many, contentType: .protobuf)
    }
  }
  extension WriteState {
    /// Returns a `.writing` state with arity `.one`, the `.protobuf` content type and a
    /// `LengthPrefixedMessageWriter` configured without compression.
    static func one() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.one, .protobuf, writer)
    }

    /// Returns a `.writing` state with arity `.many`, the `.protobuf` content type and a
    /// `LengthPrefixedMessageWriter` configured without compression.
    static func many() -> WriteState {
      let writer = LengthPrefixedMessageWriter(compression: .none)
      return .writing(.many, .protobuf, writer)
    }
  }