HTTP2ToRawGRPCStateMachineTests.swift 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  23. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  24. typealias State = StateMachine.State
  25. typealias Action = StateMachine.Action
  26. // An event loop gets passed to any service handler that's created, we don't actually use it here.
  27. private var eventLoop: EventLoop {
  28. return EmbeddedEventLoop()
  29. }
  30. /// An allocator, just here for convenience.
  31. private let allocator = ByteBufferAllocator()
  32. private func makeHeaders(
  33. path: String = "/echo.Echo/Get",
  34. contentType: String?,
  35. encoding: String? = nil,
  36. acceptEncoding: [String]? = nil
  37. ) -> HPACKHeaders {
  38. var headers = HPACKHeaders()
  39. headers.add(name: ":path", value: path)
  40. if let contentType = contentType {
  41. headers.add(name: GRPCHeaderName.contentType, value: contentType)
  42. }
  43. if let encoding = encoding {
  44. headers.add(name: GRPCHeaderName.encoding, value: encoding)
  45. }
  46. if let acceptEncoding = acceptEncoding {
  47. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
  48. }
  49. return headers
  50. }
  51. private func makeHeaders(
  52. path: String = "/echo.Echo/Get",
  53. contentType: ContentType? = .protobuf,
  54. encoding: CompressionAlgorithm? = nil,
  55. acceptEncoding: [CompressionAlgorithm]? = nil
  56. ) -> HPACKHeaders {
  57. return self.makeHeaders(
  58. path: path,
  59. contentType: contentType?.canonicalValue,
  60. encoding: encoding?.name,
  61. acceptEncoding: acceptEncoding?.map { $0.name }
  62. )
  63. }
  64. /// A minimum set of viable request headers for the service providers we register by default.
  65. private var viableHeaders: HPACKHeaders {
  66. return self.makeHeaders(
  67. path: "/echo.Echo/Get",
  68. contentType: "application/grpc"
  69. )
  70. }
  71. /// Just the echo service.
  72. private var services: [Substring: CallHandlerProvider] {
  73. let provider = EchoProvider()
  74. return [provider.serviceName: provider]
  75. }
  76. private enum DesiredState {
  77. case requestOpenResponseIdle(pipelineConfigured: Bool)
  78. case requestOpenResponseOpen
  79. case requestClosedResponseIdle(pipelineConfigured: Bool)
  80. case requestClosedResponseOpen
  81. }
  82. /// Makes a state machine in the desired state.
  83. private func makeStateMachine(
  84. services: [Substring: CallHandlerProvider]? = nil,
  85. encoding: ServerMessageEncoding = .disabled,
  86. state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  87. ) -> StateMachine {
  88. var machine = StateMachine(services: services ?? self.services, encoding: encoding)
  89. let receiveHeadersAction = machine.receive(
  90. headers: self.viableHeaders,
  91. eventLoop: self.eventLoop,
  92. errorDelegate: nil,
  93. remoteAddress: nil,
  94. logger: self.logger
  95. )
  96. assertThat(receiveHeadersAction, .is(.configure()))
  97. switch state {
  98. case .requestOpenResponseIdle(pipelineConfigured: false):
  99. ()
  100. case .requestOpenResponseIdle(pipelineConfigured: true):
  101. let configuredAction = machine.pipelineConfigured()
  102. assertThat(configuredAction, .is(.forwardHeaders()))
  103. case .requestOpenResponseOpen:
  104. let configuredAction = machine.pipelineConfigured()
  105. assertThat(configuredAction, .is(.forwardHeaders()))
  106. let sendHeadersAction = machine.send(headers: [:], promise: nil)
  107. assertThat(sendHeadersAction, .is(.write(.headers())))
  108. case .requestClosedResponseIdle(pipelineConfigured: false):
  109. var emptyBuffer = ByteBuffer()
  110. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  111. assertThat(receiveEnd, .is(.none()))
  112. case .requestClosedResponseIdle(pipelineConfigured: true):
  113. let configuredAction = machine.pipelineConfigured()
  114. assertThat(configuredAction, .is(.forwardHeaders()))
  115. var emptyBuffer = ByteBuffer()
  116. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  117. assertThat(receiveEnd, .is(.readNextRequest()))
  118. case .requestClosedResponseOpen:
  119. let configuredAction = machine.pipelineConfigured()
  120. assertThat(configuredAction, .is(.forwardHeaders()))
  121. var emptyBuffer = ByteBuffer()
  122. let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
  123. assertThat(receiveEndAction, .is(.readNextRequest()))
  124. let readAction = machine.readNextRequest()
  125. assertThat(readAction, .is(.forwardEnd()))
  126. let sendHeadersAction = machine.send(headers: [:], promise: nil)
  127. assertThat(sendHeadersAction, .is(.write(.headers())))
  128. }
  129. return machine
  130. }
  131. /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
  132. /// message bytes (UInt8 ⨉ message length).
  133. private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
  134. var buffer = ByteBuffer()
  135. buffer.reserveCapacity(count + 5)
  136. buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
  137. buffer.writeInteger(UInt32(count))
  138. buffer.writeRepeatingByte(0, count: count)
  139. return buffer
  140. }
  141. // MARK: Receive Headers Tests
  142. func testReceiveValidHeaders() {
  143. var machine = StateMachine(services: self.services, encoding: .disabled)
  144. let action = machine.receive(
  145. headers: self.viableHeaders,
  146. eventLoop: self.eventLoop,
  147. errorDelegate: nil,
  148. remoteAddress: nil,
  149. logger: self.logger
  150. )
  151. assertThat(action, .is(.configure()))
  152. }
  153. func testReceiveInvalidContentType() {
  154. var machine = StateMachine(services: self.services, encoding: .disabled)
  155. let action = machine.receive(
  156. headers: self.makeHeaders(contentType: "application/json"),
  157. eventLoop: self.eventLoop,
  158. errorDelegate: nil,
  159. remoteAddress: nil,
  160. logger: self.logger
  161. )
  162. assertThat(
  163. action,
  164. .is(.write(.headers(.contains(":status", ["415"]), endStream: true), flush: true))
  165. )
  166. }
  167. func testReceiveValidHeadersForUnknownService() {
  168. var machine = StateMachine(services: self.services, encoding: .disabled)
  169. let action = machine.receive(
  170. headers: self.makeHeaders(path: "/foo.Foo/Get"),
  171. eventLoop: self.eventLoop,
  172. errorDelegate: nil,
  173. remoteAddress: nil,
  174. logger: self.logger
  175. )
  176. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  177. }
  178. func testReceiveValidHeadersForUnknownMethod() {
  179. var machine = StateMachine(services: self.services, encoding: .disabled)
  180. let action = machine.receive(
  181. headers: self.makeHeaders(path: "/echo.Echo/Foo"),
  182. eventLoop: self.eventLoop,
  183. errorDelegate: nil,
  184. remoteAddress: nil,
  185. logger: self.logger
  186. )
  187. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  188. }
  189. func testReceiveValidHeadersForInvalidPath() {
  190. var machine = StateMachine(services: self.services, encoding: .disabled)
  191. let action = machine.receive(
  192. headers: self.makeHeaders(path: "nope"),
  193. eventLoop: self.eventLoop,
  194. errorDelegate: nil,
  195. remoteAddress: nil,
  196. logger: self.logger
  197. )
  198. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  199. }
  200. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
  201. var machine = StateMachine(services: self.services, encoding: .disabled)
  202. let action = machine.receive(
  203. headers: self.makeHeaders(encoding: .gzip),
  204. eventLoop: self.eventLoop,
  205. errorDelegate: nil,
  206. remoteAddress: nil,
  207. logger: self.logger
  208. )
  209. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  210. }
  211. func testReceiveHeadersWithMultipleEncodings() {
  212. var machine = StateMachine(services: self.services, encoding: .disabled)
  213. // We can't have multiple encodings.
  214. let action = machine.receive(
  215. headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
  216. eventLoop: self.eventLoop,
  217. errorDelegate: nil,
  218. remoteAddress: nil,
  219. logger: self.logger
  220. )
  221. assertThat(action, .is(.write(.trailersOnly(code: .invalidArgument), flush: true)))
  222. }
  223. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
  224. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  225. let action = machine.receive(
  226. headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
  227. eventLoop: self.eventLoop,
  228. errorDelegate: nil,
  229. remoteAddress: nil,
  230. logger: self.logger
  231. )
  232. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  233. assertThat(
  234. action,
  235. .is(.write(.headers(.contains("grpc-accept-encoding", ["deflate", "identity"])), flush: true))
  236. )
  237. }
  238. func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
  239. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  240. // We didn't advertise gzip, but we do support it.
  241. let action = machine.receive(
  242. headers: self.makeHeaders(encoding: .gzip),
  243. eventLoop: self.eventLoop,
  244. errorDelegate: nil,
  245. remoteAddress: nil,
  246. logger: self.logger
  247. )
  248. // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
  249. // metadata. Send back headers to test this.
  250. assertThat(action, .is(.configure()))
  251. let sendAction = machine.send(headers: [:], promise: nil)
  252. assertThat(sendAction, .write(.headers(.contains(
  253. "grpc-accept-encoding",
  254. ["deflate", "identity", "gzip"]
  255. ))))
  256. }
  257. func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
  258. var machine = StateMachine(services: self.services, encoding: .disabled)
  259. // Identity is always supported, even if compression is disabled.
  260. let action = machine.receive(
  261. headers: self.makeHeaders(encoding: .identity),
  262. eventLoop: self.eventLoop,
  263. errorDelegate: nil,
  264. remoteAddress: nil,
  265. logger: self.logger
  266. )
  267. assertThat(action, .is(.configure()))
  268. }
  269. func testReceiveHeadersNegotiatesResponseEncoding() {
  270. var machine = StateMachine(services: self.services, encoding: .enabled(.gzip, .deflate))
  271. let action = machine.receive(
  272. headers: self.makeHeaders(acceptEncoding: [.deflate]),
  273. eventLoop: self.eventLoop,
  274. errorDelegate: nil,
  275. remoteAddress: nil,
  276. logger: self.logger
  277. )
  278. // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
  279. assertThat(action, .is(.configure()))
  280. let sendAction = machine.send(headers: [:], promise: nil)
  281. assertThat(sendAction, .write(.headers(.contains("grpc-encoding", ["deflate"]))))
  282. }
  283. // MARK: Receive Data Tests
  284. func testReceiveDataBeforePipelineIsConfigured() {
  285. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  286. let buffer = self.makeLengthPrefixedBytes(1024)
  287. // Receive a request. The pipeline isn't configured so no action.
  288. var buffer1 = buffer
  289. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  290. assertThat(action1, .is(.none()))
  291. // Receive another request, still not configured so no action.
  292. var buffer2 = buffer
  293. let action2 = machine.receive(buffer: &buffer2, endStream: false)
  294. assertThat(action2, .is(.none()))
  295. // Configure the pipeline. We'll have headers to forward and messages to read.
  296. let action3 = machine.pipelineConfigured()
  297. assertThat(action3, .is(.forwardHeadersThenRead()))
  298. // Do the first read.
  299. let action4 = machine.readNextRequest()
  300. assertThat(action4, .is(.forwardMessageThenRead()))
  301. // Do the second and final read.
  302. let action5 = machine.readNextRequest()
  303. assertThat(action5, .is(.forwardMessage()))
  304. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  305. // after receiving.
  306. var emptyBuffer = ByteBuffer()
  307. let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
  308. assertThat(action6, .is(.readNextRequest()))
  309. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  310. let action7 = machine.readNextRequest()
  311. assertThat(action7, .is(.forwardEnd()))
  312. }
  313. func testReceiveDataWhenPipelineIsConfigured() {
  314. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  315. let buffer = self.makeLengthPrefixedBytes(1024)
  316. // Receive a request. The pipeline is configured, so we should try reading.
  317. var buffer1 = buffer
  318. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  319. assertThat(action1, .is(.readNextRequest()))
  320. // Read the message, consuming all bytes.
  321. let action2 = machine.readNextRequest()
  322. assertThat(action2, .is(.forwardMessage()))
  323. // Receive another request, we'll split buffer into two parts.
  324. var buffer3 = buffer
  325. var buffer2 = buffer3.readSlice(length: 20)!
  326. // Not enough bytes to form a message, so read won't result in anything.
  327. let action4 = machine.receive(buffer: &buffer2, endStream: false)
  328. assertThat(action4, .is(.readNextRequest()))
  329. let action5 = machine.readNextRequest()
  330. assertThat(action5, .is(.none()))
  331. // Now the rest of the message.
  332. let action6 = machine.receive(buffer: &buffer3, endStream: false)
  333. assertThat(action6, .is(.readNextRequest()))
  334. let action7 = machine.readNextRequest()
  335. assertThat(action7, .is(.forwardMessage()))
  336. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  337. // after receiving.
  338. var emptyBuffer = ByteBuffer()
  339. let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
  340. assertThat(action8, .is(.readNextRequest()))
  341. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  342. let action9 = machine.readNextRequest()
  343. assertThat(action9, .is(.forwardEnd()))
  344. }
  345. func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
  346. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  347. let buffer = self.makeLengthPrefixedBytes(1024)
  348. // No action: the pipeline isn't configured.
  349. var buffer1 = buffer
  350. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  351. assertThat(action1, .is(.none()))
  352. // Still no action.
  353. var buffer2 = buffer
  354. let action2 = machine.receive(buffer: &buffer2, endStream: true)
  355. assertThat(action2, .is(.none()))
  356. // Configure the pipeline. We have headers to forward and messages to read.
  357. let action3 = machine.pipelineConfigured()
  358. assertThat(action3, .is(.forwardHeadersThenRead()))
  359. // Read the first message.
  360. let action4 = machine.readNextRequest()
  361. assertThat(action4, .is(.forwardMessageThenRead()))
  362. // Read the second and final message.
  363. let action5 = machine.readNextRequest()
  364. assertThat(action5, .is(.forwardMessageAndEnd()))
  365. }
  366. func testReceiveDataAfterPipelineIsConfigured() {
  367. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  368. let buffer = self.makeLengthPrefixedBytes(1024)
  369. // Pipeline is configured, we should be able to read then forward the message.
  370. var buffer1 = buffer
  371. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  372. assertThat(action1, .is(.readNextRequest()))
  373. let action2 = machine.readNextRequest()
  374. assertThat(action2, .is(.forwardMessage()))
  375. // Receive another message with end stream set.
  376. // Still no action.
  377. var buffer2 = buffer
  378. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  379. assertThat(action3, .is(.readNextRequest()))
  380. let action4 = machine.readNextRequest()
  381. assertThat(action4, .is(.forwardMessageAndEnd()))
  382. }
  383. func testReceiveDataWhenResponseStreamIsOpen() {
  384. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  385. let buffer = self.makeLengthPrefixedBytes(1024)
  386. // Receive a message. We should read and forward it.
  387. var buffer1 = buffer
  388. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  389. assertThat(action1, .is(.readNextRequest()))
  390. let action2 = machine.readNextRequest()
  391. assertThat(action2, .is(.forwardMessage()))
  392. // Receive a message and end stream. We should read it then forward message and end.
  393. var buffer2 = buffer
  394. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  395. assertThat(action3, .is(.readNextRequest()))
  396. let action4 = machine.readNextRequest()
  397. assertThat(action4, .is(.forwardMessageAndEnd()))
  398. }
  399. func testReceiveCompressedMessageWhenCompressionIsDisabled() {
  400. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  401. var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
  402. let action1 = machine.receive(buffer: &buffer, endStream: false)
  403. assertThat(action1, .is(.readNextRequest()))
  404. let action2 = machine.readNextRequest()
  405. assertThat(action2, .is(.errorCaught()))
  406. }
  407. func testReceiveDataWhenClosed() {
  408. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  409. // Close while the request stream is still open.
  410. let action1 = machine.send(
  411. status: GRPCStatus(code: .ok, message: "ok"),
  412. trailers: [:],
  413. promise: nil
  414. )
  415. assertThat(action1, .is(.write(.trailers(code: .ok, message: "ok"))))
  416. // Now receive end of request stream: no action, we're closed.
  417. var emptyBuffer = ByteBuffer()
  418. let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
  419. assertThat(action2, .is(.none()))
  420. }
  421. // MARK: Send Metadata Tests
  422. func testSendMetadataRequestStreamOpen() {
  423. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  424. // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
  425. // We'll just validate more 'normal' things here.
  426. let action1 = machine.send(headers: [:], promise: nil)
  427. assertThat(action1, .is(.write(.headers(.contains(":status", ["200"])))))
  428. let action2 = machine.send(headers: [:], promise: nil)
  429. assertThat(action2, .is(.completePromise(with: .failure())))
  430. }
  431. func testSendMetadataRequestStreamClosed() {
  432. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  433. var buffer = ByteBuffer()
  434. let action1 = machine.receive(buffer: &buffer, endStream: true)
  435. assertThat(action1, .is(.readNextRequest()))
  436. let action2 = machine.readNextRequest()
  437. assertThat(action2, .is(.forwardEnd()))
  438. // Write some headers back.
  439. let action3 = machine.send(headers: [:], promise: nil)
  440. assertThat(action3, .is(.write(.headers(.contains(":status", ["200"])))))
  441. }
  442. func testSendMetadataWhenOpen() {
  443. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  444. // Response stream is already open.
  445. let action = machine.send(headers: [:], promise: nil)
  446. assertThat(action, .is(.completePromise(with: .failure())))
  447. }
  448. func testSendMetadataNormalizesUserProvidedMetadata() {
  449. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  450. let action = machine.send(headers: ["FOO": "bar"], promise: nil)
  451. assertThat(action, .is(.write(.headers(.contains(caseSensitive: "foo")))))
  452. }
  453. // MARK: Send Data Tests
  454. func testSendData() {
  455. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  456. var machine = self.makeStateMachine(state: startingState)
  457. let buffer = ByteBuffer(repeating: 0, count: 1024)
  458. // We should be able to do this multiple times.
  459. for _ in 0 ..< 5 {
  460. let action = machine.send(
  461. buffer: buffer,
  462. allocator: self.allocator,
  463. compress: false,
  464. promise: nil
  465. )
  466. assertThat(action, .is(.write(.data(endStream: false))))
  467. }
  468. // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
  469. // write as normal.
  470. let action = machine.send(
  471. buffer: buffer,
  472. allocator: self.allocator,
  473. compress: true,
  474. promise: nil
  475. )
  476. assertThat(action, .is(.write(.data(endStream: false))))
  477. }
  478. }
  479. func testSendDataAfterClose() {
  480. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  481. let action1 = machine.send(status: .ok, trailers: [:], promise: nil)
  482. assertThat(action1, .is(.write(.headers(.contains("grpc-status", ["0"]), endStream: true))))
  483. // We're already closed, this should fail.
  484. let buffer = ByteBuffer(repeating: 0, count: 1024)
  485. let action2 = machine.send(
  486. buffer: buffer,
  487. allocator: self.allocator,
  488. compress: false,
  489. promise: nil
  490. )
  491. assertThat(action2, .is(.completePromise(with: .failure())))
  492. }
  493. func testSendDataBeforeMetadata() {
  494. var machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
  495. // Response stream is still idle, so this should fail.
  496. let buffer = ByteBuffer(repeating: 0, count: 1024)
  497. let action2 = machine.send(
  498. buffer: buffer,
  499. allocator: self.allocator,
  500. compress: false,
  501. promise: nil
  502. )
  503. assertThat(action2, .is(.completePromise(with: .failure())))
  504. }
  505. // MARK: Send End
  506. func testSendEndWhenResponseStreamIsIdle() {
  507. for state in [
  508. DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
  509. DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
  510. ] {
  511. var machine = self.makeStateMachine(state: state)
  512. let action1 = machine.send(status: .ok, trailers: [:], promise: nil)
  513. // This'll be a trailers-only response.
  514. assertThat(action1, .is(.write(.trailersOnly(code: .ok))))
  515. // Already closed.
  516. let action2 = machine.send(status: .ok, trailers: [:], promise: nil)
  517. assertThat(action2, .is(.completePromise(with: .failure())))
  518. }
  519. }
  520. func testSendEndWhenResponseStreamIsOpen() {
  521. for state in [
  522. DesiredState.requestOpenResponseOpen,
  523. DesiredState.requestClosedResponseOpen,
  524. ] {
  525. var machine = self.makeStateMachine(state: state)
  526. let action = machine.send(
  527. status: GRPCStatus(code: .ok, message: "ok"),
  528. trailers: [:],
  529. promise: nil
  530. )
  531. assertThat(action, .is(.write(.trailers(code: .ok, message: "ok"))))
  532. // Already closed.
  533. let action2 = machine.send(status: .ok, trailers: [:], promise: nil)
  534. assertThat(action2, .is(.completePromise(with: .failure())))
  535. }
  536. }
  537. }
  538. extension ServerMessageEncoding {
  539. fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
  540. return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
  541. }
  542. }