GRPCClientStateMachineTests.swift 42 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Foundation
  18. @testable import GRPC
  19. import Logging
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOHTTP1
  23. import SwiftProtobuf
  24. import XCTest
  25. class GRPCClientStateMachineTests: GRPCTestCase {
  26. typealias Request = Echo_EchoRequest
  27. typealias Response = Echo_EchoResponse
  28. typealias StateMachine = GRPCClientStateMachine
  29. var allocator = ByteBufferAllocator()
  30. func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
  31. return StateMachine(state: state)
  32. }
  33. /// Writes a message into a new `ByteBuffer` (with length-prefixing).
  34. func writeMessage(_ message: String) throws -> ByteBuffer {
  35. let buffer = self.allocator.buffer(string: message)
  36. var writer = LengthPrefixedMessageWriter(compression: .none, allocator: .init())
  37. var (buffer1, buffer2) = try writer.write(
  38. buffer: buffer,
  39. compressed: false
  40. )
  41. if var buffer2 = buffer2 {
  42. buffer1.writeBuffer(&buffer2)
  43. }
  44. return buffer1
  45. }
  46. /// Writes a message into the given `buffer`.
  47. func writeMessage(_ message: String, into buffer: inout ByteBuffer) throws {
  48. var other = try self.writeMessage(message)
  49. buffer.writeBuffer(&other)
  50. }
  51. /// Returns a minimally valid `HPACKHeaders` for a response.
  52. func makeResponseHeaders(
  53. status: String? = "200",
  54. contentType: String? = "application/grpc+proto"
  55. ) -> HPACKHeaders {
  56. var headers: HPACKHeaders = [:]
  57. status.map { headers.add(name: ":status", value: $0) }
  58. contentType.map { headers.add(name: "content-type", value: $0) }
  59. return headers
  60. }
  61. }
  62. // MARK: - Send Request Headers
  63. extension GRPCClientStateMachineTests {
  64. func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
  65. var stateMachine = self.makeStateMachine(state)
  66. stateMachine.sendRequestHeaders(requestHead: .init(
  67. method: "POST",
  68. scheme: "http",
  69. path: "/echo/Get",
  70. host: "host",
  71. deadline: .distantFuture,
  72. customMetadata: [:],
  73. encoding: .disabled
  74. ), allocator: .init()).assertFailure {
  75. XCTAssertEqual($0, .invalidState)
  76. }
  77. }
  78. func testSendRequestHeadersFromIdle() {
  79. var stateMachine = self
  80. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  81. stateMachine.sendRequestHeaders(requestHead: .init(
  82. method: "POST",
  83. scheme: "http",
  84. path: "/echo/Get",
  85. host: "host",
  86. deadline: .distantFuture,
  87. customMetadata: [:],
  88. encoding: .disabled
  89. ), allocator: .init()).assertSuccess()
  90. }
  91. func testSendRequestHeadersFromClientActiveServerIdle() {
  92. self.doTestSendRequestHeadersFromInvalidState(.clientActiveServerIdle(
  93. writeState: .one(),
  94. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  95. ))
  96. }
  97. func testSendRequestHeadersFromClientClosedServerIdle() {
  98. self
  99. .doTestSendRequestHeadersFromInvalidState(
  100. .clientClosedServerIdle(pendingReadState: .init(
  101. arity: .one,
  102. messageEncoding: .disabled
  103. ))
  104. )
  105. }
  106. func testSendRequestHeadersFromActive() {
  107. self
  108. .doTestSendRequestHeadersFromInvalidState(.clientActiveServerActive(
  109. writeState: .one(),
  110. readState: .one()
  111. ))
  112. }
  113. func testSendRequestHeadersFromClientClosedServerActive() {
  114. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerActive(readState: .one()))
  115. }
  116. func testSendRequestHeadersFromClosed() {
  117. self.doTestSendRequestHeadersFromInvalidState(.clientClosedServerClosed)
  118. }
  119. }
  120. // MARK: - Send Request
  121. extension GRPCClientStateMachineTests {
  122. func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
  123. var stateMachine = self.makeStateMachine(state)
  124. stateMachine.sendRequest(
  125. ByteBuffer(string: "Hello!"),
  126. compressed: false
  127. ).assertFailure {
  128. XCTAssertEqual($0, expected)
  129. }
  130. }
  131. func doTestSendRequestFromValidState(_ state: StateMachine.State) {
  132. var stateMachine = self.makeStateMachine(state)
  133. let request = "Hello!"
  134. stateMachine.sendRequest(
  135. ByteBuffer(string: request),
  136. compressed: false
  137. ).assertSuccess()
  138. }
  139. func testSendRequestFromIdle() {
  140. self.doTestSendRequestFromInvalidState(
  141. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  142. expected: .invalidState
  143. )
  144. }
  145. func testSendRequestFromClientActiveServerIdle() {
  146. self.doTestSendRequestFromValidState(.clientActiveServerIdle(
  147. writeState: .one(),
  148. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  149. ))
  150. }
  151. func testSendRequestFromClientClosedServerIdle() {
  152. self.doTestSendRequestFromInvalidState(
  153. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  154. expected: .cardinalityViolation
  155. )
  156. }
  157. func testSendRequestFromActive() {
  158. self
  159. .doTestSendRequestFromValidState(.clientActiveServerActive(
  160. writeState: .one(),
  161. readState: .one()
  162. ))
  163. }
  164. func testSendRequestFromClientClosedServerActive() {
  165. self.doTestSendRequestFromInvalidState(
  166. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  167. expected: .cardinalityViolation
  168. )
  169. }
  170. func testSendRequestFromClosed() {
  171. self.doTestSendRequestFromInvalidState(
  172. .clientClosedServerClosed,
  173. expected: .cardinalityViolation
  174. )
  175. }
  176. }
  177. // MARK: - Send End of Request Stream
  178. extension GRPCClientStateMachineTests {
  179. func doTestSendEndOfRequestStreamFromInvalidState(
  180. _ state: StateMachine.State,
  181. expected: SendEndOfRequestStreamError
  182. ) {
  183. var stateMachine = self.makeStateMachine(state)
  184. stateMachine.sendEndOfRequestStream().assertFailure {
  185. XCTAssertEqual($0, expected)
  186. }
  187. }
  188. func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
  189. var stateMachine = self.makeStateMachine(state)
  190. stateMachine.sendEndOfRequestStream().assertSuccess()
  191. }
  192. func testSendEndOfRequestStreamFromIdle() {
  193. self.doTestSendEndOfRequestStreamFromInvalidState(
  194. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  195. expected: .invalidState
  196. )
  197. }
  198. func testSendEndOfRequestStreamFromClientActiveServerIdle() {
  199. self.doTestSendEndOfRequestStreamFromValidState(
  200. .clientActiveServerIdle(
  201. writeState: .one(),
  202. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  203. )
  204. )
  205. }
  206. func testSendEndOfRequestStreamFromClientClosedServerIdle() {
  207. self.doTestSendEndOfRequestStreamFromInvalidState(
  208. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  209. expected: .alreadyClosed
  210. )
  211. }
  212. func testSendEndOfRequestStreamFromActive() {
  213. self.doTestSendEndOfRequestStreamFromValidState(
  214. .clientActiveServerActive(writeState: .one(), readState: .one())
  215. )
  216. }
  217. func testSendEndOfRequestStreamFromClientClosedServerActive() {
  218. self.doTestSendEndOfRequestStreamFromInvalidState(
  219. .clientClosedServerActive(readState: .one()),
  220. expected: .alreadyClosed
  221. )
  222. }
  223. func testSendEndOfRequestStreamFromClosed() {
  224. self.doTestSendEndOfRequestStreamFromInvalidState(
  225. .clientClosedServerClosed,
  226. expected: .alreadyClosed
  227. )
  228. }
  229. }
  230. // MARK: - Receive Response Headers
  231. extension GRPCClientStateMachineTests {
  232. func doTestReceiveResponseHeadersFromInvalidState(
  233. _ state: StateMachine.State,
  234. expected: ReceiveResponseHeadError
  235. ) {
  236. var stateMachine = self.makeStateMachine(state)
  237. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertFailure {
  238. XCTAssertEqual($0, expected)
  239. }
  240. }
  241. func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
  242. var stateMachine = self.makeStateMachine(state)
  243. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  244. }
  245. func testReceiveResponseHeadersFromIdle() {
  246. self.doTestReceiveResponseHeadersFromInvalidState(
  247. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  248. expected: .invalidState
  249. )
  250. }
  251. func testReceiveResponseHeadersFromClientActiveServerIdle() {
  252. self.doTestReceiveResponseHeadersFromValidState(
  253. .clientActiveServerIdle(
  254. writeState: .one(),
  255. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  256. )
  257. )
  258. }
  259. func testReceiveResponseHeadersFromClientClosedServerIdle() {
  260. self.doTestReceiveResponseHeadersFromValidState(
  261. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  262. )
  263. }
  264. func testReceiveResponseHeadersFromActive() {
  265. self.doTestReceiveResponseHeadersFromInvalidState(
  266. .clientActiveServerActive(writeState: .one(), readState: .one()),
  267. expected: .invalidState
  268. )
  269. }
  270. func testReceiveResponseHeadersFromClientClosedServerActive() {
  271. self.doTestReceiveResponseHeadersFromInvalidState(
  272. .clientClosedServerActive(readState: .one()),
  273. expected: .invalidState
  274. )
  275. }
  276. func testReceiveResponseHeadersFromClosed() {
  277. self.doTestReceiveResponseHeadersFromInvalidState(
  278. .clientClosedServerClosed,
  279. expected: .invalidState
  280. )
  281. }
  282. }
  283. // MARK: - Receive Response
  284. extension GRPCClientStateMachineTests {
  285. func doTestReceiveResponseFromInvalidState(
  286. _ state: StateMachine.State,
  287. expected: MessageReadError
  288. ) throws {
  289. var stateMachine = self.makeStateMachine(state)
  290. let message = "Hello!"
  291. var buffer = try self.writeMessage(message)
  292. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertFailure {
  293. XCTAssertEqual($0, expected)
  294. }
  295. }
  296. func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
  297. var stateMachine = self.makeStateMachine(state)
  298. let message = "Hello!"
  299. var buffer = try self.writeMessage(message)
  300. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertSuccess { messages in
  301. XCTAssertEqual(messages, [ByteBuffer(string: message)])
  302. }
  303. }
  304. func testReceiveResponseFromIdle() throws {
  305. try self.doTestReceiveResponseFromInvalidState(
  306. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  307. expected: .invalidState
  308. )
  309. }
  310. func testReceiveResponseFromClientActiveServerIdle() throws {
  311. try self.doTestReceiveResponseFromInvalidState(
  312. .clientActiveServerIdle(
  313. writeState: .one(),
  314. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  315. ),
  316. expected: .invalidState
  317. )
  318. }
  319. func testReceiveResponseFromClientClosedServerIdle() throws {
  320. try self.doTestReceiveResponseFromInvalidState(
  321. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
  322. expected: .invalidState
  323. )
  324. }
  325. func testReceiveResponseFromActive() throws {
  326. try self.doTestReceiveResponseFromValidState(
  327. .clientActiveServerActive(writeState: .one(), readState: .one())
  328. )
  329. }
  330. func testReceiveResponseFromClientClosedServerActive() throws {
  331. try self.doTestReceiveResponseFromValidState(.clientClosedServerActive(readState: .one()))
  332. }
  333. func testReceiveResponseFromClosed() throws {
  334. try self.doTestReceiveResponseFromInvalidState(
  335. .clientClosedServerClosed,
  336. expected: .invalidState
  337. )
  338. }
  339. }
  340. // MARK: - Receive End of Response Stream
  341. extension GRPCClientStateMachineTests {
  342. func doTestReceiveEndOfResponseStreamFromInvalidState(
  343. _ state: StateMachine.State,
  344. expected: ReceiveEndOfResponseStreamError
  345. ) {
  346. var stateMachine = self.makeStateMachine(state)
  347. stateMachine.receiveEndOfResponseStream(.init()).assertFailure()
  348. }
  349. func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
  350. var stateMachine = self.makeStateMachine(state)
  351. var trailers: HPACKHeaders = [
  352. GRPCHeaderName.statusCode: "0",
  353. GRPCHeaderName.statusMessage: "ok",
  354. ]
  355. // When the server is idle it's a "Trailers-Only" response, we need the :status and
  356. // content-type to make a valid set of trailers.
  357. switch state {
  358. case .clientActiveServerIdle,
  359. .clientClosedServerIdle:
  360. trailers.add(name: ":status", value: "200")
  361. trailers.add(name: "content-type", value: "application/grpc+proto")
  362. default:
  363. break
  364. }
  365. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  366. XCTAssertEqual(status.code, .ok)
  367. XCTAssertEqual(status.message, "ok")
  368. }
  369. }
  370. func testReceiveEndOfResponseStreamFromIdle() {
  371. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  372. .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
  373. expected: .invalidState
  374. )
  375. }
  376. func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
  377. self.doTestReceiveEndOfResponseStreamFromValidState(
  378. .clientActiveServerIdle(
  379. writeState: .one(),
  380. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  381. )
  382. )
  383. }
  384. func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
  385. self.doTestReceiveEndOfResponseStreamFromValidState(
  386. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  387. )
  388. }
  389. func testReceiveEndOfResponseStreamFromActive() {
  390. self.doTestReceiveEndOfResponseStreamFromValidState(
  391. .clientActiveServerActive(writeState: .one(), readState: .one())
  392. )
  393. }
  394. func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
  395. self.doTestReceiveEndOfResponseStreamFromValidState(
  396. .clientClosedServerActive(readState: .one())
  397. )
  398. }
  399. func testReceiveEndOfResponseStreamFromClosed() {
  400. self.doTestReceiveEndOfResponseStreamFromInvalidState(
  401. .clientClosedServerClosed,
  402. expected: .invalidState
  403. )
  404. }
  405. private func doTestReceiveEndStreamOnDataWhenActive(_ state: StateMachine.State) throws {
  406. var stateMachine = self.makeStateMachine(state)
  407. let status = try assertNotNil(stateMachine.receiveEndOfResponseStream())
  408. XCTAssertEqual(status.code, .internalError)
  409. }
  410. func testReceiveEndStreamOnDataClientActiveServerIdle() throws {
  411. try self.doTestReceiveEndStreamOnDataWhenActive(
  412. .clientActiveServerIdle(
  413. writeState: .one(),
  414. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  415. )
  416. )
  417. }
  418. func testReceiveEndStreamOnDataClientClosedServerIdle() throws {
  419. try self.doTestReceiveEndStreamOnDataWhenActive(
  420. .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
  421. )
  422. }
  423. func testReceiveEndStreamOnDataClientActiveServerActive() throws {
  424. try self.doTestReceiveEndStreamOnDataWhenActive(
  425. .clientActiveServerActive(writeState: .one(), readState: .one())
  426. )
  427. }
  428. func testReceiveEndStreamOnDataClientClosedServerActive() throws {
  429. try self.doTestReceiveEndStreamOnDataWhenActive(
  430. .clientClosedServerActive(readState: .one())
  431. )
  432. }
  433. func testReceiveEndStreamOnDataWhenClosed() {
  434. var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
  435. // Already closed, end stream is ignored.
  436. XCTAssertNil(stateMachine.receiveEndOfResponseStream())
  437. }
  438. }
  439. // MARK: - Basic RPC flow.
  440. extension GRPCClientStateMachineTests {
  441. func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
  442. var headers = HPACKHeaders()
  443. headers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
  444. if let message = message {
  445. headers.add(name: GRPCHeaderName.statusMessage, value: message)
  446. }
  447. return headers
  448. }
  449. func testSimpleUnaryFlow() throws {
  450. var stateMachine = self
  451. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  452. // Initiate the RPC
  453. stateMachine.sendRequestHeaders(requestHead: .init(
  454. method: "POST",
  455. scheme: "https",
  456. path: "/echo/Get",
  457. host: "foo",
  458. deadline: .distantFuture,
  459. customMetadata: [:],
  460. encoding: .disabled
  461. ), allocator: .init()).assertSuccess()
  462. // Receive acknowledgement.
  463. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  464. // Send a request.
  465. stateMachine.sendRequest(
  466. ByteBuffer(string: "Hello!"),
  467. compressed: false
  468. ).assertSuccess()
  469. // Close the request stream.
  470. stateMachine.sendEndOfRequestStream().assertSuccess()
  471. // Receive a response.
  472. var buffer = try self.writeMessage("Hello!")
  473. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertSuccess()
  474. // Receive the status.
  475. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  476. }
  477. func testSimpleClientActiveFlow() throws {
  478. var stateMachine = self
  479. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .one))
  480. // Initiate the RPC
  481. stateMachine.sendRequestHeaders(requestHead: .init(
  482. method: "POST",
  483. scheme: "https",
  484. path: "/echo/Get",
  485. host: "foo",
  486. deadline: .distantFuture,
  487. customMetadata: [:],
  488. encoding: .disabled
  489. ), allocator: .init()).assertSuccess()
  490. // Receive acknowledgement.
  491. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  492. // Send some requests.
  493. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  494. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertSuccess()
  495. stateMachine.sendRequest(ByteBuffer(string: "3"), compressed: false).assertSuccess()
  496. // Close the request stream.
  497. stateMachine.sendEndOfRequestStream().assertSuccess()
  498. // Receive a response.
  499. var buffer = try self.writeMessage("Hello!")
  500. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertSuccess()
  501. // Receive the status.
  502. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  503. }
  504. func testSimpleServerActiveFlow() throws {
  505. var stateMachine = self
  506. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .many))
  507. // Initiate the RPC
  508. stateMachine.sendRequestHeaders(requestHead: .init(
  509. method: "POST",
  510. scheme: "https",
  511. path: "/echo/Get",
  512. host: "foo",
  513. deadline: .distantFuture,
  514. customMetadata: [:],
  515. encoding: .disabled
  516. ), allocator: .init()).assertSuccess()
  517. // Receive acknowledgement.
  518. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  519. // Send a request.
  520. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  521. // Close the request stream.
  522. stateMachine.sendEndOfRequestStream().assertSuccess()
  523. // Receive a response.
  524. var firstBuffer = try self.writeMessage("1")
  525. stateMachine.receiveResponseBuffer(&firstBuffer, maxMessageLength: .max).assertSuccess()
  526. // Receive two responses in one buffer.
  527. var secondBuffer = try self.writeMessage("2")
  528. try self.writeMessage("3", into: &secondBuffer)
  529. stateMachine.receiveResponseBuffer(&secondBuffer, maxMessageLength: .max).assertSuccess()
  530. // Receive the status.
  531. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  532. }
  533. func testSimpleBidirectionalActiveFlow() throws {
  534. var stateMachine = self
  535. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .many(), readArity: .many))
  536. // Initiate the RPC
  537. stateMachine.sendRequestHeaders(requestHead: .init(
  538. method: "POST",
  539. scheme: "https",
  540. path: "/echo/Get",
  541. host: "foo",
  542. deadline: .distantFuture,
  543. customMetadata: [:],
  544. encoding: .disabled
  545. ), allocator: .init()).assertSuccess()
  546. // Receive acknowledgement.
  547. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  548. // Interleave requests and responses:
  549. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  550. // Receive a response.
  551. var firstBuffer = try self.writeMessage("1")
  552. stateMachine.receiveResponseBuffer(&firstBuffer, maxMessageLength: .max).assertSuccess()
  553. // Send two more requests.
  554. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertSuccess()
  555. stateMachine.sendRequest(ByteBuffer(string: "3"), compressed: false).assertSuccess()
  556. // Receive two responses in one buffer.
  557. var secondBuffer = try self.writeMessage("2")
  558. try self.writeMessage("3", into: &secondBuffer)
  559. stateMachine.receiveResponseBuffer(&secondBuffer, maxMessageLength: .max).assertSuccess()
  560. // Close the request stream.
  561. stateMachine.sendEndOfRequestStream().assertSuccess()
  562. // Receive the status.
  563. stateMachine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
  564. }
  565. }
  566. // MARK: - Too many requests / responses.
  567. extension GRPCClientStateMachineTests {
  568. func testSendTooManyRequestsFromClientActiveServerIdle() {
  569. for messageCount in [MessageArity.one, MessageArity.many] {
  570. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  571. writeState: .one(),
  572. pendingReadState: .init(arity: messageCount, messageEncoding: .disabled)
  573. ))
  574. // One is fine.
  575. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  576. // Two is not.
  577. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertFailure {
  578. XCTAssertEqual($0, .cardinalityViolation)
  579. }
  580. }
  581. }
  582. func testSendTooManyRequestsFromActive() {
  583. for readState in [ReadState.one(), ReadState.many()] {
  584. var stateMachine = self
  585. .makeStateMachine(.clientActiveServerActive(writeState: .one(), readState: readState))
  586. // One is fine.
  587. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
  588. // Two is not.
  589. stateMachine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertFailure {
  590. XCTAssertEqual($0, .cardinalityViolation)
  591. }
  592. }
  593. }
  594. func testSendTooManyRequestsFromClosed() {
  595. var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
  596. // No requests allowed!
  597. stateMachine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertFailure {
  598. XCTAssertEqual($0, .cardinalityViolation)
  599. }
  600. }
  601. func testReceiveTooManyRequests() throws {
  602. for writeState in [WriteState.one(), WriteState.many()] {
  603. var stateMachine = self
  604. .makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  605. // One response is fine.
  606. var firstBuffer = try self.writeMessage("foo")
  607. stateMachine.receiveResponseBuffer(&firstBuffer, maxMessageLength: .max).assertSuccess()
  608. var secondBuffer = try self.writeMessage("bar")
  609. stateMachine.receiveResponseBuffer(&secondBuffer, maxMessageLength: .max).assertFailure {
  610. XCTAssertEqual($0, .cardinalityViolation)
  611. }
  612. }
  613. }
  614. func testReceiveTooManyRequestsInOneBuffer() throws {
  615. for writeState in [WriteState.one(), WriteState.many()] {
  616. var stateMachine = self
  617. .makeStateMachine(.clientActiveServerActive(writeState: writeState, readState: .one()))
  618. // Write two responses into a single buffer.
  619. var buffer = try self.writeMessage("foo")
  620. var other = try self.writeMessage("bar")
  621. buffer.writeBuffer(&other)
  622. stateMachine.receiveResponseBuffer(&buffer, maxMessageLength: .max).assertFailure {
  623. XCTAssertEqual($0, .cardinalityViolation)
  624. }
  625. }
  626. }
  627. }
  628. // MARK: - Send Request Headers
  629. extension GRPCClientStateMachineTests {
  630. func testSendRequestHeaders() throws {
  631. var stateMachine = self
  632. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  633. stateMachine.sendRequestHeaders(requestHead: .init(
  634. method: "POST",
  635. scheme: "http",
  636. path: "/echo/Get",
  637. host: "localhost",
  638. deadline: .now() + .hours(1),
  639. customMetadata: ["x-grpc-id": "request-id"],
  640. encoding: .enabled(.init(
  641. forRequests: .identity,
  642. acceptableForResponses: [.identity],
  643. decompressionLimit: .ratio(10)
  644. ))
  645. ), allocator: .init()).assertSuccess { headers in
  646. XCTAssertEqual(headers[":method"], ["POST"])
  647. XCTAssertEqual(headers[":path"], ["/echo/Get"])
  648. XCTAssertEqual(headers[":authority"], ["localhost"])
  649. XCTAssertEqual(headers[":scheme"], ["http"])
  650. XCTAssertEqual(headers["content-type"], ["application/grpc"])
  651. XCTAssertEqual(headers["te"], ["trailers"])
  652. // We convert the deadline into a timeout, we can't be exactly sure what that timeout is.
  653. XCTAssertTrue(headers.contains(name: "grpc-timeout"))
  654. XCTAssertEqual(headers["x-grpc-id"], ["request-id"])
  655. XCTAssertEqual(headers["grpc-encoding"], ["identity"])
  656. XCTAssertTrue(headers["grpc-accept-encoding"].contains("identity"))
  657. XCTAssertTrue(headers["user-agent"].first?.starts(with: "grpc-swift") ?? false)
  658. }
  659. }
  660. func testSendRequestHeadersNormalizesCustomMetadata() throws {
  661. // `HPACKHeaders` uses case-insensitive lookup for header names so we can't check the equality
  662. // for individual headers. We'll pull out the entries we care about by matching a sentinel value
  663. // and then compare `HPACKHeaders` instances (since the equality check *is* case sensitive).
  664. let filterKey = "a-key-for-filtering"
  665. let customMetadata: HPACKHeaders = [
  666. "partiallyLower": filterKey,
  667. "ALLUPPER": filterKey,
  668. ]
  669. var stateMachine = self
  670. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  671. stateMachine.sendRequestHeaders(requestHead: .init(
  672. method: "POST",
  673. scheme: "http",
  674. path: "/echo/Get",
  675. host: "localhost",
  676. deadline: .distantFuture,
  677. customMetadata: customMetadata,
  678. encoding: .disabled
  679. ), allocator: .init()).assertSuccess { headers in
  680. // Pull out the entries we care about by matching values
  681. let filtered = headers.filter { _, value, _ in
  682. value == filterKey
  683. }.map { name, value, _ in
  684. (name, value)
  685. }
  686. let justCustomMetadata = HPACKHeaders(filtered)
  687. let expected: HPACKHeaders = [
  688. "partiallylower": filterKey,
  689. "allupper": filterKey,
  690. ]
  691. XCTAssertEqual(justCustomMetadata, expected)
  692. }
  693. }
  694. func testSendRequestHeadersWithCustomUserAgent() throws {
  695. let customMetadata: HPACKHeaders = [
  696. "user-agent": "test-user-agent",
  697. ]
  698. var stateMachine = self
  699. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  700. stateMachine.sendRequestHeaders(requestHead: .init(
  701. method: "POST",
  702. scheme: "http",
  703. path: "/echo/Get",
  704. host: "localhost",
  705. deadline: .distantFuture,
  706. customMetadata: customMetadata,
  707. encoding: .enabled(.init(
  708. forRequests: nil,
  709. acceptableForResponses: [],
  710. decompressionLimit: .ratio(10)
  711. ))
  712. ), allocator: .init()).assertSuccess { headers in
  713. XCTAssertEqual(headers["user-agent"], ["test-user-agent"])
  714. }
  715. }
  716. func testSendRequestHeadersWithNoCompressionInEitherDirection() throws {
  717. var stateMachine = self
  718. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  719. stateMachine.sendRequestHeaders(requestHead: .init(
  720. method: "POST",
  721. scheme: "http",
  722. path: "/echo/Get",
  723. host: "localhost",
  724. deadline: .distantFuture,
  725. customMetadata: ["x-grpc-id": "request-id"],
  726. encoding: .enabled(.init(
  727. forRequests: nil,
  728. acceptableForResponses: [],
  729. decompressionLimit: .ratio(10)
  730. ))
  731. ), allocator: .init()).assertSuccess { headers in
  732. XCTAssertFalse(headers.contains(name: "grpc-encoding"))
  733. XCTAssertFalse(headers.contains(name: "grpc-accept-encoding"))
  734. }
  735. }
  736. func testSendRequestHeadersWithNoCompressionForRequests() throws {
  737. var stateMachine = self
  738. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  739. stateMachine.sendRequestHeaders(requestHead: .init(
  740. method: "POST",
  741. scheme: "http",
  742. path: "/echo/Get",
  743. host: "localhost",
  744. deadline: .distantFuture,
  745. customMetadata: ["x-grpc-id": "request-id"],
  746. encoding: .enabled(.init(
  747. forRequests: nil,
  748. acceptableForResponses: [.identity, .gzip],
  749. decompressionLimit: .ratio(10)
  750. ))
  751. ), allocator: .init()).assertSuccess { headers in
  752. XCTAssertFalse(headers.contains(name: "grpc-encoding"))
  753. XCTAssertTrue(headers.contains(name: "grpc-accept-encoding"))
  754. }
  755. }
  756. func testSendRequestHeadersWithNoCompressionForResponses() throws {
  757. var stateMachine = self
  758. .makeStateMachine(.clientIdleServerIdle(pendingWriteState: .one(), readArity: .one))
  759. stateMachine.sendRequestHeaders(requestHead: .init(
  760. method: "POST",
  761. scheme: "http",
  762. path: "/echo/Get",
  763. host: "localhost",
  764. deadline: .distantFuture,
  765. customMetadata: ["x-grpc-id": "request-id"],
  766. encoding: .enabled(.init(
  767. forRequests: .gzip,
  768. acceptableForResponses: [],
  769. decompressionLimit: .ratio(10)
  770. ))
  771. ), allocator: .init()).assertSuccess { headers in
  772. XCTAssertEqual(headers["grpc-encoding"], ["gzip"])
  773. // This asymmetry is strange but allowed: if a client does not advertise support of the
  774. // compression it is using, the server may still process the message so long as it too
  775. // supports the compression.
  776. XCTAssertFalse(headers.contains(name: "grpc-accept-encoding"))
  777. }
  778. }
  779. func testReceiveResponseHeadersWithOkStatus() throws {
  780. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  781. writeState: .one(),
  782. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  783. ))
  784. stateMachine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()
  785. }
  786. func testReceiveResponseHeadersWithNotOkStatus() throws {
  787. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  788. writeState: .one(),
  789. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  790. ))
  791. let code = "\(HTTPResponseStatus.paymentRequired.code)"
  792. let headers = self.makeResponseHeaders(status: code)
  793. stateMachine.receiveResponseHeaders(headers).assertFailure {
  794. XCTAssertEqual($0, .invalidHTTPStatus(code))
  795. }
  796. }
  797. func testReceiveResponseHeadersWithoutContentType() throws {
  798. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  799. writeState: .one(),
  800. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  801. ))
  802. let headers = self.makeResponseHeaders(contentType: nil)
  803. stateMachine.receiveResponseHeaders(headers).assertFailure {
  804. XCTAssertEqual($0, .invalidContentType(nil))
  805. }
  806. }
  807. func testReceiveResponseHeadersWithInvalidContentType() throws {
  808. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  809. writeState: .one(),
  810. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  811. ))
  812. let headers = self.makeResponseHeaders(contentType: "video/mpeg")
  813. stateMachine.receiveResponseHeaders(headers).assertFailure {
  814. XCTAssertEqual($0, .invalidContentType("video/mpeg"))
  815. }
  816. }
  817. func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
  818. let configuration = ClientMessageEncoding.Configuration(
  819. forRequests: .none,
  820. acceptableForResponses: [.identity],
  821. decompressionLimit: .ratio(1)
  822. )
  823. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  824. writeState: .one(),
  825. pendingReadState: .init(arity: .one, messageEncoding: .enabled(configuration))
  826. ))
  827. var headers = self.makeResponseHeaders()
  828. // Identity should always be supported.
  829. headers.add(name: "grpc-encoding", value: "identity")
  830. stateMachine.receiveResponseHeaders(headers).assertSuccess()
  831. switch stateMachine.state {
  832. case let .clientActiveServerActive(_, .reading(_, reader)):
  833. XCTAssertEqual(reader.compression?.algorithm, .identity)
  834. default:
  835. XCTFail("unexpected state \(stateMachine.state)")
  836. }
  837. }
  838. func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
  839. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  840. writeState: .one(),
  841. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  842. ))
  843. var headers = self.makeResponseHeaders()
  844. headers.add(name: "grpc-encoding", value: "snappy")
  845. stateMachine.receiveResponseHeaders(headers).assertFailure {
  846. XCTAssertEqual($0, .unsupportedMessageEncoding("snappy"))
  847. }
  848. }
  849. func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
  850. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  851. writeState: .one(),
  852. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  853. ))
  854. var headers = self.makeResponseHeaders()
  855. headers.add(name: "grpc-encoding", value: "not-a-known-compression-(probably)")
  856. stateMachine.receiveResponseHeaders(headers).assertFailure {
  857. XCTAssertEqual($0, .unsupportedMessageEncoding("not-a-known-compression-(probably)"))
  858. }
  859. }
  860. func testReceiveEndOfResponseStreamWithStatus() throws {
  861. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  862. let trailers: HPACKHeaders = ["grpc-status": "0"]
  863. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  864. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
  865. XCTAssertEqual(status.message, nil)
  866. }
  867. }
  868. func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
  869. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  870. let trailers: HPACKHeaders = ["grpc-status": "999"]
  871. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  872. XCTAssertEqual(status.code, .unknown)
  873. }
  874. }
  875. func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
  876. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  877. let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
  878. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  879. XCTAssertEqual(status.code, .unknown)
  880. }
  881. }
  882. func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
  883. var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
  884. let trailers: HPACKHeaders = [
  885. "grpc-status": "5",
  886. "grpc-message": "foo bar 🚀",
  887. ]
  888. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  889. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
  890. XCTAssertEqual(status.message, "foo bar 🚀")
  891. }
  892. }
  893. func testReceiveTrailersOnlyEndOfResponseStreamWithoutContentType() throws {
  894. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  895. writeState: .one(),
  896. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  897. ))
  898. let trailers: HPACKHeaders = [
  899. ":status": "200",
  900. "grpc-status": "5",
  901. "grpc-message": "foo bar 🚀",
  902. ]
  903. stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
  904. XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
  905. XCTAssertEqual(status.message, "foo bar 🚀")
  906. }
  907. }
  908. func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidContentType() throws {
  909. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  910. writeState: .one(),
  911. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  912. ))
  913. let trailers: HPACKHeaders = [
  914. ":status": "200",
  915. "grpc-status": "5",
  916. "grpc-message": "foo bar 🚀",
  917. "content-type": "invalid",
  918. ]
  919. stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
  920. XCTAssertEqual(error, .invalidContentType("invalid"))
  921. }
  922. }
  923. func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndValidGRPCStatus() throws {
  924. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  925. writeState: .one(),
  926. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  927. ))
  928. let trailers: HPACKHeaders = [
  929. ":status": "418",
  930. "grpc-status": "5",
  931. ]
  932. stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
  933. XCTAssertEqual(
  934. error,
  935. .invalidHTTPStatusWithGRPCStatus(GRPCStatus(
  936. code: GRPCStatus.Code(rawValue: 5)!,
  937. message: nil
  938. ))
  939. )
  940. }
  941. }
  942. func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndNoGRPCStatus() throws {
  943. var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
  944. writeState: .one(),
  945. pendingReadState: .init(arity: .one, messageEncoding: .disabled)
  946. ))
  947. let trailers: HPACKHeaders = [":status": "418"]
  948. stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
  949. XCTAssertEqual(error, .invalidHTTPStatus("418"))
  950. }
  951. }
  952. }
  953. class ReadStateTests: GRPCTestCase {
  954. var allocator = ByteBufferAllocator()
  955. func testReadWhenNoExpectedMessages() {
  956. var state: ReadState = .notReading
  957. var buffer = self.allocator.buffer(capacity: 0)
  958. state.readMessages(&buffer, maxLength: .max).assertFailure {
  959. XCTAssertEqual($0, .cardinalityViolation)
  960. }
  961. state.assertNotReading()
  962. }
  963. func testReadWithLeftOverBytesForOneExpectedMessage() throws {
  964. // Write a message into the buffer:
  965. let message = ByteBuffer(string: "Hello!")
  966. var writer = LengthPrefixedMessageWriter(compression: .none)
  967. var buffers = try writer.write(buffer: message)
  968. XCTAssertNil(buffers.1)
  969. // And some extra junk bytes:
  970. let bytes: [UInt8] = [0x00]
  971. buffers.0.writeBytes(bytes)
  972. var state: ReadState = .one()
  973. state.readMessages(&buffers.0, maxLength: .max).assertFailure {
  974. XCTAssertEqual($0, .leftOverBytes)
  975. }
  976. state.assertNotReading()
  977. }
  978. func testReadTooManyMessagesForOneExpectedMessages() throws {
  979. // Write a message into the buffer twice:
  980. let message = ByteBuffer(string: "Hello!")
  981. var writer = LengthPrefixedMessageWriter(compression: .none)
  982. var buffers1 = try writer.write(buffer: message)
  983. var buffers2 = try writer.write(buffer: message)
  984. XCTAssertNil(buffers1.1)
  985. XCTAssertNil(buffers2.1)
  986. buffers1.0.writeBuffer(&buffers2.0)
  987. var state: ReadState = .one()
  988. state.readMessages(&buffers1.0, maxLength: .max).assertFailure {
  989. XCTAssertEqual($0, .cardinalityViolation)
  990. }
  991. state.assertNotReading()
  992. }
  993. func testReadOneMessageForOneExpectedMessages() throws {
  994. // Write a message into the buffer twice:
  995. let message = ByteBuffer(string: "Hello!")
  996. var writer = LengthPrefixedMessageWriter(compression: .none)
  997. var (buffer, other) = try writer.write(buffer: message)
  998. XCTAssertNil(other)
  999. var state: ReadState = .one()
  1000. state.readMessages(&buffer, maxLength: .max).assertSuccess {
  1001. XCTAssertEqual($0, [message])
  1002. }
  1003. // We shouldn't be able to read anymore.
  1004. state.assertNotReading()
  1005. }
  1006. func testReadOneMessageForManyExpectedMessages() throws {
  1007. // Write a message into the buffer twice:
  1008. let message = ByteBuffer(string: "Hello!")
  1009. var writer = LengthPrefixedMessageWriter(compression: .none)
  1010. var (buffer, other) = try writer.write(buffer: message)
  1011. XCTAssertNil(other)
  1012. var state: ReadState = .many()
  1013. state.readMessages(&buffer, maxLength: .max).assertSuccess {
  1014. XCTAssertEqual($0, [message])
  1015. }
  1016. // We should still be able to read.
  1017. state.assertReading()
  1018. }
  1019. func testReadManyMessagesForManyExpectedMessages() throws {
  1020. // Write a message into the buffer twice:
  1021. let message = ByteBuffer(string: "Hello!")
  1022. var writer = LengthPrefixedMessageWriter(compression: .none)
  1023. var (first, _) = try writer.write(buffer: message)
  1024. var (second, _) = try writer.write(buffer: message)
  1025. var (third, _) = try writer.write(buffer: message)
  1026. first.writeBuffer(&second)
  1027. first.writeBuffer(&third)
  1028. var state: ReadState = .many()
  1029. state.readMessages(&first, maxLength: .max).assertSuccess {
  1030. XCTAssertEqual($0, [message, message, message])
  1031. }
  1032. // We should still be able to read.
  1033. state.assertReading()
  1034. }
  1035. }
  1036. // MARK: Result helpers
  1037. extension Result {
  1038. /// Asserts the `Result` was a success.
  1039. func assertSuccess(verify: (Success) throws -> Void = { _ in }) {
  1040. switch self {
  1041. case let .success(success):
  1042. do {
  1043. try verify(success)
  1044. } catch {
  1045. XCTFail("verify threw: \(error)")
  1046. }
  1047. case let .failure(error):
  1048. XCTFail("unexpected .failure: \(error)")
  1049. }
  1050. }
  1051. /// Asserts the `Result` was a failure.
  1052. func assertFailure(verify: (Failure) throws -> Void = { _ in }) {
  1053. switch self {
  1054. case let .success(success):
  1055. XCTFail("unexpected .success: \(success)")
  1056. case let .failure(error):
  1057. do {
  1058. try verify(error)
  1059. } catch {
  1060. XCTFail("verify threw: \(error)")
  1061. }
  1062. }
  1063. }
  1064. }
  1065. // MARK: ReadState, PendingWriteState, and WriteState helpers
  1066. extension ReadState {
  1067. static func one() -> ReadState {
  1068. let reader = LengthPrefixedMessageReader()
  1069. return .reading(.one, reader)
  1070. }
  1071. static func many() -> ReadState {
  1072. let reader = LengthPrefixedMessageReader()
  1073. return .reading(.many, reader)
  1074. }
  1075. func assertReading() {
  1076. switch self {
  1077. case .reading:
  1078. ()
  1079. case .notReading:
  1080. XCTFail("unexpected state .notReading")
  1081. }
  1082. }
  1083. func assertNotReading() {
  1084. switch self {
  1085. case .reading:
  1086. XCTFail("unexpected state .reading")
  1087. case .notReading:
  1088. ()
  1089. }
  1090. }
  1091. }
  1092. extension PendingWriteState {
  1093. static func one() -> PendingWriteState {
  1094. return .init(arity: .one, contentType: .protobuf)
  1095. }
  1096. static func many() -> PendingWriteState {
  1097. return .init(arity: .many, contentType: .protobuf)
  1098. }
  1099. }
  1100. extension WriteState {
  1101. static func one() -> WriteState {
  1102. return .init(
  1103. arity: .one,
  1104. contentType: .protobuf,
  1105. writer: .init(compression: .none, allocator: .init())
  1106. )
  1107. }
  1108. static func many() -> WriteState {
  1109. return .init(
  1110. arity: .many,
  1111. contentType: .protobuf,
  1112. writer: .init(compression: .none, allocator: .init())
  1113. )
  1114. }
  1115. }