HTTP2ToRawGRPCStateMachineTests.swift 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  /// Tests for `HTTP2ToRawGRPCStateMachine`: each test drives the state machine with headers,
  /// request bytes and response parts, then asserts on the `Action` it returns.
  class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
    typealias StateMachine = HTTP2ToRawGRPCStateMachine
    typealias State = StateMachine.State
    typealias Action = StateMachine.Action

    // An event loop gets passed to any service handler that's created, we don't actually use it
    // here. Note that every access returns a new `EmbeddedEventLoop`.
    private var eventLoop: EventLoop {
      return EmbeddedEventLoop()
    }

    /// An allocator, just here for convenience.
    private let allocator = ByteBufferAllocator()

    /// Makes request headers from raw string values.
    ///
    /// `:path` is always added; the content-type, encoding and accept-encoding headers are only
    /// added when the corresponding argument is non-nil. Multiple accept-encoding values are joined
    /// with a comma into a single header value.
    private func makeHeaders(
      path: String = "/echo.Echo/Get",
      contentType: String?,
      encoding: String? = nil,
      acceptEncoding: [String]? = nil
    ) -> HPACKHeaders {
      var headers = HPACKHeaders()

      headers.add(name: ":path", value: path)

      if let contentType = contentType {
        headers.add(name: GRPCHeaderName.contentType, value: contentType)
      }

      if let encoding = encoding {
        headers.add(name: GRPCHeaderName.encoding, value: encoding)
      }

      if let acceptEncoding = acceptEncoding {
        headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
      }

      return headers
    }

    /// Makes request headers from typed values by converting each of them to its string form and
    /// forwarding to the `String`-based `makeHeaders`.
    private func makeHeaders(
      path: String = "/echo.Echo/Get",
      contentType: ContentType? = .protobuf,
      encoding: CompressionAlgorithm? = nil,
      acceptEncoding: [CompressionAlgorithm]? = nil
    ) -> HPACKHeaders {
      return self.makeHeaders(
        path: path,
        contentType: contentType?.canonicalValue,
        encoding: encoding?.name,
        acceptEncoding: acceptEncoding?.map { $0.name }
      )
    }

    /// A minimum set of viable request headers for the service providers we register by default.
    private var viableHeaders: HPACKHeaders {
      return self.makeHeaders(
        path: "/echo.Echo/Get",
        contentType: "application/grpc"
      )
    }

    /// Just the echo service.
    private var services: [Substring: CallHandlerProvider] {
      let provider = EchoProvider()
      return [provider.serviceName: provider]
    }

    /// The states `makeStateMachine` is able to put a freshly created state machine into.
    private enum DesiredState {
      case requestOpenResponseIdle(pipelineConfigured: Bool)
      case requestOpenResponseOpen
      case requestClosedResponseIdle(pipelineConfigured: Bool)
      case requestClosedResponseOpen
    }

    /// Makes a state machine in the desired state.
    ///
    /// The machine always receives `viableHeaders` first; the remaining steps depend on `state`.
    /// Every intermediate action is asserted on, so a failure while setting up is reported too.
    ///
    /// - Parameters:
    ///   - services: The services to register; `self.services` is used when this is `nil`.
    ///   - encoding: The server message encoding to configure the state machine with.
    ///   - state: The state the returned machine should be in.
    private func makeStateMachine(
      services: [Substring: CallHandlerProvider]? = nil,
      encoding: ServerMessageEncoding = .disabled,
      state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
    ) -> StateMachine {
      var machine = StateMachine(services: services ?? self.services, encoding: encoding)

      let receiveHeadersAction = machine.receive(
        headers: self.viableHeaders,
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(receiveHeadersAction, .is(.configure()))

      switch state {
      case .requestOpenResponseIdle(pipelineConfigured: false):
        // Receiving the headers is all that's required.
        ()

      case .requestOpenResponseIdle(pipelineConfigured: true):
        let configuredAction = machine.pipelineConfigured()
        assertThat(configuredAction, .is(.forwardHeaders()))

      case .requestOpenResponseOpen:
        let configuredAction = machine.pipelineConfigured()
        assertThat(configuredAction, .is(.forwardHeaders()))

        let sendHeadersAction = machine.send(headers: [:], promise: nil)
        assertThat(sendHeadersAction, .is(.write(.headers())))

      case .requestClosedResponseIdle(pipelineConfigured: false):
        // The pipeline isn't configured so receiving end stream yields no action.
        var emptyBuffer = ByteBuffer()
        let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
        assertThat(receiveEnd, .is(.none()))

      case .requestClosedResponseIdle(pipelineConfigured: true):
        let configuredAction = machine.pipelineConfigured()
        assertThat(configuredAction, .is(.forwardHeaders()))

        var emptyBuffer = ByteBuffer()
        let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
        assertThat(receiveEnd, .is(.readNextRequest()))

      case .requestClosedResponseOpen:
        let configuredAction = machine.pipelineConfigured()
        assertThat(configuredAction, .is(.forwardHeaders()))

        var emptyBuffer = ByteBuffer()
        let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
        assertThat(receiveEndAction, .is(.readNextRequest()))

        let readAction = machine.readNextRequest()
        assertThat(readAction, .is(.forwardEnd()))

        let sendHeadersAction = machine.send(headers: [:], promise: nil)
        assertThat(sendHeadersAction, .is(.write(.headers())))
      }

      return machine
    }

    /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UInt32),
    /// the message bytes (UInt8 ⨉ message length).
    ///
    /// - Parameters:
    ///   - count: The number of (zeroed) message bytes to write after the 5-byte prefix.
    ///   - setCompressFlag: Whether the compression flag byte should be 1 rather than 0.
    private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
      var buffer = ByteBuffer()
      buffer.reserveCapacity(count + 5)
      buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
      buffer.writeInteger(UInt32(count))
      buffer.writeRepeatingByte(0, count: count)
      return buffer
    }

    // MARK: Receive Headers Tests

    /// Viable headers should result in the pipeline being configured.
    func testReceiveValidHeaders() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      let action = machine.receive(
        headers: self.viableHeaders,
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(action, .is(.configure()))
    }

    /// An unsupported content-type should be rejected with a 415 status and end stream.
    func testReceiveInvalidContentType() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      let action = machine.receive(
        headers: self.makeHeaders(contentType: "application/json"),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(
        action,
        .is(.write(.headers(.contains(":status", ["415"]), endStream: true), flush: true))
      )
    }

    /// A path naming a service we haven't registered should be rejected as unimplemented.
    func testReceiveValidHeadersForUnknownService() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      let action = machine.receive(
        headers: self.makeHeaders(path: "/foo.Foo/Get"),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
    }

    /// A path naming an unknown method on a registered service should be rejected as
    /// unimplemented.
    func testReceiveValidHeadersForUnknownMethod() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      let action = machine.receive(
        headers: self.makeHeaders(path: "/echo.Echo/Foo"),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
    }

    /// A path which isn't of the form "/service/method" should be rejected as unimplemented.
    func testReceiveValidHeadersForInvalidPath() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      let action = machine.receive(
        headers: self.makeHeaders(path: "nope"),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
    }

    /// A gzip encoded request should be rejected as unimplemented when compression is disabled.
    func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      let action = machine.receive(
        headers: self.makeHeaders(encoding: .gzip),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
    }

    /// More than one value for the request encoding should be rejected as an invalid argument.
    func testReceiveHeadersWithMultipleEncodings() {
      var machine = StateMachine(services: self.services, encoding: .disabled)
      // We can't have multiple encodings.
      let action = machine.receive(
        headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )
      assertThat(action, .is(.write(.trailersOnly(code: .invalidArgument), flush: true)))
    }

    /// An unknown encoding should be rejected as unimplemented and the response should advertise
    /// the encodings we do accept.
    func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
      var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))

      let action = machine.receive(
        headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )

      assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
      assertThat(
        action,
        .is(.write(.headers(.contains("grpc-accept-encoding", ["deflate", "identity"])), flush: true))
      )
    }

    /// An encoding we support but didn't advertise should be accepted and then advertised in the
    /// response headers.
    func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
      var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))

      // We didn't advertise gzip, but we do support it.
      let action = machine.receive(
        headers: self.makeHeaders(encoding: .gzip),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )

      // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
      // metadata. Send back headers to test this.
      assertThat(action, .is(.configure()))
      let sendAction = machine.send(headers: [:], promise: nil)
      assertThat(
        sendAction,
        .is(.write(.headers(.contains("grpc-accept-encoding", ["deflate", "identity", "gzip"]))))
      )
    }

    /// The identity encoding should be accepted even when compression is disabled.
    func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
      var machine = StateMachine(services: self.services, encoding: .disabled)

      // Identity is always supported, even if compression is disabled.
      let action = machine.receive(
        headers: self.makeHeaders(encoding: .identity),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )

      assertThat(action, .is(.configure()))
    }

    /// The response encoding should be picked from the encodings the client says it accepts.
    func testReceiveHeadersNegotiatesResponseEncoding() {
      var machine = StateMachine(services: self.services, encoding: .enabled(.gzip, .deflate))

      let action = machine.receive(
        headers: self.makeHeaders(acceptEncoding: [.deflate]),
        eventLoop: self.eventLoop,
        errorDelegate: nil,
        logger: self.logger
      )

      // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
      assertThat(action, .is(.configure()))
      let sendAction = machine.send(headers: [:], promise: nil)
      assertThat(sendAction, .is(.write(.headers(.contains("grpc-encoding", ["deflate"])))))
    }

    // MARK: Receive Data Tests

    /// Messages received before the pipeline is configured should be held back and only forwarded
    /// once the pipeline has been configured.
    func testReceiveDataBeforePipelineIsConfigured() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
      let buffer = self.makeLengthPrefixedBytes(1024)

      // Receive a request. The pipeline isn't configured so no action.
      var buffer1 = buffer
      let action1 = machine.receive(buffer: &buffer1, endStream: false)
      assertThat(action1, .is(.none()))

      // Receive another request, still not configured so no action.
      var buffer2 = buffer
      let action2 = machine.receive(buffer: &buffer2, endStream: false)
      assertThat(action2, .is(.none()))

      // Configure the pipeline. We'll have headers to forward and messages to read.
      let action3 = machine.pipelineConfigured()
      assertThat(action3, .is(.forwardHeadersThenRead()))

      // Do the first read.
      let action4 = machine.readNextRequest()
      assertThat(action4, .is(.forwardMessageThenRead()))

      // Do the second and final read.
      let action5 = machine.readNextRequest()
      assertThat(action5, .is(.forwardMessage()))

      // Receive an empty buffer with end stream. Since we're configured we'll always try to read
      // after receiving.
      var emptyBuffer = ByteBuffer()
      let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
      assertThat(action6, .is(.readNextRequest()))

      // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
      let action7 = machine.readNextRequest()
      assertThat(action7, .is(.forwardEnd()))
    }

    /// Messages received once the pipeline is configured should be read straight away, including
    /// a message whose bytes arrive in two separate parts.
    func testReceiveDataWhenPipelineIsConfigured() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
      let buffer = self.makeLengthPrefixedBytes(1024)

      // Receive a request. The pipeline is configured, so we should try reading.
      var buffer1 = buffer
      let action1 = machine.receive(buffer: &buffer1, endStream: false)
      assertThat(action1, .is(.readNextRequest()))

      // Read the message, consuming all bytes.
      let action2 = machine.readNextRequest()
      assertThat(action2, .is(.forwardMessage()))

      // Receive another request, we'll split buffer into two parts: the first 20 bytes and
      // whatever remains. Fail the test (rather than crash) if the slice can't be read.
      var remainder = buffer
      guard var firstPart = remainder.readSlice(length: 20) else {
        XCTFail("Unable to read a 20 byte slice from the request buffer")
        return
      }

      // Not enough bytes to form a message, so read won't result in anything.
      let action3 = machine.receive(buffer: &firstPart, endStream: false)
      assertThat(action3, .is(.readNextRequest()))
      let action4 = machine.readNextRequest()
      assertThat(action4, .is(.none()))

      // Now the rest of the message.
      let action5 = machine.receive(buffer: &remainder, endStream: false)
      assertThat(action5, .is(.readNextRequest()))
      let action6 = machine.readNextRequest()
      assertThat(action6, .is(.forwardMessage()))

      // Receive an empty buffer with end stream. Since we're configured we'll always try to read
      // after receiving.
      var emptyBuffer = ByteBuffer()
      let action7 = machine.receive(buffer: &emptyBuffer, endStream: true)
      assertThat(action7, .is(.readNextRequest()))

      // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
      let action8 = machine.readNextRequest()
      assertThat(action8, .is(.forwardEnd()))
    }

    /// Messages and end stream received before the pipeline is configured should be forwarded, in
    /// order, once the pipeline has been configured.
    func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
      let buffer = self.makeLengthPrefixedBytes(1024)

      // No action: the pipeline isn't configured.
      var buffer1 = buffer
      let action1 = machine.receive(buffer: &buffer1, endStream: false)
      assertThat(action1, .is(.none()))

      // Still no action.
      var buffer2 = buffer
      let action2 = machine.receive(buffer: &buffer2, endStream: true)
      assertThat(action2, .is(.none()))

      // Configure the pipeline. We have headers to forward and messages to read.
      let action3 = machine.pipelineConfigured()
      assertThat(action3, .is(.forwardHeadersThenRead()))

      // Read the first message.
      let action4 = machine.readNextRequest()
      assertThat(action4, .is(.forwardMessageThenRead()))

      // Read the second and final message.
      let action5 = machine.readNextRequest()
      assertThat(action5, .is(.forwardMessageAndEnd()))
    }

    /// A message received together with end stream should be forwarded along with the end.
    func testReceiveDataAfterPipelineIsConfigured() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
      let buffer = self.makeLengthPrefixedBytes(1024)

      // Pipeline is configured, we should be able to read then forward the message.
      var buffer1 = buffer
      let action1 = machine.receive(buffer: &buffer1, endStream: false)
      assertThat(action1, .is(.readNextRequest()))
      let action2 = machine.readNextRequest()
      assertThat(action2, .is(.forwardMessage()))

      // Receive another message with end stream set. The pipeline is configured so we should
      // read, then forward the message and the end of the request stream.
      var buffer2 = buffer
      let action3 = machine.receive(buffer: &buffer2, endStream: true)
      assertThat(action3, .is(.readNextRequest()))
      let action4 = machine.readNextRequest()
      assertThat(action4, .is(.forwardMessageAndEnd()))
    }

    /// Request messages should still be read and forwarded after response headers have been sent.
    func testReceiveDataWhenResponseStreamIsOpen() {
      var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
      let buffer = self.makeLengthPrefixedBytes(1024)

      // Receive a message. We should read and forward it.
      var buffer1 = buffer
      let action1 = machine.receive(buffer: &buffer1, endStream: false)
      assertThat(action1, .is(.readNextRequest()))
      let action2 = machine.readNextRequest()
      assertThat(action2, .is(.forwardMessage()))

      // Receive a message and end stream. We should read it then forward message and end.
      var buffer2 = buffer
      let action3 = machine.receive(buffer: &buffer2, endStream: true)
      assertThat(action3, .is(.readNextRequest()))
      let action4 = machine.readNextRequest()
      assertThat(action4, .is(.forwardMessageAndEnd()))
    }

    /// Reading a message with the compression flag set should be an error when compression is
    /// disabled.
    func testReceiveCompressedMessageWhenCompressionIsDisabled() {
      var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
      var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)

      let action1 = machine.receive(buffer: &buffer, endStream: false)
      assertThat(action1, .is(.readNextRequest()))
      let action2 = machine.readNextRequest()
      assertThat(action2, .is(.errorCaught()))
    }

    /// Receiving end stream after the response has been closed should result in no action.
    func testReceiveDataWhenClosed() {
      var machine = self.makeStateMachine(state: .requestOpenResponseOpen)

      // Close while the request stream is still open.
      let action1 = machine.send(
        status: GRPCStatus(code: .ok, message: "ok"),
        trailers: [:],
        promise: nil
      )
      assertThat(action1, .is(.write(.trailers(code: .ok, message: "ok"))))

      // Now receive end of request stream: no action, we're closed.
      var emptyBuffer = ByteBuffer()
      let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
      assertThat(action2, .is(.none()))
    }

    // MARK: Send Metadata Tests

    /// Response headers may be sent once while the request stream is open; a second attempt
    /// should fail the promise.
    func testSendMetadataRequestStreamOpen() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))

      // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
      // We'll just validate more 'normal' things here.
      let action1 = machine.send(headers: [:], promise: nil)
      assertThat(action1, .is(.write(.headers(.contains(":status", ["200"])))))

      let action2 = machine.send(headers: [:], promise: nil)
      assertThat(action2, .is(.completePromise(with: .failure())))
    }

    /// Response headers may be sent after the request stream has been closed.
    func testSendMetadataRequestStreamClosed() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))

      var buffer = ByteBuffer()
      let action1 = machine.receive(buffer: &buffer, endStream: true)
      assertThat(action1, .is(.readNextRequest()))
      let action2 = machine.readNextRequest()
      assertThat(action2, .is(.forwardEnd()))

      // Write some headers back.
      let action3 = machine.send(headers: [:], promise: nil)
      assertThat(action3, .is(.write(.headers(.contains(":status", ["200"])))))
    }

    /// Sending response headers when they have already been sent should fail the promise.
    func testSendMetadataWhenOpen() {
      var machine = self.makeStateMachine(state: .requestOpenResponseOpen)

      // Response stream is already open.
      let action = machine.send(headers: [:], promise: nil)
      assertThat(action, .is(.completePromise(with: .failure())))
    }

    /// User provided header names should be lowercased before being written.
    func testSendMetadataNormalizesUserProvidedMetadata() {
      var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
      let action = machine.send(headers: ["FOO": "bar"], promise: nil)
      assertThat(action, .is(.write(.headers(.contains(caseSensitive: "foo")))))
    }

    // MARK: Send Data Tests

    /// Response messages may be sent repeatedly while the response stream is open, regardless of
    /// whether the request stream is open or closed.
    func testSendData() {
      for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
        var machine = self.makeStateMachine(state: startingState)
        let buffer = ByteBuffer(repeating: 0, count: 1024)

        // We should be able to do this multiple times.
        for _ in 0 ..< 5 {
          let action = machine.send(
            buffer: buffer,
            allocator: self.allocator,
            compress: false,
            promise: nil
          )
          assertThat(action, .is(.write(.data(endStream: false))))
        }

        // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
        // write as normal.
        let action = machine.send(
          buffer: buffer,
          allocator: self.allocator,
          compress: true,
          promise: nil
        )
        assertThat(action, .is(.write(.data(endStream: false))))
      }
    }

    /// Sending a response message after the status has been sent should fail the promise.
    func testSendDataAfterClose() {
      var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
      let action1 = machine.send(status: .ok, trailers: [:], promise: nil)
      assertThat(action1, .is(.write(.headers(.contains("grpc-status", ["0"]), endStream: true))))

      // We're already closed, this should fail.
      let buffer = ByteBuffer(repeating: 0, count: 1024)
      let action2 = machine.send(
        buffer: buffer,
        allocator: self.allocator,
        compress: false,
        promise: nil
      )
      assertThat(action2, .is(.completePromise(with: .failure())))
    }

    /// Sending a response message before the response headers should fail the promise.
    func testSendDataBeforeMetadata() {
      var machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))

      // Response stream is still idle, so this should fail.
      let buffer = ByteBuffer(repeating: 0, count: 1024)
      let action = machine.send(
        buffer: buffer,
        allocator: self.allocator,
        compress: false,
        promise: nil
      )
      assertThat(action, .is(.completePromise(with: .failure())))
    }

    // MARK: Send End

    /// Sending the status before any response headers should produce a trailers-only response;
    /// sending it again should fail the promise.
    func testSendEndWhenResponseStreamIsIdle() {
      for state in [
        DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
        DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
      ] {
        var machine = self.makeStateMachine(state: state)
        let action1 = machine.send(status: .ok, trailers: [:], promise: nil)
        // This'll be a trailers-only response.
        assertThat(action1, .is(.write(.trailersOnly(code: .ok))))

        // Already closed.
        let action2 = machine.send(status: .ok, trailers: [:], promise: nil)
        assertThat(action2, .is(.completePromise(with: .failure())))
      }
    }

    /// Sending the status after the response headers should write trailers; sending it again
    /// should fail the promise.
    func testSendEndWhenResponseStreamIsOpen() {
      for state in [
        DesiredState.requestOpenResponseOpen,
        DesiredState.requestClosedResponseOpen,
      ] {
        var machine = self.makeStateMachine(state: state)
        let action = machine.send(
          status: GRPCStatus(code: .ok, message: "ok"),
          trailers: [:],
          promise: nil
        )
        assertThat(action, .is(.write(.trailers(code: .ok, message: "ok"))))

        // Already closed.
        let action2 = machine.send(status: .ok, trailers: [:], promise: nil)
        assertThat(action2, .is(.completePromise(with: .failure())))
      }
    }
  }
  extension ServerMessageEncoding {
    /// Convenience for tests: makes an `.enabled` encoding for the listed algorithms with a
    /// decompression limit of `.absolute(.max)`.
    fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
      return ServerMessageEncoding.enabled(
        .init(
          enabledAlgorithms: algorithms,
          decompressionLimit: .absolute(.max)
        )
      )
    }
  }