GRPCClientStateMachineTests.swift 42 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. import Foundation
  18. @testable import GRPC
  19. import Logging
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOHTTP1
  23. import SwiftProtobuf
  24. import XCTest
  /// Tests for `GRPCClientStateMachine`, plus the shared helpers used by the test extensions.
  class GRPCClientStateMachineTests: GRPCTestCase {
    typealias Request = Echo_EchoRequest
    typealias Response = Echo_EchoResponse
    typealias StateMachine = GRPCClientStateMachine

    /// Allocator used to create the payload buffers handed to the message writer.
    var allocator = ByteBufferAllocator()

    /// Creates a state machine which starts in the given `state`.
    func makeStateMachine(_ state: StateMachine.State) -> StateMachine {
      let machine = StateMachine(state: state)
      return machine
    }

    /// Length-prefixes `message` and returns the framed bytes in a fresh `ByteBuffer`.
    ///
    /// - Throws: Any error produced by the message writer while framing the message.
    func writeMessage(_ message: String) throws -> ByteBuffer {
      var writer = CoalescingLengthPrefixedMessageWriter(compression: .none, allocator: .init())
      let payload = self.allocator.buffer(string: message)
      writer.append(buffer: payload, compress: false, promise: nil)

      // Drain the writer, gluing every emitted part onto `framed`.
      var framed: ByteBuffer?
      while let emitted = writer.next() {
        // `get()` rethrows the writer's failure, if there was one.
        let part = try emitted.0.get()
        framed.setOrWriteImmutableBuffer(part)
      }

      // A message was appended above, so the writer must have produced at least one buffer
      // (or we have already thrown).
      return framed!
    }

    /// Length-prefixes `message` and appends the framed bytes to `buffer`.
    func writeMessage(_ message: String, into buffer: inout ByteBuffer) throws {
      var framed = try self.writeMessage(message)
      buffer.writeBuffer(&framed)
    }

    /// Builds a minimally valid set of response headers.
    ///
    /// Passing `nil` for either argument leaves the corresponding header out.
    func makeResponseHeaders(
      status: String? = "200",
      contentType: String? = "application/grpc+proto"
    ) -> HPACKHeaders {
      var headers = HPACKHeaders()
      if let status = status {
        headers.add(name: ":status", value: status)
      }
      if let contentType = contentType {
        headers.add(name: "content-type", value: contentType)
      }
      return headers
    }
  }
  66. // MARK: - Send Request Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that sending request headers from `state` fails with `.invalidState`.
    func doTestSendRequestHeadersFromInvalidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendRequestHeaders(
        requestHead: .init(
          method: "POST",
          scheme: "http",
          path: "/echo/Get",
          host: "host",
          deadline: .distantFuture,
          customMetadata: [:],
          encoding: .disabled
        ),
        allocator: .init()
      )
      outcome.assertFailure { error in
        XCTAssertEqual(error, .invalidState)
      }
    }

    /// Sending request headers succeeds when both client and server are idle.
    func testSendRequestHeadersFromIdle() {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      )
      let outcome = machine.sendRequestHeaders(
        requestHead: .init(
          method: "POST",
          scheme: "http",
          path: "/echo/Get",
          host: "host",
          deadline: .distantFuture,
          customMetadata: [:],
          encoding: .disabled
        ),
        allocator: .init()
      )
      outcome.assertSuccess()
    }

    func testSendRequestHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendRequestHeadersFromInvalidState(state)
    }

    func testSendRequestHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendRequestHeadersFromInvalidState(state)
    }
  }
  124. // MARK: - Send Request
  extension GRPCClientStateMachineTests {
    /// Asserts that sending a request message from `state` fails with the `expected` error.
    ///
    /// - Parameters:
    ///   - state: The state the state machine starts in.
    ///   - expected: The error the state machine must report.
    func doTestSendRequestFromInvalidState(_ state: StateMachine.State, expected: MessageWriteError) {
      var stateMachine = self.makeStateMachine(state)
      stateMachine.sendRequest(
        ByteBuffer(string: "Hello!"),
        compressed: false
      ).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that sending a single request message from `state` succeeds.
    func doTestSendRequestFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)
      let request = "Hello!"
      stateMachine.sendRequest(
        ByteBuffer(string: request),
        compressed: false
      ).assertSuccess()
    }

    /// Request headers have not been sent yet, so a message is rejected as an invalid state.
    func testSendRequestFromIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testSendRequestFromClientActiveServerIdle() {
      self.doTestSendRequestFromValidState(.clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      ))
    }

    func testSendRequestFromClientClosedServerIdle() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled)),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromActive() {
      self.doTestSendRequestFromValidState(.clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      ))
    }

    /// The client has closed its stream while the server is still active; further requests must
    /// be rejected.
    func testSendRequestFromClientClosedServerActive() {
      // This test previously constructed `.clientClosedServerIdle`, duplicating the test above
      // and leaving the client-closed/server-active state uncovered.
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerActive(readState: .one()),
        expected: .cardinalityViolation
      )
    }

    func testSendRequestFromClosed() {
      self.doTestSendRequestFromInvalidState(
        .clientClosedServerClosed,
        expected: .cardinalityViolation
      )
    }
  }
  181. // MARK: - Send End of Request Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that closing the request stream from `state` fails with the `expected` error.
    func doTestSendEndOfRequestStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: SendEndOfRequestStreamError
    ) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendEndOfRequestStream()
      outcome.assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that closing the request stream from `state` succeeds.
    func doTestSendEndOfRequestStreamFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let outcome = machine.sendEndOfRequestStream()
      outcome.assertSuccess()
    }

    func testSendEndOfRequestStreamFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .invalidState)
    }

    func testSendEndOfRequestStreamFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestSendEndOfRequestStreamFromValidState(state)
    }

    func testSendEndOfRequestStreamFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }

    func testSendEndOfRequestStreamFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestSendEndOfRequestStreamFromInvalidState(state, expected: .alreadyClosed)
    }
  }
  234. // MARK: - Receive Response Headers
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving the default response headers in `state` fails with `expected`.
    func doTestReceiveResponseHeadersFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveResponseHeadError
    ) {
      var machine = self.makeStateMachine(state)
      let headers = self.makeResponseHeaders()
      machine.receiveResponseHeaders(headers).assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving the default response headers in `state` succeeds.
    func doTestReceiveResponseHeadersFromValidState(_ state: StateMachine.State) {
      var machine = self.makeStateMachine(state)
      let headers = self.makeResponseHeaders()
      machine.receiveResponseHeaders(headers).assertSuccess()
    }

    func testReceiveResponseHeadersFromIdle() {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientActiveServerIdle() {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromClientClosedServerIdle() {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      self.doTestReceiveResponseHeadersFromValidState(state)
    }

    func testReceiveResponseHeadersFromActive() {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClientClosedServerActive() {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseHeadersFromClosed() {
      let state: StateMachine.State = .clientClosedServerClosed
      self.doTestReceiveResponseHeadersFromInvalidState(state, expected: .invalidState)
    }
  }
  287. // MARK: - Receive Response
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving a length-prefixed message in `state` fails with `expected`.
    func doTestReceiveResponseFromInvalidState(
      _ state: StateMachine.State,
      expected: MessageReadError
    ) throws {
      var machine = self.makeStateMachine(state)
      var framed = try self.writeMessage("Hello!")
      let outcome = machine.receiveResponseBuffer(&framed, maxMessageLength: .max)
      outcome.assertFailure { error in
        XCTAssertEqual(error, expected)
      }
    }

    /// Asserts that receiving a length-prefixed message in `state` yields exactly that message.
    func doTestReceiveResponseFromValidState(_ state: StateMachine.State) throws {
      var machine = self.makeStateMachine(state)
      let text = "Hello!"
      var framed = try self.writeMessage(text)
      let outcome = machine.receiveResponseBuffer(&framed, maxMessageLength: .max)
      outcome.assertSuccess { received in
        XCTAssertEqual(received, [ByteBuffer(string: text)])
      }
    }

    func testReceiveResponseFromIdle() throws {
      let state: StateMachine.State = .clientIdleServerIdle(
        pendingWriteState: .one(),
        readArity: .one
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientActiveServerIdle() throws {
      let state: StateMachine.State = .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromClientClosedServerIdle() throws {
      let state: StateMachine.State = .clientClosedServerIdle(
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }

    func testReceiveResponseFromActive() throws {
      let state: StateMachine.State = .clientActiveServerActive(
        writeState: .one(),
        readState: .one()
      )
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClientClosedServerActive() throws {
      let state: StateMachine.State = .clientClosedServerActive(readState: .one())
      try self.doTestReceiveResponseFromValidState(state)
    }

    func testReceiveResponseFromClosed() throws {
      let state: StateMachine.State = .clientClosedServerClosed
      try self.doTestReceiveResponseFromInvalidState(state, expected: .invalidState)
    }
  }
  344. // MARK: - Receive End of Response Stream
  extension GRPCClientStateMachineTests {
    /// Asserts that receiving (empty) trailers in `state` fails with the `expected` error.
    ///
    /// - Parameters:
    ///   - state: The state the state machine starts in.
    ///   - expected: The error the state machine must report.
    func doTestReceiveEndOfResponseStreamFromInvalidState(
      _ state: StateMachine.State,
      expected: ReceiveEndOfResponseStreamError
    ) {
      var stateMachine = self.makeStateMachine(state)
      // Check the error itself: previously `expected` was accepted but never compared, so any
      // failure at all would have satisfied the test.
      stateMachine.receiveEndOfResponseStream(.init()).assertFailure {
        XCTAssertEqual($0, expected)
      }
    }

    /// Asserts that receiving valid trailers in `state` succeeds with an "ok" status.
    func doTestReceiveEndOfResponseStreamFromValidState(_ state: StateMachine.State) {
      var stateMachine = self.makeStateMachine(state)

      var trailers: HPACKHeaders = [
        GRPCHeaderName.statusCode: "0",
        GRPCHeaderName.statusMessage: "ok",
      ]

      // When the server is idle it's a "Trailers-Only" response, we need the :status and
      // content-type to make a valid set of trailers.
      switch state {
      case .clientActiveServerIdle,
           .clientClosedServerIdle:
        trailers.add(name: ":status", value: "200")
        trailers.add(name: "content-type", value: "application/grpc+proto")
      default:
        break
      }

      stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
        XCTAssertEqual(status.code, .ok)
        XCTAssertEqual(status.message, "ok")
      }
    }

    func testReceiveEndOfResponseStreamFromIdle() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one),
        expected: .invalidState
      )
    }

    func testReceiveEndOfResponseStreamFromClientActiveServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: .one, messageEncoding: .disabled)
        )
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerIdle() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
      )
    }

    func testReceiveEndOfResponseStreamFromActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClientClosedServerActive() {
      self.doTestReceiveEndOfResponseStreamFromValidState(
        .clientClosedServerActive(readState: .one())
      )
    }

    func testReceiveEndOfResponseStreamFromClosed() {
      self.doTestReceiveEndOfResponseStreamFromInvalidState(
        .clientClosedServerClosed,
        expected: .invalidState
      )
    }

    /// Asserts that an end of stream without trailers in `state` produces an
    /// `.internalError` status.
    private func doTestReceiveEndStreamOnDataWhenActive(_ state: StateMachine.State) throws {
      var stateMachine = self.makeStateMachine(state)
      let status = try assertNotNil(stateMachine.receiveEndOfResponseStream())
      XCTAssertEqual(status.code, .internalError)
    }

    func testReceiveEndStreamOnDataClientActiveServerIdle() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientActiveServerIdle(
          writeState: .one(),
          pendingReadState: .init(arity: .one, messageEncoding: .disabled)
        )
      )
    }

    func testReceiveEndStreamOnDataClientClosedServerIdle() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientClosedServerIdle(pendingReadState: .init(arity: .one, messageEncoding: .disabled))
      )
    }

    func testReceiveEndStreamOnDataClientActiveServerActive() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientActiveServerActive(writeState: .one(), readState: .one())
      )
    }

    func testReceiveEndStreamOnDataClientClosedServerActive() throws {
      try self.doTestReceiveEndStreamOnDataWhenActive(
        .clientClosedServerActive(readState: .one())
      )
    }

    func testReceiveEndStreamOnDataWhenClosed() {
      var stateMachine = self.makeStateMachine(.clientClosedServerClosed)
      // Already closed, end stream is ignored.
      XCTAssertNil(stateMachine.receiveEndOfResponseStream())
    }
  }
  443. // MARK: - Basic RPC flow.
  extension GRPCClientStateMachineTests {
    /// Builds trailers carrying the given status code and, if provided, a status message.
    func makeTrailers(status: GRPCStatus.Code, message: String? = nil) -> HPACKHeaders {
      var trailers = HPACKHeaders()
      trailers.add(name: GRPCHeaderName.statusCode, value: "\(status.rawValue)")
      guard let message = message else {
        return trailers
      }
      trailers.add(name: GRPCHeaderName.statusMessage, value: message)
      return trailers
    }

    /// One request, one response.
    func testSimpleUnaryFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
      )

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(
        requestHead: .init(
          method: "POST",
          scheme: "https",
          path: "/echo/Get",
          host: "foo",
          deadline: .distantFuture,
          customMetadata: [:],
          encoding: .disabled
        ),
        allocator: .init()
      ).assertSuccess()

      // The server acknowledges with its response headers.
      machine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()

      // Send the single request, then close the request stream.
      machine.sendRequest(ByteBuffer(string: "Hello!"), compressed: false).assertSuccess()
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var response = try self.writeMessage("Hello!")
      machine.receiveResponseBuffer(&response, maxMessageLength: .max).assertSuccess()

      // Finally, receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }

    /// Many requests, one response.
    func testSimpleClientActiveFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .many(), readArity: .one)
      )

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(
        requestHead: .init(
          method: "POST",
          scheme: "https",
          path: "/echo/Get",
          host: "foo",
          deadline: .distantFuture,
          customMetadata: [:],
          encoding: .disabled
        ),
        allocator: .init()
      ).assertSuccess()

      // The server acknowledges with its response headers.
      machine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()

      // Send three requests, then close the request stream.
      machine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
      machine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertSuccess()
      machine.sendRequest(ByteBuffer(string: "3"), compressed: false).assertSuccess()
      machine.sendEndOfRequestStream().assertSuccess()

      // Receive the single response.
      var response = try self.writeMessage("Hello!")
      machine.receiveResponseBuffer(&response, maxMessageLength: .max).assertSuccess()

      // Finally, receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }

    /// One request, many responses.
    func testSimpleServerActiveFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .one(), readArity: .many)
      )

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(
        requestHead: .init(
          method: "POST",
          scheme: "https",
          path: "/echo/Get",
          host: "foo",
          deadline: .distantFuture,
          customMetadata: [:],
          encoding: .disabled
        ),
        allocator: .init()
      ).assertSuccess()

      // The server acknowledges with its response headers.
      machine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()

      // Send the single request, then close the request stream.
      machine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()
      machine.sendEndOfRequestStream().assertSuccess()

      // First response arrives in a buffer of its own.
      var single = try self.writeMessage("1")
      machine.receiveResponseBuffer(&single, maxMessageLength: .max).assertSuccess()

      // The next two responses arrive together in one buffer.
      var pair = try self.writeMessage("2")
      try self.writeMessage("3", into: &pair)
      machine.receiveResponseBuffer(&pair, maxMessageLength: .max).assertSuccess()

      // Finally, receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }

    /// Many requests interleaved with many responses.
    func testSimpleBidirectionalActiveFlow() throws {
      var machine = self.makeStateMachine(
        .clientIdleServerIdle(pendingWriteState: .many(), readArity: .many)
      )

      // Start the RPC by sending the request headers.
      machine.sendRequestHeaders(
        requestHead: .init(
          method: "POST",
          scheme: "https",
          path: "/echo/Get",
          host: "foo",
          deadline: .distantFuture,
          customMetadata: [:],
          encoding: .disabled
        ),
        allocator: .init()
      ).assertSuccess()

      // The server acknowledges with its response headers.
      machine.receiveResponseHeaders(self.makeResponseHeaders()).assertSuccess()

      // Interleave requests and responses: one request out...
      machine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()

      // ...one response in...
      var single = try self.writeMessage("1")
      machine.receiveResponseBuffer(&single, maxMessageLength: .max).assertSuccess()

      // ...two more requests out...
      machine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertSuccess()
      machine.sendRequest(ByteBuffer(string: "3"), compressed: false).assertSuccess()

      // ...and two responses in, sharing one buffer.
      var pair = try self.writeMessage("2")
      try self.writeMessage("3", into: &pair)
      machine.receiveResponseBuffer(&pair, maxMessageLength: .max).assertSuccess()

      // Close the request stream.
      machine.sendEndOfRequestStream().assertSuccess()

      // Finally, receive the status.
      machine.receiveEndOfResponseStream(self.makeTrailers(status: .ok)).assertSuccess()
    }
  }
  570. // MARK: - Too many requests / responses.
  extension GRPCClientStateMachineTests {
    /// With a write state of `.one()`, a second request is a cardinality violation whatever the
    /// pending read arity is.
    func testSendTooManyRequestsFromClientActiveServerIdle() {
      let arities: [MessageArity] = [.one, .many]
      for arity in arities {
        var machine = self.makeStateMachine(
          .clientActiveServerIdle(
            writeState: .one(),
            pendingReadState: .init(arity: arity, messageEncoding: .disabled)
          )
        )

        // The first request is accepted.
        machine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()

        // The second is rejected.
        machine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a write state of `.one()`, a second request is a cardinality violation whatever the
    /// read state is.
    func testSendTooManyRequestsFromActive() {
      let readStates: [ReadState] = [.one(), .many()]
      for readState in readStates {
        var machine = self.makeStateMachine(
          .clientActiveServerActive(writeState: .one(), readState: readState)
        )

        // The first request is accepted.
        machine.sendRequest(ByteBuffer(string: "1"), compressed: false).assertSuccess()

        // The second is rejected.
        machine.sendRequest(ByteBuffer(string: "2"), compressed: false).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// Once the RPC is closed, no request may be sent at all.
    func testSendTooManyRequestsFromClosed() {
      var machine = self.makeStateMachine(.clientClosedServerClosed)
      let outcome = machine.sendRequest(ByteBuffer(string: "1"), compressed: false)
      outcome.assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
    }

    /// With a read state of `.one()`, a second response buffer is a cardinality violation
    /// whatever the write state is.
    func testReceiveTooManyRequests() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writeState in writeStates {
        var machine = self.makeStateMachine(
          .clientActiveServerActive(writeState: writeState, readState: .one())
        )

        // The first response is accepted.
        var first = try self.writeMessage("foo")
        machine.receiveResponseBuffer(&first, maxMessageLength: .max).assertSuccess()

        // The second is rejected.
        var second = try self.writeMessage("bar")
        machine.receiveResponseBuffer(&second, maxMessageLength: .max).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }

    /// With a read state of `.one()`, two responses delivered in a single buffer are a
    /// cardinality violation whatever the write state is.
    func testReceiveTooManyRequestsInOneBuffer() throws {
      let writeStates: [WriteState] = [.one(), .many()]
      for writeState in writeStates {
        var machine = self.makeStateMachine(
          .clientActiveServerActive(writeState: writeState, readState: .one())
        )

        // Pack two framed responses into the same buffer.
        var combined = try self.writeMessage("foo")
        try self.writeMessage("bar", into: &combined)

        machine.receiveResponseBuffer(&combined, maxMessageLength: .max).assertFailure { error in
          XCTAssertEqual(error, .cardinalityViolation)
        }
      }
    }
  }
  632. // MARK: - Send Request Headers
  633. extension GRPCClientStateMachineTests {
  /// Checks the headers produced for a request with a deadline, custom metadata and identity
  /// compression enabled in both directions.
  func testSendRequestHeaders() throws {
    var machine = self.makeStateMachine(
      .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
    )

    let outcome = machine.sendRequestHeaders(
      requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        deadline: .now() + .hours(1),
        customMetadata: ["x-grpc-id": "request-id"],
        encoding: .enabled(
          .init(
            forRequests: .identity,
            acceptableForResponses: [.identity],
            decompressionLimit: .ratio(10)
          )
        )
      ),
      allocator: .init()
    )

    outcome.assertSuccess { sent in
      // Pseudo-headers.
      XCTAssertEqual(sent[":method"], ["POST"])
      XCTAssertEqual(sent[":path"], ["/echo/Get"])
      XCTAssertEqual(sent[":authority"], ["localhost"])
      XCTAssertEqual(sent[":scheme"], ["http"])

      XCTAssertEqual(sent["content-type"], ["application/grpc"])
      XCTAssertEqual(sent["te"], ["trailers"])

      // The deadline is converted into a timeout whose exact value can't be predicted here, so
      // only check that the header is present.
      XCTAssertTrue(sent.contains(name: "grpc-timeout"))

      XCTAssertEqual(sent["x-grpc-id"], ["request-id"])
      XCTAssertEqual(sent["grpc-encoding"], ["identity"])
      XCTAssertTrue(sent["grpc-accept-encoding"].contains("identity"))

      let userAgent = sent["user-agent"].first
      XCTAssertTrue(userAgent?.starts(with: "grpc-swift") ?? false)
    }
  }
  /// Checks that custom metadata names are lowercased in the request headers.
  func testSendRequestHeadersNormalizesCustomMetadata() throws {
    // `HPACKHeaders` looks names up case-insensitively, so subscripting can't tell us how a name
    // was actually written. Instead, tag the entries we care about with a sentinel value, pick
    // them out by that value, and compare whole `HPACKHeaders` instances (that equality check
    // *is* case sensitive).
    let sentinel = "a-key-for-filtering"
    let metadata: HPACKHeaders = [
      "partiallyLower": sentinel,
      "ALLUPPER": sentinel,
    ]

    var machine = self.makeStateMachine(
      .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
    )

    let outcome = machine.sendRequestHeaders(
      requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        deadline: .distantFuture,
        customMetadata: metadata,
        encoding: .disabled
      ),
      allocator: .init()
    )

    outcome.assertSuccess { sent in
      // Keep only the entries carrying the sentinel value...
      let tagged = sent.filter { _, value, _ in
        value == sentinel
      }
      // ...and reduce them to (name, value) pairs.
      let pairs = tagged.map { name, value, _ in
        (name, value)
      }

      let lowercased: HPACKHeaders = [
        "partiallylower": sentinel,
        "allupper": sentinel,
      ]
      XCTAssertEqual(HPACKHeaders(pairs), lowercased)
    }
  }
  /// Checks that a "user-agent" supplied in the custom metadata is the only user agent sent.
  func testSendRequestHeadersWithCustomUserAgent() throws {
    let metadata: HPACKHeaders = ["user-agent": "test-user-agent"]

    var machine = self.makeStateMachine(
      .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
    )

    let outcome = machine.sendRequestHeaders(
      requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        deadline: .distantFuture,
        customMetadata: metadata,
        encoding: .enabled(
          .init(
            forRequests: nil,
            acceptableForResponses: [],
            decompressionLimit: .ratio(10)
          )
        )
      ),
      allocator: .init()
    )

    outcome.assertSuccess { sent in
      XCTAssertEqual(sent["user-agent"], ["test-user-agent"])
    }
  }
  /// With no request encoding and no acceptable response encodings, neither compression header
  /// is sent.
  func testSendRequestHeadersWithNoCompressionInEitherDirection() throws {
    var machine = self.makeStateMachine(
      .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
    )

    let outcome = machine.sendRequestHeaders(
      requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        deadline: .distantFuture,
        customMetadata: ["x-grpc-id": "request-id"],
        encoding: .enabled(
          .init(
            forRequests: nil,
            acceptableForResponses: [],
            decompressionLimit: .ratio(10)
          )
        )
      ),
      allocator: .init()
    )

    outcome.assertSuccess { sent in
      XCTAssertFalse(sent.contains(name: "grpc-encoding"))
      XCTAssertFalse(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With no request encoding but some acceptable response encodings, only
  /// "grpc-accept-encoding" is sent.
  func testSendRequestHeadersWithNoCompressionForRequests() throws {
    var machine = self.makeStateMachine(
      .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
    )

    let outcome = machine.sendRequestHeaders(
      requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        deadline: .distantFuture,
        customMetadata: ["x-grpc-id": "request-id"],
        encoding: .enabled(
          .init(
            forRequests: nil,
            acceptableForResponses: [.identity, .gzip],
            decompressionLimit: .ratio(10)
          )
        )
      ),
      allocator: .init()
    )

    outcome.assertSuccess { sent in
      XCTAssertFalse(sent.contains(name: "grpc-encoding"))
      XCTAssertTrue(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// With gzip for requests but no acceptable response encodings, only "grpc-encoding" is sent.
  func testSendRequestHeadersWithNoCompressionForResponses() throws {
    var machine = self.makeStateMachine(
      .clientIdleServerIdle(pendingWriteState: .one(), readArity: .one)
    )

    let outcome = machine.sendRequestHeaders(
      requestHead: .init(
        method: "POST",
        scheme: "http",
        path: "/echo/Get",
        host: "localhost",
        deadline: .distantFuture,
        customMetadata: ["x-grpc-id": "request-id"],
        encoding: .enabled(
          .init(
            forRequests: .gzip,
            acceptableForResponses: [],
            decompressionLimit: .ratio(10)
          )
        )
      ),
      allocator: .init()
    )

    outcome.assertSuccess { sent in
      XCTAssertEqual(sent["grpc-encoding"], ["gzip"])

      // The asymmetry looks odd but is permitted: a client need not advertise the compression
      // it is using, and the server may still process the message provided it supports that
      // compression too.
      XCTAssertFalse(sent.contains(name: "grpc-accept-encoding"))
    }
  }
  /// Receiving the headers produced by `makeResponseHeaders()` (called with its defaults) while
  /// the client is active and the server is idle must succeed.
  func testReceiveResponseHeadersWithOkStatus() throws {
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let headers = self.makeResponseHeaders()
    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertSuccess()
  }
  /// Receiving response headers carrying the "payment required" HTTP status must fail with
  /// `.invalidHTTPStatus` wrapping that status code as a string.
  func testReceiveResponseHeadersWithNotOkStatus() throws {
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    // The numeric status code rendered as text.
    let code = String(HTTPResponseStatus.paymentRequired.code)
    let headers = self.makeResponseHeaders(status: code)

    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatus(code))
    }
  }
  /// Receiving response headers built with a `nil` content type must fail with
  /// `.invalidContentType(nil)`.
  func testReceiveResponseHeadersWithoutContentType() throws {
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let headers = self.makeResponseHeaders(contentType: nil)
    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertFailure { error in
      XCTAssertEqual(error, .invalidContentType(nil))
    }
  }
  /// Receiving response headers whose content type is "video/mpeg" must fail with
  /// `.invalidContentType` carrying that value.
  func testReceiveResponseHeadersWithInvalidContentType() throws {
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let headers = self.makeResponseHeaders(contentType: "video/mpeg")
    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertFailure { error in
      XCTAssertEqual(error, .invalidContentType("video/mpeg"))
    }
  }
  /// Receiving response headers with "grpc-encoding: identity" when `identity` is acceptable for
  /// responses must succeed and leave the state machine reading with an `identity` reader.
  func testReceiveResponseHeadersWithSupportedCompressionMechanism() throws {
    let configuration = ClientMessageEncoding.Configuration(
      forRequests: .none,
      acceptableForResponses: [.identity],
      decompressionLimit: .ratio(1)
    )

    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .enabled(configuration))
      )
    )

    // Identity should always be supported.
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: "identity")

    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertSuccess()

    // Both peers should now be active, with the read side holding a reader.
    guard case let .clientActiveServerActive(_, .reading(_, reader)) = stateMachine.state else {
      XCTFail("unexpected state \(stateMachine.state)")
      return
    }
    XCTAssertEqual(reader.compression?.algorithm, .identity)
  }
  /// Receiving response headers with "grpc-encoding: snappy" while message encoding is disabled
  /// must fail with `.unsupportedMessageEncoding("snappy")`.
  func testReceiveResponseHeadersWithUnsupportedCompressionMechanism() throws {
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let encoding = "snappy"
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: encoding)

    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding(encoding))
    }
  }
  /// Receiving response headers whose "grpc-encoding" value is not a recognisable algorithm name
  /// must fail with `.unsupportedMessageEncoding` carrying that value.
  func testReceiveResponseHeadersWithUnknownCompressionMechanism() throws {
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let encoding = "not-a-known-compression-(probably)"
    var headers = self.makeResponseHeaders()
    headers.add(name: "grpc-encoding", value: encoding)

    let result = stateMachine.receiveResponseHeaders(headers)
    result.assertFailure { error in
      XCTAssertEqual(error, .unsupportedMessageEncoding(encoding))
    }
  }
  /// Receiving trailers containing only "grpc-status: 0" while the client is closed and the
  /// server is active must succeed with the status code for raw value `0` and no message.
  func testReceiveEndOfResponseStreamWithStatus() throws {
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))
    let trailers: HPACKHeaders = ["grpc-status": "0"]

    stateMachine.receiveEndOfResponseStream(trailers).assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 0))
      // No "grpc-message" trailer was supplied, so no message is expected. `XCTAssertNil` states
      // that intent directly instead of comparing against `nil`.
      XCTAssertNil(status.message)
    }
  }
  /// Receiving trailers with "grpc-status: 999" must succeed and report the `.unknown` status
  /// code.
  func testReceiveEndOfResponseStreamWithUnknownStatus() throws {
    let trailers: HPACKHeaders = ["grpc-status": "999"]
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let result = stateMachine.receiveEndOfResponseStream(trailers)
    result.assertSuccess { status in
      XCTAssertEqual(status.code, .unknown)
    }
  }
  /// Receiving trailers whose "grpc-status" value is not an integer must succeed and report the
  /// `.unknown` status code.
  func testReceiveEndOfResponseStreamWithNonIntStatus() throws {
    let trailers: HPACKHeaders = ["grpc-status": "not-a-real-status-code"]
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let result = stateMachine.receiveEndOfResponseStream(trailers)
    result.assertSuccess { status in
      XCTAssertEqual(status.code, .unknown)
    }
  }
  /// Receiving trailers with both "grpc-status" and "grpc-message" must succeed and surface the
  /// code for raw value `5` together with the message text.
  func testReceiveEndOfResponseStreamWithStatusAndMessage() throws {
    let trailers: HPACKHeaders = [
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    var stateMachine = self.makeStateMachine(.clientClosedServerActive(readState: .one()))

    let result = stateMachine.receiveEndOfResponseStream(trailers)
    result.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual(status.message, "foo bar 🚀")
    }
  }
  /// Ending the response stream while the server is still idle, with a ":status" of 200 and no
  /// "content-type" header, must succeed and surface the gRPC status code and message.
  func testReceiveTrailersOnlyEndOfResponseStreamWithoutContentType() throws {
    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
    ]
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let result = stateMachine.receiveEndOfResponseStream(trailers)
    result.assertSuccess { status in
      XCTAssertEqual(status.code, GRPCStatus.Code(rawValue: 5))
      XCTAssertEqual(status.message, "foo bar 🚀")
    }
  }
  /// Ending the response stream while the server is still idle, with a "content-type" of
  /// "invalid", must fail with `.invalidContentType("invalid")`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidContentType() throws {
    let trailers: HPACKHeaders = [
      ":status": "200",
      "grpc-status": "5",
      "grpc-message": "foo bar 🚀",
      "content-type": "invalid",
    ]
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let result = stateMachine.receiveEndOfResponseStream(trailers)
    result.assertFailure { failure in
      XCTAssertEqual(failure, .invalidContentType("invalid"))
    }
  }
  /// Ending the response stream while the server is still idle, with a ":status" of 418 and a
  /// "grpc-status" of 5, must fail with `.invalidHTTPStatusWithGRPCStatus` wrapping a status
  /// built from that gRPC code and no message.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndValidGRPCStatus() throws {
    var stateMachine = self.makeStateMachine(.clientActiveServerIdle(
      writeState: .one(),
      pendingReadState: .init(arity: .one, messageEncoding: .disabled)
    ))

    let trailers: HPACKHeaders = [
      ":status": "418",
      "grpc-status": "5",
    ]

    // Unwrap the failable initializer via XCTest rather than force-unwrapping: if the raw value
    // were ever rejected this records a test failure instead of crashing the test process.
    let code = try XCTUnwrap(GRPCStatus.Code(rawValue: 5))
    let expectedStatus = GRPCStatus(code: code, message: nil)

    stateMachine.receiveEndOfResponseStream(trailers).assertFailure { error in
      XCTAssertEqual(error, .invalidHTTPStatusWithGRPCStatus(expectedStatus))
    }
  }
  /// Ending the response stream while the server is still idle, with a ":status" of 418 and no
  /// "grpc-status" header, must fail with `.invalidHTTPStatus("418")`.
  func testReceiveTrailersOnlyEndOfResponseStreamWithInvalidHTTPStatusAndNoGRPCStatus() throws {
    let trailers: HPACKHeaders = [":status": "418"]
    var stateMachine = self.makeStateMachine(
      .clientActiveServerIdle(
        writeState: .one(),
        pendingReadState: .init(arity: .one, messageEncoding: .disabled)
      )
    )

    let result = stateMachine.receiveEndOfResponseStream(trailers)
    result.assertFailure { failure in
      XCTAssertEqual(failure, .invalidHTTPStatus("418"))
    }
  }
  956. }
  /// Tests for `ReadState.readMessages(_:maxLength:)` covering the `.notReading`, `.one` and
  /// `.many` starting states.
  class ReadStateTests: GRPCTestCase {
    var allocator = ByteBufferAllocator()

    /// Builds a buffer holding `message` preceded by a five byte prefix: a single zero byte
    /// followed by the UTF-8 byte count written as a `UInt32`.
    func writeMessage(_ message: String) -> ByteBuffer {
      let payload = message.utf8
      var framed = self.allocator.buffer(capacity: 5 + payload.count)
      framed.writeInteger(UInt8(0))
      framed.writeInteger(UInt32(payload.count))
      framed.writeBytes(payload)
      return framed
    }

    /// Reading while in `.notReading` is a cardinality violation and the state is unchanged.
    func testReadWhenNoExpectedMessages() {
      var state: ReadState = .notReading
      var empty = self.allocator.buffer(capacity: 0)

      let result = state.readMessages(&empty, maxLength: .max)
      result.assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// One message followed by a stray byte, when exactly one message is expected, fails with
    /// `.leftOverBytes` and stops reading.
    func testReadWithLeftOverBytesForOneExpectedMessage() throws {
      var state: ReadState = .one()
      var buffer = self.writeMessage("Hello!")
      // And some extra junk bytes:
      buffer.writeBytes([0x00] as [UInt8])

      let result = state.readMessages(&buffer, maxLength: .max)
      result.assertFailure { error in
        XCTAssertEqual(error, .leftOverBytes)
      }
      state.assertNotReading()
    }

    /// Two messages, when exactly one is expected, is a cardinality violation and stops reading.
    func testReadTooManyMessagesForOneExpectedMessages() throws {
      var state: ReadState = .one()
      // Write a message into the buffer twice:
      let single = self.writeMessage("Hello!")
      var doubled = single
      doubled.writeImmutableBuffer(single)

      let result = state.readMessages(&doubled, maxLength: .max)
      result.assertFailure { error in
        XCTAssertEqual(error, .cardinalityViolation)
      }
      state.assertNotReading()
    }

    /// One message, when exactly one is expected, is returned and reading then stops.
    func testReadOneMessageForOneExpectedMessages() throws {
      var state: ReadState = .one()
      var buffer = self.writeMessage("Hello!")

      let result = state.readMessages(&buffer, maxLength: .max)
      result.assertSuccess { messages in
        XCTAssertEqual(messages, [ByteBuffer(string: "Hello!")])
      }
      // We shouldn't be able to read anymore.
      state.assertNotReading()
    }

    /// One message, when many are expected, is returned and reading continues.
    func testReadOneMessageForManyExpectedMessages() throws {
      var state: ReadState = .many()
      var buffer = self.writeMessage("Hello!")

      let result = state.readMessages(&buffer, maxLength: .max)
      result.assertSuccess { messages in
        XCTAssertEqual(messages, [ByteBuffer(string: "Hello!")])
      }
      // We should still be able to read.
      state.assertReading()
    }

    /// Three back-to-back messages, when many are expected, are all returned and reading
    /// continues.
    func testReadManyMessagesForManyExpectedMessages() throws {
      var state: ReadState = .many()
      let lengthPrefixed = self.writeMessage("Hello!")
      // Start with one copy and append two more.
      var buffer = lengthPrefixed
      for _ in 0 ..< 2 {
        buffer.writeImmutableBuffer(lengthPrefixed)
      }

      let result = state.readMessages(&buffer, maxLength: .max)
      result.assertSuccess { messages in
        XCTAssertEqual(messages, Array(repeating: ByteBuffer(string: "Hello!"), count: 3))
      }
      // We should still be able to read.
      state.assertReading()
    }
  }
  1027. // MARK: Result helpers
  extension Result {
    /// Asserts the `Result` was a success.
    ///
    /// Records a test failure if the result is `.failure`, or if `verify` throws.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a recorded failure to; defaults to the caller's file.
    ///   - line: The line to attribute a recorded failure to; defaults to the caller's line.
    ///   - verify: Additional checks to run against the success value. Defaults to doing nothing.
    func assertSuccess(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (Success) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(success):
        do {
          try verify(success)
        } catch {
          // Forward the call site so the failure is reported against the calling test rather
          // than this helper.
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      case let .failure(error):
        XCTFail("unexpected .failure: \(error)", file: file, line: line)
      }
    }

    /// Asserts the `Result` was a failure.
    ///
    /// Records a test failure if the result is `.success`, or if `verify` throws.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a recorded failure to; defaults to the caller's file.
    ///   - line: The line to attribute a recorded failure to; defaults to the caller's line.
    ///   - verify: Additional checks to run against the error. Defaults to doing nothing.
    func assertFailure(
      file: StaticString = #filePath,
      line: UInt = #line,
      verify: (Failure) throws -> Void = { _ in }
    ) {
      switch self {
      case let .success(success):
        XCTFail("unexpected .success: \(success)", file: file, line: line)
      case let .failure(error):
        do {
          try verify(error)
        } catch {
          // Forward the call site so the failure is reported against the calling test rather
          // than this helper.
          XCTFail("verify threw: \(error)", file: file, line: line)
        }
      }
    }
  }
  1056. // MARK: ReadState, PendingWriteState, and WriteState helpers
  extension ReadState {
    /// Returns a `.reading` state with arity `.one` and a new `LengthPrefixedMessageReader`.
    static func one() -> ReadState {
      return .reading(.one, LengthPrefixedMessageReader())
    }

    /// Returns a `.reading` state with arity `.many` and a new `LengthPrefixedMessageReader`.
    static func many() -> ReadState {
      return .reading(.many, LengthPrefixedMessageReader())
    }

    /// Records a test failure unless this state is `.reading`.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a recorded failure to; defaults to the caller's file.
    ///   - line: The line to attribute a recorded failure to; defaults to the caller's line.
    func assertReading(file: StaticString = #filePath, line: UInt = #line) {
      switch self {
      case .reading:
        ()
      case .notReading:
        // Forward the call site so the failure points at the calling test, not this helper.
        XCTFail("unexpected state .notReading", file: file, line: line)
      }
    }

    /// Records a test failure unless this state is `.notReading`.
    ///
    /// - Parameters:
    ///   - file: The file to attribute a recorded failure to; defaults to the caller's file.
    ///   - line: The line to attribute a recorded failure to; defaults to the caller's line.
    func assertNotReading(file: StaticString = #filePath, line: UInt = #line) {
      switch self {
      case .reading:
        // Forward the call site so the failure points at the calling test, not this helper.
        XCTFail("unexpected state .reading", file: file, line: line)
      case .notReading:
        ()
      }
    }
  }
  extension PendingWriteState {
    /// A pending write state with arity `.one` and the `.protobuf` content type.
    static func one() -> PendingWriteState {
      PendingWriteState(arity: .one, contentType: .protobuf)
    }

    /// A pending write state with arity `.many` and the `.protobuf` content type.
    static func many() -> PendingWriteState {
      PendingWriteState(arity: .many, contentType: .protobuf)
    }
  }
  extension WriteState {
    /// A write state with arity `.one`, the `.protobuf` content type, and a writer created with
    /// no compression.
    static func one() -> WriteState {
      WriteState(
        arity: .one,
        contentType: .protobuf,
        writer: .init(compression: .none, allocator: .init())
      )
    }

    /// A write state with arity `.many`, the `.protobuf` content type, and a writer created with
    /// no compression.
    static func many() -> WriteState {
      WriteState(
        arity: .many,
        contentType: .protobuf,
        writer: .init(compression: .none, allocator: .init())
      )
    }
  }