HTTP2ToRawGRPCStateMachineTests.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  23. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  24. typealias State = StateMachine.State
  25. typealias Action = StateMachine.Action
  26. // An event loop gets passed to any service handler that's created, we don't actually use it here.
  27. private var eventLoop: EventLoop {
  28. return EmbeddedEventLoop()
  29. }
  30. /// An allocator, just here for convenience.
  31. private let allocator = ByteBufferAllocator()
  32. private func makeHeaders(
  33. path: String = "/echo.Echo/Get",
  34. contentType: String?,
  35. encoding: String? = nil,
  36. acceptEncoding: [String]? = nil
  37. ) -> HPACKHeaders {
  38. var headers = HPACKHeaders()
  39. headers.add(name: ":path", value: path)
  40. if let contentType = contentType {
  41. headers.add(name: GRPCHeaderName.contentType, value: contentType)
  42. }
  43. if let encoding = encoding {
  44. headers.add(name: GRPCHeaderName.encoding, value: encoding)
  45. }
  46. if let acceptEncoding = acceptEncoding {
  47. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
  48. }
  49. return headers
  50. }
  51. private func makeHeaders(
  52. path: String = "/echo.Echo/Get",
  53. contentType: ContentType? = .protobuf,
  54. encoding: CompressionAlgorithm? = nil,
  55. acceptEncoding: [CompressionAlgorithm]? = nil
  56. ) -> HPACKHeaders {
  57. return self.makeHeaders(
  58. path: path,
  59. contentType: contentType?.canonicalValue,
  60. encoding: encoding?.name,
  61. acceptEncoding: acceptEncoding?.map { $0.name }
  62. )
  63. }
  64. /// A minimum set of viable request headers for the service providers we register by default.
  65. private var viableHeaders: HPACKHeaders {
  66. return self.makeHeaders(
  67. path: "/echo.Echo/Get",
  68. contentType: "application/grpc"
  69. )
  70. }
  71. /// Just the echo service.
  72. private var services: [Substring: CallHandlerProvider] {
  73. let provider = EchoProvider()
  74. return [provider.serviceName: provider]
  75. }
  76. private enum DesiredState {
  77. case requestOpenResponseIdle(pipelineConfigured: Bool)
  78. case requestOpenResponseOpen
  79. case requestClosedResponseIdle(pipelineConfigured: Bool)
  80. case requestClosedResponseOpen
  81. }
  82. /// Makes a state machine in the desired state.
  83. private func makeStateMachine(
  84. services: [Substring: CallHandlerProvider]? = nil,
  85. encoding: ServerMessageEncoding = .disabled,
  86. state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  87. ) -> StateMachine {
  88. var machine = StateMachine(services: services ?? self.services, encoding: encoding)
  89. let receiveHeadersAction = machine.receive(
  90. headers: self.viableHeaders,
  91. eventLoop: self.eventLoop,
  92. errorDelegate: nil,
  93. remoteAddress: nil,
  94. logger: self.logger,
  95. allocator: ByteBufferAllocator(),
  96. responseWriter: NoOpResponseWriter()
  97. )
  98. assertThat(receiveHeadersAction, .is(.configure()))
  99. switch state {
  100. case .requestOpenResponseIdle(pipelineConfigured: false):
  101. ()
  102. case .requestOpenResponseIdle(pipelineConfigured: true):
  103. let configuredAction = machine.pipelineConfigured()
  104. assertThat(configuredAction, .is(.forwardHeaders()))
  105. case .requestOpenResponseOpen:
  106. let configuredAction = machine.pipelineConfigured()
  107. assertThat(configuredAction, .is(.forwardHeaders()))
  108. let sendHeadersAction = machine.send(headers: [:], promise: nil)
  109. assertThat(sendHeadersAction, .is(.write(.headers())))
  110. case .requestClosedResponseIdle(pipelineConfigured: false):
  111. var emptyBuffer = ByteBuffer()
  112. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  113. assertThat(receiveEnd, .is(.none()))
  114. case .requestClosedResponseIdle(pipelineConfigured: true):
  115. let configuredAction = machine.pipelineConfigured()
  116. assertThat(configuredAction, .is(.forwardHeaders()))
  117. var emptyBuffer = ByteBuffer()
  118. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  119. assertThat(receiveEnd, .is(.readNextRequest()))
  120. case .requestClosedResponseOpen:
  121. let configuredAction = machine.pipelineConfigured()
  122. assertThat(configuredAction, .is(.forwardHeaders()))
  123. var emptyBuffer = ByteBuffer()
  124. let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
  125. assertThat(receiveEndAction, .is(.readNextRequest()))
  126. let readAction = machine.readNextRequest()
  127. assertThat(readAction, .is(.forwardEnd()))
  128. let sendHeadersAction = machine.send(headers: [:], promise: nil)
  129. assertThat(sendHeadersAction, .is(.write(.headers())))
  130. }
  131. return machine
  132. }
  133. /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
  134. /// message bytes (UInt8 ⨉ message length).
  135. private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
  136. var buffer = ByteBuffer()
  137. buffer.reserveCapacity(count + 5)
  138. buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
  139. buffer.writeInteger(UInt32(count))
  140. buffer.writeRepeatingByte(0, count: count)
  141. return buffer
  142. }
  143. // MARK: Receive Headers Tests
  144. func testReceiveValidHeaders() {
  145. var machine = StateMachine(services: self.services, encoding: .disabled)
  146. let action = machine.receive(
  147. headers: self.viableHeaders,
  148. eventLoop: self.eventLoop,
  149. errorDelegate: nil,
  150. remoteAddress: nil,
  151. logger: self.logger,
  152. allocator: ByteBufferAllocator(),
  153. responseWriter: NoOpResponseWriter()
  154. )
  155. assertThat(action, .is(.configure()))
  156. }
  157. func testReceiveInvalidContentType() {
  158. var machine = StateMachine(services: self.services, encoding: .disabled)
  159. let action = machine.receive(
  160. headers: self.makeHeaders(contentType: "application/json"),
  161. eventLoop: self.eventLoop,
  162. errorDelegate: nil,
  163. remoteAddress: nil,
  164. logger: self.logger,
  165. allocator: ByteBufferAllocator(),
  166. responseWriter: NoOpResponseWriter()
  167. )
  168. assertThat(
  169. action,
  170. .is(.write(.headers(.contains(":status", ["415"]), endStream: true), flush: true))
  171. )
  172. }
  173. func testReceiveValidHeadersForUnknownService() {
  174. var machine = StateMachine(services: self.services, encoding: .disabled)
  175. let action = machine.receive(
  176. headers: self.makeHeaders(path: "/foo.Foo/Get"),
  177. eventLoop: self.eventLoop,
  178. errorDelegate: nil,
  179. remoteAddress: nil,
  180. logger: self.logger,
  181. allocator: ByteBufferAllocator(),
  182. responseWriter: NoOpResponseWriter()
  183. )
  184. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  185. }
  186. func testReceiveValidHeadersForUnknownMethod() {
  187. var machine = StateMachine(services: self.services, encoding: .disabled)
  188. let action = machine.receive(
  189. headers: self.makeHeaders(path: "/echo.Echo/Foo"),
  190. eventLoop: self.eventLoop,
  191. errorDelegate: nil,
  192. remoteAddress: nil,
  193. logger: self.logger,
  194. allocator: ByteBufferAllocator(),
  195. responseWriter: NoOpResponseWriter()
  196. )
  197. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  198. }
  199. func testReceiveValidHeadersForInvalidPath() {
  200. var machine = StateMachine(services: self.services, encoding: .disabled)
  201. let action = machine.receive(
  202. headers: self.makeHeaders(path: "nope"),
  203. eventLoop: self.eventLoop,
  204. errorDelegate: nil,
  205. remoteAddress: nil,
  206. logger: self.logger,
  207. allocator: ByteBufferAllocator(),
  208. responseWriter: NoOpResponseWriter()
  209. )
  210. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  211. }
  212. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
  213. var machine = StateMachine(services: self.services, encoding: .disabled)
  214. let action = machine.receive(
  215. headers: self.makeHeaders(encoding: .gzip),
  216. eventLoop: self.eventLoop,
  217. errorDelegate: nil,
  218. remoteAddress: nil,
  219. logger: self.logger,
  220. allocator: ByteBufferAllocator(),
  221. responseWriter: NoOpResponseWriter()
  222. )
  223. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  224. }
  225. func testReceiveHeadersWithMultipleEncodings() {
  226. var machine = StateMachine(services: self.services, encoding: .disabled)
  227. // We can't have multiple encodings.
  228. let action = machine.receive(
  229. headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
  230. eventLoop: self.eventLoop,
  231. errorDelegate: nil,
  232. remoteAddress: nil,
  233. logger: self.logger,
  234. allocator: ByteBufferAllocator(),
  235. responseWriter: NoOpResponseWriter()
  236. )
  237. assertThat(action, .is(.write(.trailersOnly(code: .invalidArgument), flush: true)))
  238. }
  239. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
  240. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  241. let action = machine.receive(
  242. headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
  243. eventLoop: self.eventLoop,
  244. errorDelegate: nil,
  245. remoteAddress: nil,
  246. logger: self.logger,
  247. allocator: ByteBufferAllocator(),
  248. responseWriter: NoOpResponseWriter()
  249. )
  250. assertThat(action, .is(.write(.trailersOnly(code: .unimplemented), flush: true)))
  251. assertThat(
  252. action,
  253. .is(.write(.headers(.contains("grpc-accept-encoding", ["deflate", "identity"])), flush: true))
  254. )
  255. }
  256. func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
  257. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  258. // We didn't advertise gzip, but we do support it.
  259. let action = machine.receive(
  260. headers: self.makeHeaders(encoding: .gzip),
  261. eventLoop: self.eventLoop,
  262. errorDelegate: nil,
  263. remoteAddress: nil,
  264. logger: self.logger,
  265. allocator: ByteBufferAllocator(),
  266. responseWriter: NoOpResponseWriter()
  267. )
  268. // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
  269. // metadata. Send back headers to test this.
  270. assertThat(action, .is(.configure()))
  271. let sendAction = machine.send(headers: [:], promise: nil)
  272. assertThat(sendAction, .write(.headers(.contains(
  273. "grpc-accept-encoding",
  274. ["deflate", "identity", "gzip"]
  275. ))))
  276. }
  277. func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
  278. var machine = StateMachine(services: self.services, encoding: .disabled)
  279. // Identity is always supported, even if compression is disabled.
  280. let action = machine.receive(
  281. headers: self.makeHeaders(encoding: .identity),
  282. eventLoop: self.eventLoop,
  283. errorDelegate: nil,
  284. remoteAddress: nil,
  285. logger: self.logger,
  286. allocator: ByteBufferAllocator(),
  287. responseWriter: NoOpResponseWriter()
  288. )
  289. assertThat(action, .is(.configure()))
  290. }
  291. func testReceiveHeadersNegotiatesResponseEncoding() {
  292. var machine = StateMachine(services: self.services, encoding: .enabled(.gzip, .deflate))
  293. let action = machine.receive(
  294. headers: self.makeHeaders(acceptEncoding: [.deflate]),
  295. eventLoop: self.eventLoop,
  296. errorDelegate: nil,
  297. remoteAddress: nil,
  298. logger: self.logger,
  299. allocator: ByteBufferAllocator(),
  300. responseWriter: NoOpResponseWriter()
  301. )
  302. // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
  303. assertThat(action, .is(.configure()))
  304. let sendAction = machine.send(headers: [:], promise: nil)
  305. assertThat(sendAction, .write(.headers(.contains("grpc-encoding", ["deflate"]))))
  306. }
  307. // MARK: Receive Data Tests
  308. func testReceiveDataBeforePipelineIsConfigured() {
  309. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  310. let buffer = self.makeLengthPrefixedBytes(1024)
  311. // Receive a request. The pipeline isn't configured so no action.
  312. var buffer1 = buffer
  313. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  314. assertThat(action1, .is(.none()))
  315. // Receive another request, still not configured so no action.
  316. var buffer2 = buffer
  317. let action2 = machine.receive(buffer: &buffer2, endStream: false)
  318. assertThat(action2, .is(.none()))
  319. // Configure the pipeline. We'll have headers to forward and messages to read.
  320. let action3 = machine.pipelineConfigured()
  321. assertThat(action3, .is(.forwardHeadersThenRead()))
  322. // Do the first read.
  323. let action4 = machine.readNextRequest()
  324. assertThat(action4, .is(.forwardMessageThenRead()))
  325. // Do the second and final read.
  326. let action5 = machine.readNextRequest()
  327. assertThat(action5, .is(.forwardMessage()))
  328. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  329. // after receiving.
  330. var emptyBuffer = ByteBuffer()
  331. let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
  332. assertThat(action6, .is(.readNextRequest()))
  333. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  334. let action7 = machine.readNextRequest()
  335. assertThat(action7, .is(.forwardEnd()))
  336. }
  337. func testReceiveDataWhenPipelineIsConfigured() {
  338. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  339. let buffer = self.makeLengthPrefixedBytes(1024)
  340. // Receive a request. The pipeline is configured, so we should try reading.
  341. var buffer1 = buffer
  342. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  343. assertThat(action1, .is(.readNextRequest()))
  344. // Read the message, consuming all bytes.
  345. let action2 = machine.readNextRequest()
  346. assertThat(action2, .is(.forwardMessage()))
  347. // Receive another request, we'll split buffer into two parts.
  348. var buffer3 = buffer
  349. var buffer2 = buffer3.readSlice(length: 20)!
  350. // Not enough bytes to form a message, so read won't result in anything.
  351. let action4 = machine.receive(buffer: &buffer2, endStream: false)
  352. assertThat(action4, .is(.readNextRequest()))
  353. let action5 = machine.readNextRequest()
  354. assertThat(action5, .is(.none()))
  355. // Now the rest of the message.
  356. let action6 = machine.receive(buffer: &buffer3, endStream: false)
  357. assertThat(action6, .is(.readNextRequest()))
  358. let action7 = machine.readNextRequest()
  359. assertThat(action7, .is(.forwardMessage()))
  360. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  361. // after receiving.
  362. var emptyBuffer = ByteBuffer()
  363. let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
  364. assertThat(action8, .is(.readNextRequest()))
  365. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  366. let action9 = machine.readNextRequest()
  367. assertThat(action9, .is(.forwardEnd()))
  368. }
  369. func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
  370. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  371. let buffer = self.makeLengthPrefixedBytes(1024)
  372. // No action: the pipeline isn't configured.
  373. var buffer1 = buffer
  374. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  375. assertThat(action1, .is(.none()))
  376. // Still no action.
  377. var buffer2 = buffer
  378. let action2 = machine.receive(buffer: &buffer2, endStream: true)
  379. assertThat(action2, .is(.none()))
  380. // Configure the pipeline. We have headers to forward and messages to read.
  381. let action3 = machine.pipelineConfigured()
  382. assertThat(action3, .is(.forwardHeadersThenRead()))
  383. // Read the first message.
  384. let action4 = machine.readNextRequest()
  385. assertThat(action4, .is(.forwardMessageThenRead()))
  386. // Read the second and final message.
  387. let action5 = machine.readNextRequest()
  388. assertThat(action5, .is(.forwardMessageAndEnd()))
  389. }
  390. func testReceiveDataAfterPipelineIsConfigured() {
  391. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  392. let buffer = self.makeLengthPrefixedBytes(1024)
  393. // Pipeline is configured, we should be able to read then forward the message.
  394. var buffer1 = buffer
  395. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  396. assertThat(action1, .is(.readNextRequest()))
  397. let action2 = machine.readNextRequest()
  398. assertThat(action2, .is(.forwardMessage()))
  399. // Receive another message with end stream set.
  400. // Still no action.
  401. var buffer2 = buffer
  402. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  403. assertThat(action3, .is(.readNextRequest()))
  404. let action4 = machine.readNextRequest()
  405. assertThat(action4, .is(.forwardMessageAndEnd()))
  406. }
  407. func testReceiveDataWhenResponseStreamIsOpen() {
  408. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  409. let buffer = self.makeLengthPrefixedBytes(1024)
  410. // Receive a message. We should read and forward it.
  411. var buffer1 = buffer
  412. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  413. assertThat(action1, .is(.readNextRequest()))
  414. let action2 = machine.readNextRequest()
  415. assertThat(action2, .is(.forwardMessage()))
  416. // Receive a message and end stream. We should read it then forward message and end.
  417. var buffer2 = buffer
  418. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  419. assertThat(action3, .is(.readNextRequest()))
  420. let action4 = machine.readNextRequest()
  421. assertThat(action4, .is(.forwardMessageAndEnd()))
  422. }
  423. func testReceiveCompressedMessageWhenCompressionIsDisabled() {
  424. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  425. var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
  426. let action1 = machine.receive(buffer: &buffer, endStream: false)
  427. assertThat(action1, .is(.readNextRequest()))
  428. let action2 = machine.readNextRequest()
  429. assertThat(action2, .is(.errorCaught()))
  430. }
  431. func testReceiveDataWhenClosed() {
  432. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  433. // Close while the request stream is still open.
  434. let action1 = machine.send(
  435. status: GRPCStatus(code: .ok, message: "ok"),
  436. trailers: [:],
  437. promise: nil
  438. )
  439. assertThat(action1, .is(.write(.trailers(code: .ok, message: "ok"))))
  440. // Now receive end of request stream: no action, we're closed.
  441. var emptyBuffer = ByteBuffer()
  442. let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
  443. assertThat(action2, .is(.none()))
  444. }
  445. // MARK: Send Metadata Tests
  446. func testSendMetadataRequestStreamOpen() {
  447. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  448. // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
  449. // We'll just validate more 'normal' things here.
  450. let action1 = machine.send(headers: [:], promise: nil)
  451. assertThat(action1, .is(.write(.headers(.contains(":status", ["200"])))))
  452. let action2 = machine.send(headers: [:], promise: nil)
  453. assertThat(action2, .is(.completePromise(with: .failure())))
  454. }
  455. func testSendMetadataRequestStreamClosed() {
  456. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  457. var buffer = ByteBuffer()
  458. let action1 = machine.receive(buffer: &buffer, endStream: true)
  459. assertThat(action1, .is(.readNextRequest()))
  460. let action2 = machine.readNextRequest()
  461. assertThat(action2, .is(.forwardEnd()))
  462. // Write some headers back.
  463. let action3 = machine.send(headers: [:], promise: nil)
  464. assertThat(action3, .is(.write(.headers(.contains(":status", ["200"])))))
  465. }
  466. func testSendMetadataWhenOpen() {
  467. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  468. // Response stream is already open.
  469. let action = machine.send(headers: [:], promise: nil)
  470. assertThat(action, .is(.completePromise(with: .failure())))
  471. }
  472. func testSendMetadataNormalizesUserProvidedMetadata() {
  473. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  474. let action = machine.send(headers: ["FOO": "bar"], promise: nil)
  475. assertThat(action, .is(.write(.headers(.contains(caseSensitive: "foo")))))
  476. }
  477. // MARK: Send Data Tests
  478. func testSendData() {
  479. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  480. var machine = self.makeStateMachine(state: startingState)
  481. let buffer = ByteBuffer(repeating: 0, count: 1024)
  482. // We should be able to do this multiple times.
  483. for _ in 0 ..< 5 {
  484. let action = machine.send(
  485. buffer: buffer,
  486. allocator: self.allocator,
  487. compress: false,
  488. promise: nil
  489. )
  490. assertThat(action, .is(.write(.data(endStream: false))))
  491. }
  492. // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
  493. // write as normal.
  494. let action = machine.send(
  495. buffer: buffer,
  496. allocator: self.allocator,
  497. compress: true,
  498. promise: nil
  499. )
  500. assertThat(action, .is(.write(.data(endStream: false))))
  501. }
  502. }
  503. func testSendDataAfterClose() {
  504. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  505. let action1 = machine.send(status: .ok, trailers: [:], promise: nil)
  506. assertThat(action1, .is(.write(.headers(.contains("grpc-status", ["0"]), endStream: true))))
  507. // We're already closed, this should fail.
  508. let buffer = ByteBuffer(repeating: 0, count: 1024)
  509. let action2 = machine.send(
  510. buffer: buffer,
  511. allocator: self.allocator,
  512. compress: false,
  513. promise: nil
  514. )
  515. assertThat(action2, .is(.completePromise(with: .failure())))
  516. }
  517. func testSendDataBeforeMetadata() {
  518. var machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
  519. // Response stream is still idle, so this should fail.
  520. let buffer = ByteBuffer(repeating: 0, count: 1024)
  521. let action2 = machine.send(
  522. buffer: buffer,
  523. allocator: self.allocator,
  524. compress: false,
  525. promise: nil
  526. )
  527. assertThat(action2, .is(.completePromise(with: .failure())))
  528. }
  529. // MARK: Send End
  530. func testSendEndWhenResponseStreamIsIdle() {
  531. for state in [
  532. DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
  533. DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
  534. ] {
  535. var machine = self.makeStateMachine(state: state)
  536. let action1 = machine.send(status: .ok, trailers: [:], promise: nil)
  537. // This'll be a trailers-only response.
  538. assertThat(action1, .is(.write(.trailersOnly(code: .ok))))
  539. // Already closed.
  540. let action2 = machine.send(status: .ok, trailers: [:], promise: nil)
  541. assertThat(action2, .is(.completePromise(with: .failure())))
  542. }
  543. }
  544. func testSendEndWhenResponseStreamIsOpen() {
  545. for state in [
  546. DesiredState.requestOpenResponseOpen,
  547. DesiredState.requestClosedResponseOpen,
  548. ] {
  549. var machine = self.makeStateMachine(state: state)
  550. let action = machine.send(
  551. status: GRPCStatus(code: .ok, message: "ok"),
  552. trailers: [:],
  553. promise: nil
  554. )
  555. assertThat(action, .is(.write(.trailers(code: .ok, message: "ok"))))
  556. // Already closed.
  557. let action2 = machine.send(status: .ok, trailers: [:], promise: nil)
  558. assertThat(action2, .is(.completePromise(with: .failure())))
  559. }
  560. }
  561. }
  562. extension ServerMessageEncoding {
  563. fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
  564. return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
  565. }
  566. }
  567. class NoOpResponseWriter: GRPCServerResponseWriter {
  568. func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  569. promise?.succeed(())
  570. }
  571. func sendMessage(
  572. _ bytes: ByteBuffer,
  573. metadata: MessageMetadata,
  574. promise: EventLoopPromise<Void>?
  575. ) {
  576. promise?.succeed(())
  577. }
  578. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  579. promise?.succeed(())
  580. }
  581. }