HTTP2ToRawGRPCStateMachineTests.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  23. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  24. typealias State = StateMachine.State
  25. // An event loop gets passed to any service handler that's created, we don't actually use it here.
  26. private var eventLoop: EventLoop {
  27. return EmbeddedEventLoop()
  28. }
  29. /// An allocator, just here for convenience.
  30. private let allocator = ByteBufferAllocator()
  31. private func makeHeaders(
  32. path: String = "/echo.Echo/Get",
  33. contentType: String?,
  34. encoding: String? = nil,
  35. acceptEncoding: [String]? = nil
  36. ) -> HPACKHeaders {
  37. var headers = HPACKHeaders()
  38. headers.add(name: ":path", value: path)
  39. if let contentType = contentType {
  40. headers.add(name: GRPCHeaderName.contentType, value: contentType)
  41. }
  42. if let encoding = encoding {
  43. headers.add(name: GRPCHeaderName.encoding, value: encoding)
  44. }
  45. if let acceptEncoding = acceptEncoding {
  46. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
  47. }
  48. return headers
  49. }
  50. private func makeHeaders(
  51. path: String = "/echo.Echo/Get",
  52. contentType: ContentType? = .protobuf,
  53. encoding: CompressionAlgorithm? = nil,
  54. acceptEncoding: [CompressionAlgorithm]? = nil
  55. ) -> HPACKHeaders {
  56. return self.makeHeaders(
  57. path: path,
  58. contentType: contentType?.canonicalValue,
  59. encoding: encoding?.name,
  60. acceptEncoding: acceptEncoding?.map { $0.name }
  61. )
  62. }
  63. /// A minimum set of viable request headers for the service providers we register by default.
  64. private var viableHeaders: HPACKHeaders {
  65. return self.makeHeaders(
  66. path: "/echo.Echo/Get",
  67. contentType: "application/grpc"
  68. )
  69. }
  70. /// Just the echo service.
  71. private var services: [Substring: CallHandlerProvider] {
  72. let provider = EchoProvider()
  73. return [provider.serviceName: provider]
  74. }
  75. private enum DesiredState {
  76. case requestOpenResponseIdle(pipelineConfigured: Bool)
  77. case requestOpenResponseOpen
  78. case requestClosedResponseIdle(pipelineConfigured: Bool)
  79. case requestClosedResponseOpen
  80. }
  81. /// Makes a state machine in the desired state.
  82. private func makeStateMachine(
  83. services: [Substring: CallHandlerProvider]? = nil,
  84. encoding: ServerMessageEncoding = .disabled,
  85. state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  86. ) -> StateMachine {
  87. var machine = StateMachine(services: services ?? self.services, encoding: encoding)
  88. let receiveHeadersAction = machine.receive(
  89. headers: self.viableHeaders,
  90. eventLoop: self.eventLoop,
  91. errorDelegate: nil,
  92. remoteAddress: nil,
  93. logger: self.logger,
  94. allocator: ByteBufferAllocator(),
  95. responseWriter: NoOpResponseWriter()
  96. )
  97. assertThat(receiveHeadersAction, .is(.configure()))
  98. switch state {
  99. case .requestOpenResponseIdle(pipelineConfigured: false):
  100. ()
  101. case .requestOpenResponseIdle(pipelineConfigured: true):
  102. let configuredAction = machine.pipelineConfigured()
  103. assertThat(configuredAction, .is(.forwardHeaders()))
  104. case .requestOpenResponseOpen:
  105. let configuredAction = machine.pipelineConfigured()
  106. assertThat(configuredAction, .is(.forwardHeaders()))
  107. let sendHeadersAction = machine.send(headers: [:])
  108. assertThat(sendHeadersAction, .is(.success()))
  109. case .requestClosedResponseIdle(pipelineConfigured: false):
  110. var emptyBuffer = ByteBuffer()
  111. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  112. assertThat(receiveEnd, .is(false))
  113. case .requestClosedResponseIdle(pipelineConfigured: true):
  114. let configuredAction = machine.pipelineConfigured()
  115. assertThat(configuredAction, .is(.forwardHeaders()))
  116. var emptyBuffer = ByteBuffer()
  117. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  118. assertThat(receiveEnd, .is(true))
  119. case .requestClosedResponseOpen:
  120. let configuredAction = machine.pipelineConfigured()
  121. assertThat(configuredAction, .is(.forwardHeaders()))
  122. var emptyBuffer = ByteBuffer()
  123. let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
  124. assertThat(receiveEndAction, .is(true))
  125. let readAction = machine.readNextRequest()
  126. assertThat(readAction, .is(.forwardEnd()))
  127. let sendHeadersAction = machine.send(headers: [:])
  128. assertThat(sendHeadersAction, .is(.success()))
  129. }
  130. return machine
  131. }
  132. /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
  133. /// message bytes (UInt8 ⨉ message length).
  134. private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
  135. var buffer = ByteBuffer()
  136. buffer.reserveCapacity(count + 5)
  137. buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
  138. buffer.writeInteger(UInt32(count))
  139. buffer.writeRepeatingByte(0, count: count)
  140. return buffer
  141. }
  142. // MARK: Receive Headers Tests
  143. func testReceiveValidHeaders() {
  144. var machine = StateMachine(services: self.services, encoding: .disabled)
  145. let action = machine.receive(
  146. headers: self.viableHeaders,
  147. eventLoop: self.eventLoop,
  148. errorDelegate: nil,
  149. remoteAddress: nil,
  150. logger: self.logger,
  151. allocator: ByteBufferAllocator(),
  152. responseWriter: NoOpResponseWriter()
  153. )
  154. assertThat(action, .is(.configure()))
  155. }
  156. func testReceiveInvalidContentType() {
  157. var machine = StateMachine(services: self.services, encoding: .disabled)
  158. let action = machine.receive(
  159. headers: self.makeHeaders(contentType: "application/json"),
  160. eventLoop: self.eventLoop,
  161. errorDelegate: nil,
  162. remoteAddress: nil,
  163. logger: self.logger,
  164. allocator: ByteBufferAllocator(),
  165. responseWriter: NoOpResponseWriter()
  166. )
  167. assertThat(action, .is(.rejectRPC(.contains(":status", ["415"]))))
  168. }
  169. func testReceiveValidHeadersForUnknownService() {
  170. var machine = StateMachine(services: self.services, encoding: .disabled)
  171. let action = machine.receive(
  172. headers: self.makeHeaders(path: "/foo.Foo/Get"),
  173. eventLoop: self.eventLoop,
  174. errorDelegate: nil,
  175. remoteAddress: nil,
  176. logger: self.logger,
  177. allocator: ByteBufferAllocator(),
  178. responseWriter: NoOpResponseWriter()
  179. )
  180. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  181. }
  182. func testReceiveValidHeadersForUnknownMethod() {
  183. var machine = StateMachine(services: self.services, encoding: .disabled)
  184. let action = machine.receive(
  185. headers: self.makeHeaders(path: "/echo.Echo/Foo"),
  186. eventLoop: self.eventLoop,
  187. errorDelegate: nil,
  188. remoteAddress: nil,
  189. logger: self.logger,
  190. allocator: ByteBufferAllocator(),
  191. responseWriter: NoOpResponseWriter()
  192. )
  193. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  194. }
  195. func testReceiveValidHeadersForInvalidPath() {
  196. var machine = StateMachine(services: self.services, encoding: .disabled)
  197. let action = machine.receive(
  198. headers: self.makeHeaders(path: "nope"),
  199. eventLoop: self.eventLoop,
  200. errorDelegate: nil,
  201. remoteAddress: nil,
  202. logger: self.logger,
  203. allocator: ByteBufferAllocator(),
  204. responseWriter: NoOpResponseWriter()
  205. )
  206. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  207. }
  208. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
  209. var machine = StateMachine(services: self.services, encoding: .disabled)
  210. let action = machine.receive(
  211. headers: self.makeHeaders(encoding: .gzip),
  212. eventLoop: self.eventLoop,
  213. errorDelegate: nil,
  214. remoteAddress: nil,
  215. logger: self.logger,
  216. allocator: ByteBufferAllocator(),
  217. responseWriter: NoOpResponseWriter()
  218. )
  219. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  220. }
  221. func testReceiveHeadersWithMultipleEncodings() {
  222. var machine = StateMachine(services: self.services, encoding: .disabled)
  223. // We can't have multiple encodings.
  224. let action = machine.receive(
  225. headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
  226. eventLoop: self.eventLoop,
  227. errorDelegate: nil,
  228. remoteAddress: nil,
  229. logger: self.logger,
  230. allocator: ByteBufferAllocator(),
  231. responseWriter: NoOpResponseWriter()
  232. )
  233. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .invalidArgument))))
  234. }
  235. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
  236. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  237. let action = machine.receive(
  238. headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
  239. eventLoop: self.eventLoop,
  240. errorDelegate: nil,
  241. remoteAddress: nil,
  242. logger: self.logger,
  243. allocator: ByteBufferAllocator(),
  244. responseWriter: NoOpResponseWriter()
  245. )
  246. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  247. assertThat(
  248. action,
  249. .is(.rejectRPC(.contains("grpc-accept-encoding", ["deflate", "identity"])))
  250. )
  251. }
  252. func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
  253. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  254. // We didn't advertise gzip, but we do support it.
  255. let action = machine.receive(
  256. headers: self.makeHeaders(encoding: .gzip),
  257. eventLoop: self.eventLoop,
  258. errorDelegate: nil,
  259. remoteAddress: nil,
  260. logger: self.logger,
  261. allocator: ByteBufferAllocator(),
  262. responseWriter: NoOpResponseWriter()
  263. )
  264. // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
  265. // metadata. Send back headers to test this.
  266. assertThat(action, .is(.configure()))
  267. let sendAction = machine.send(headers: [:])
  268. assertThat(sendAction, .success(.contains(
  269. "grpc-accept-encoding",
  270. ["deflate", "identity", "gzip"]
  271. )))
  272. }
  273. func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
  274. var machine = StateMachine(services: self.services, encoding: .disabled)
  275. // Identity is always supported, even if compression is disabled.
  276. let action = machine.receive(
  277. headers: self.makeHeaders(encoding: .identity),
  278. eventLoop: self.eventLoop,
  279. errorDelegate: nil,
  280. remoteAddress: nil,
  281. logger: self.logger,
  282. allocator: ByteBufferAllocator(),
  283. responseWriter: NoOpResponseWriter()
  284. )
  285. assertThat(action, .is(.configure()))
  286. }
  287. func testReceiveHeadersNegotiatesResponseEncoding() {
  288. var machine = StateMachine(services: self.services, encoding: .enabled(.gzip, .deflate))
  289. let action = machine.receive(
  290. headers: self.makeHeaders(acceptEncoding: [.deflate]),
  291. eventLoop: self.eventLoop,
  292. errorDelegate: nil,
  293. remoteAddress: nil,
  294. logger: self.logger,
  295. allocator: ByteBufferAllocator(),
  296. responseWriter: NoOpResponseWriter()
  297. )
  298. // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
  299. assertThat(action, .is(.configure()))
  300. let sendAction = machine.send(headers: [:])
  301. assertThat(sendAction, .success(.contains("grpc-encoding", ["deflate"])))
  302. }
  303. // MARK: Receive Data Tests
  304. func testReceiveDataBeforePipelineIsConfigured() {
  305. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  306. let buffer = self.makeLengthPrefixedBytes(1024)
  307. // Receive a request. The pipeline isn't configured so no action.
  308. var buffer1 = buffer
  309. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  310. assertThat(action1, .is(false))
  311. // Receive another request, still not configured so no action.
  312. var buffer2 = buffer
  313. let action2 = machine.receive(buffer: &buffer2, endStream: false)
  314. assertThat(action2, .is(false))
  315. // Configure the pipeline. We'll have headers to forward and messages to read.
  316. let action3 = machine.pipelineConfigured()
  317. assertThat(action3, .is(.forwardHeadersThenRead()))
  318. // Do the first read.
  319. let action4 = machine.readNextRequest()
  320. assertThat(action4, .is(.forwardMessageThenRead()))
  321. // Do the second and final read.
  322. let action5 = machine.readNextRequest()
  323. assertThat(action5, .is(.forwardMessage()))
  324. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  325. // after receiving.
  326. var emptyBuffer = ByteBuffer()
  327. let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
  328. assertThat(action6, .is(true))
  329. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  330. let action7 = machine.readNextRequest()
  331. assertThat(action7, .is(.forwardEnd()))
  332. }
  333. func testReceiveDataWhenPipelineIsConfigured() {
  334. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  335. let buffer = self.makeLengthPrefixedBytes(1024)
  336. // Receive a request. The pipeline is configured, so we should try reading.
  337. var buffer1 = buffer
  338. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  339. assertThat(action1, .is(true))
  340. // Read the message, consuming all bytes.
  341. let action2 = machine.readNextRequest()
  342. assertThat(action2, .is(.forwardMessage()))
  343. // Receive another request, we'll split buffer into two parts.
  344. var buffer3 = buffer
  345. var buffer2 = buffer3.readSlice(length: 20)!
  346. // Not enough bytes to form a message, so read won't result in anything.
  347. let action4 = machine.receive(buffer: &buffer2, endStream: false)
  348. assertThat(action4, .is(true))
  349. let action5 = machine.readNextRequest()
  350. assertThat(action5, .is(.none()))
  351. // Now the rest of the message.
  352. let action6 = machine.receive(buffer: &buffer3, endStream: false)
  353. assertThat(action6, .is(true))
  354. let action7 = machine.readNextRequest()
  355. assertThat(action7, .is(.forwardMessage()))
  356. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  357. // after receiving.
  358. var emptyBuffer = ByteBuffer()
  359. let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
  360. assertThat(action8, .is(true))
  361. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  362. let action9 = machine.readNextRequest()
  363. assertThat(action9, .is(.forwardEnd()))
  364. }
  365. func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
  366. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  367. let buffer = self.makeLengthPrefixedBytes(1024)
  368. // No action: the pipeline isn't configured.
  369. var buffer1 = buffer
  370. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  371. assertThat(action1, .is(false))
  372. // Still no action.
  373. var buffer2 = buffer
  374. let action2 = machine.receive(buffer: &buffer2, endStream: true)
  375. assertThat(action2, .is(false))
  376. // Configure the pipeline. We have headers to forward and messages to read.
  377. let action3 = machine.pipelineConfigured()
  378. assertThat(action3, .is(.forwardHeadersThenRead()))
  379. // Read the first message.
  380. let action4 = machine.readNextRequest()
  381. assertThat(action4, .is(.forwardMessageThenRead()))
  382. // Read the second and final message.
  383. let action5 = machine.readNextRequest()
  384. assertThat(action5, .is(.forwardMessageAndEnd()))
  385. }
  386. func testReceiveDataAfterPipelineIsConfigured() {
  387. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  388. let buffer = self.makeLengthPrefixedBytes(1024)
  389. // Pipeline is configured, we should be able to read then forward the message.
  390. var buffer1 = buffer
  391. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  392. assertThat(action1, .is(true))
  393. let action2 = machine.readNextRequest()
  394. assertThat(action2, .is(.forwardMessage()))
  395. // Receive another message with end stream set.
  396. // Still no action.
  397. var buffer2 = buffer
  398. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  399. assertThat(action3, .is(true))
  400. let action4 = machine.readNextRequest()
  401. assertThat(action4, .is(.forwardMessageAndEnd()))
  402. }
  403. func testReceiveDataWhenResponseStreamIsOpen() {
  404. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  405. let buffer = self.makeLengthPrefixedBytes(1024)
  406. // Receive a message. We should read and forward it.
  407. var buffer1 = buffer
  408. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  409. assertThat(action1, .is(true))
  410. let action2 = machine.readNextRequest()
  411. assertThat(action2, .is(.forwardMessage()))
  412. // Receive a message and end stream. We should read it then forward message and end.
  413. var buffer2 = buffer
  414. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  415. assertThat(action3, .is(true))
  416. let action4 = machine.readNextRequest()
  417. assertThat(action4, .is(.forwardMessageAndEnd()))
  418. }
  419. func testReceiveCompressedMessageWhenCompressionIsDisabled() {
  420. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  421. var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
  422. let action1 = machine.receive(buffer: &buffer, endStream: false)
  423. assertThat(action1, .is(true))
  424. let action2 = machine.readNextRequest()
  425. assertThat(action2, .is(.errorCaught()))
  426. }
  427. func testReceiveDataWhenClosed() {
  428. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  429. // Close while the request stream is still open.
  430. let action1 = machine.send(
  431. status: GRPCStatus(code: .ok, message: "ok"),
  432. trailers: [:]
  433. )
  434. assertThat(action1, .is(.success(.trailers(code: .ok, message: "ok"))))
  435. // Now receive end of request stream: no action, we're closed.
  436. var emptyBuffer = ByteBuffer()
  437. let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
  438. assertThat(action2, .is(false))
  439. }
  440. // MARK: Send Metadata Tests
  441. func testSendMetadataRequestStreamOpen() {
  442. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  443. // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
  444. // We'll just validate more 'normal' things here.
  445. let action1 = machine.send(headers: [:])
  446. assertThat(action1, .is(.success(.contains(":status", ["200"]))))
  447. let action2 = machine.send(headers: [:])
  448. assertThat(action2, .is(.failure()))
  449. }
  450. func testSendMetadataRequestStreamClosed() {
  451. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  452. var buffer = ByteBuffer()
  453. let action1 = machine.receive(buffer: &buffer, endStream: true)
  454. assertThat(action1, .is(true))
  455. let action2 = machine.readNextRequest()
  456. assertThat(action2, .is(.forwardEnd()))
  457. // Write some headers back.
  458. let action3 = machine.send(headers: [:])
  459. assertThat(action3, .is(.success(.contains(":status", ["200"]))))
  460. }
  461. func testSendMetadataWhenOpen() {
  462. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  463. // Response stream is already open.
  464. let action = machine.send(headers: [:])
  465. assertThat(action, .is(.failure()))
  466. }
  467. func testSendMetadataNormalizesUserProvidedMetadata() {
  468. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  469. let action = machine.send(headers: ["FOO": "bar"])
  470. assertThat(action, .success(.contains(caseSensitive: "foo")))
  471. }
  472. // MARK: Send Data Tests
  473. func testSendData() {
  474. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  475. let machine = self.makeStateMachine(state: startingState)
  476. let buffer = ByteBuffer(repeating: 0, count: 1024)
  477. // We should be able to do this multiple times.
  478. for _ in 0 ..< 5 {
  479. let action = machine.send(
  480. buffer: buffer,
  481. allocator: self.allocator,
  482. compress: false
  483. )
  484. assertThat(action, .is(.success()))
  485. }
  486. // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
  487. // write as normal.
  488. let action = machine.send(
  489. buffer: buffer,
  490. allocator: self.allocator,
  491. compress: true
  492. )
  493. assertThat(action, .is(.success()))
  494. }
  495. }
  496. func testSendDataAfterClose() {
  497. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  498. let action1 = machine.send(status: .ok, trailers: [:])
  499. assertThat(action1, .is(.success(.contains("grpc-status", ["0"]))))
  500. // We're already closed, this should fail.
  501. let buffer = ByteBuffer(repeating: 0, count: 1024)
  502. let action2 = machine.send(
  503. buffer: buffer,
  504. allocator: self.allocator,
  505. compress: false
  506. )
  507. assertThat(action2, .is(.failure()))
  508. }
  509. func testSendDataBeforeMetadata() {
  510. let machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
  511. // Response stream is still idle, so this should fail.
  512. let buffer = ByteBuffer(repeating: 0, count: 1024)
  513. let action2 = machine.send(
  514. buffer: buffer,
  515. allocator: self.allocator,
  516. compress: false
  517. )
  518. assertThat(action2, .is(.failure()))
  519. }
  520. // MARK: Send End
  521. func testSendEndWhenResponseStreamIsIdle() {
  522. for state in [
  523. DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
  524. DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
  525. ] {
  526. var machine = self.makeStateMachine(state: state)
  527. let action1 = machine.send(status: .ok, trailers: [:])
  528. // This'll be a trailers-only response.
  529. assertThat(action1, .is(.success(.trailersOnly(code: .ok))))
  530. // Already closed.
  531. let action2 = machine.send(status: .ok, trailers: [:])
  532. assertThat(action2, .is(.failure()))
  533. }
  534. }
  535. func testSendEndWhenResponseStreamIsOpen() {
  536. for state in [
  537. DesiredState.requestOpenResponseOpen,
  538. DesiredState.requestClosedResponseOpen,
  539. ] {
  540. var machine = self.makeStateMachine(state: state)
  541. let action = machine.send(
  542. status: GRPCStatus(code: .ok, message: "ok"),
  543. trailers: [:]
  544. )
  545. assertThat(action, .is(.success(.trailers(code: .ok, message: "ok"))))
  546. // Already closed.
  547. let action2 = machine.send(status: .ok, trailers: [:])
  548. assertThat(action2, .is(.failure()))
  549. }
  550. }
  551. }
  552. extension ServerMessageEncoding {
  553. fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
  554. return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
  555. }
  556. }
  557. class NoOpResponseWriter: GRPCServerResponseWriter {
  558. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  559. promise?.succeed(())
  560. }
  561. func sendMessage(
  562. _ bytes: ByteBuffer,
  563. metadata: MessageMetadata,
  564. promise: EventLoopPromise<Void>?
  565. ) {
  566. promise?.succeed(())
  567. }
  568. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  569. promise?.succeed(())
  570. }
  571. }