HTTP2ToRawGRPCStateMachineTests.swift 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHPACK
  21. import NIOHTTP2
  22. import NIOPosix
  23. import XCTest
  24. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  25. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  26. typealias State = StateMachine.State
  27. // An event loop gets passed to any service handler that's created, we don't actually use it here.
  28. private var eventLoop: EventLoop {
  29. return EmbeddedEventLoop()
  30. }
  31. /// An allocator, just here for convenience.
  32. private let allocator = ByteBufferAllocator()
  33. private func makeHeaders(
  34. path: String = "/echo.Echo/Get",
  35. contentType: String?,
  36. encoding: String? = nil,
  37. acceptEncoding: [String]? = nil
  38. ) -> HPACKHeaders {
  39. var headers = HPACKHeaders()
  40. headers.add(name: ":path", value: path)
  41. if let contentType = contentType {
  42. headers.add(name: GRPCHeaderName.contentType, value: contentType)
  43. }
  44. if let encoding = encoding {
  45. headers.add(name: GRPCHeaderName.encoding, value: encoding)
  46. }
  47. if let acceptEncoding = acceptEncoding {
  48. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
  49. }
  50. return headers
  51. }
  52. private func makeHeaders(
  53. path: String = "/echo.Echo/Get",
  54. contentType: ContentType? = .protobuf,
  55. encoding: CompressionAlgorithm? = nil,
  56. acceptEncoding: [CompressionAlgorithm]? = nil
  57. ) -> HPACKHeaders {
  58. return self.makeHeaders(
  59. path: path,
  60. contentType: contentType?.canonicalValue,
  61. encoding: encoding?.name,
  62. acceptEncoding: acceptEncoding?.map { $0.name }
  63. )
  64. }
  65. /// A minimum set of viable request headers for the service providers we register by default.
  66. private var viableHeaders: HPACKHeaders {
  67. return self.makeHeaders(
  68. path: "/echo.Echo/Get",
  69. contentType: "application/grpc"
  70. )
  71. }
  72. /// Just the echo service.
  73. private var services: [Substring: CallHandlerProvider] {
  74. let provider = EchoProvider()
  75. return [provider.serviceName: provider]
  76. }
  77. private enum DesiredState {
  78. case requestOpenResponseIdle(pipelineConfigured: Bool)
  79. case requestOpenResponseOpen
  80. case requestClosedResponseIdle(pipelineConfigured: Bool)
  81. case requestClosedResponseOpen
  82. }
  83. /// Makes a state machine in the desired state.
  84. private func makeStateMachine(
  85. services: [Substring: CallHandlerProvider]? = nil,
  86. encoding: ServerMessageEncoding = .disabled,
  87. state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  88. ) -> StateMachine {
  89. var machine = StateMachine()
  90. let receiveHeadersAction = machine.receive(
  91. headers: self.viableHeaders,
  92. eventLoop: self.eventLoop,
  93. errorDelegate: nil,
  94. remoteAddress: nil,
  95. logger: self.logger,
  96. allocator: ByteBufferAllocator(),
  97. responseWriter: NoOpResponseWriter(),
  98. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  99. services: services ?? self.services,
  100. encoding: encoding,
  101. normalizeHeaders: true
  102. )
  103. assertThat(receiveHeadersAction, .is(.configure()))
  104. switch state {
  105. case .requestOpenResponseIdle(pipelineConfigured: false):
  106. ()
  107. case .requestOpenResponseIdle(pipelineConfigured: true):
  108. let configuredAction = machine.pipelineConfigured()
  109. assertThat(configuredAction, .is(.forwardHeaders()))
  110. case .requestOpenResponseOpen:
  111. let configuredAction = machine.pipelineConfigured()
  112. assertThat(configuredAction, .is(.forwardHeaders()))
  113. let sendHeadersAction = machine.send(headers: [:])
  114. assertThat(sendHeadersAction, .is(.success()))
  115. case .requestClosedResponseIdle(pipelineConfigured: false):
  116. var emptyBuffer = ByteBuffer()
  117. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  118. assertThat(receiveEnd, .is(.nothing))
  119. case .requestClosedResponseIdle(pipelineConfigured: true):
  120. let configuredAction = machine.pipelineConfigured()
  121. assertThat(configuredAction, .is(.forwardHeaders()))
  122. var emptyBuffer = ByteBuffer()
  123. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  124. assertThat(receiveEnd, .is(.tryReading))
  125. case .requestClosedResponseOpen:
  126. let configuredAction = machine.pipelineConfigured()
  127. assertThat(configuredAction, .is(.forwardHeaders()))
  128. var emptyBuffer = ByteBuffer()
  129. let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
  130. assertThat(receiveEndAction, .is(.tryReading))
  131. let readAction = machine.readNextRequest()
  132. assertThat(readAction, .is(.forwardEnd()))
  133. let sendHeadersAction = machine.send(headers: [:])
  134. assertThat(sendHeadersAction, .is(.success()))
  135. }
  136. return machine
  137. }
  138. /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
  139. /// message bytes (UInt8 ⨉ message length).
  140. private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
  141. var buffer = ByteBuffer()
  142. buffer.reserveCapacity(count + 5)
  143. buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
  144. buffer.writeInteger(UInt32(count))
  145. buffer.writeRepeatingByte(0, count: count)
  146. return buffer
  147. }
  148. // MARK: Receive Headers Tests
  149. func testReceiveValidHeaders() {
  150. var machine = StateMachine()
  151. let action = machine.receive(
  152. headers: self.viableHeaders,
  153. eventLoop: self.eventLoop,
  154. errorDelegate: nil,
  155. remoteAddress: nil,
  156. logger: self.logger,
  157. allocator: ByteBufferAllocator(),
  158. responseWriter: NoOpResponseWriter(),
  159. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  160. services: self.services,
  161. encoding: .disabled,
  162. normalizeHeaders: false
  163. )
  164. assertThat(action, .is(.configure()))
  165. }
  166. func testReceiveInvalidContentType() {
  167. var machine = StateMachine()
  168. let action = machine.receive(
  169. headers: self.makeHeaders(contentType: "application/json"),
  170. eventLoop: self.eventLoop,
  171. errorDelegate: nil,
  172. remoteAddress: nil,
  173. logger: self.logger,
  174. allocator: ByteBufferAllocator(),
  175. responseWriter: NoOpResponseWriter(),
  176. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  177. services: self.services,
  178. encoding: .disabled,
  179. normalizeHeaders: false
  180. )
  181. assertThat(action, .is(.rejectRPC(.contains(":status", ["415"]))))
  182. }
  183. func testReceiveValidHeadersForUnknownService() {
  184. var machine = StateMachine()
  185. let action = machine.receive(
  186. headers: self.makeHeaders(path: "/foo.Foo/Get"),
  187. eventLoop: self.eventLoop,
  188. errorDelegate: nil,
  189. remoteAddress: nil,
  190. logger: self.logger,
  191. allocator: ByteBufferAllocator(),
  192. responseWriter: NoOpResponseWriter(),
  193. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  194. services: self.services,
  195. encoding: .disabled,
  196. normalizeHeaders: false
  197. )
  198. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  199. }
  200. func testReceiveValidHeadersForUnknownMethod() {
  201. var machine = StateMachine()
  202. let action = machine.receive(
  203. headers: self.makeHeaders(path: "/echo.Echo/Foo"),
  204. eventLoop: self.eventLoop,
  205. errorDelegate: nil,
  206. remoteAddress: nil,
  207. logger: self.logger,
  208. allocator: ByteBufferAllocator(),
  209. responseWriter: NoOpResponseWriter(),
  210. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  211. services: self.services,
  212. encoding: .disabled,
  213. normalizeHeaders: false
  214. )
  215. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  216. }
  217. func testReceiveValidHeadersForInvalidPath() {
  218. var machine = StateMachine()
  219. let action = machine.receive(
  220. headers: self.makeHeaders(path: "nope"),
  221. eventLoop: self.eventLoop,
  222. errorDelegate: nil,
  223. remoteAddress: nil,
  224. logger: self.logger,
  225. allocator: ByteBufferAllocator(),
  226. responseWriter: NoOpResponseWriter(),
  227. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  228. services: self.services,
  229. encoding: .disabled,
  230. normalizeHeaders: false
  231. )
  232. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  233. }
  234. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
  235. var machine = StateMachine()
  236. let action = machine.receive(
  237. headers: self.makeHeaders(encoding: .gzip),
  238. eventLoop: self.eventLoop,
  239. errorDelegate: nil,
  240. remoteAddress: nil,
  241. logger: self.logger,
  242. allocator: ByteBufferAllocator(),
  243. responseWriter: NoOpResponseWriter(),
  244. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  245. services: self.services,
  246. encoding: .disabled,
  247. normalizeHeaders: false
  248. )
  249. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  250. }
  251. func testReceiveHeadersWithMultipleEncodings() {
  252. var machine = StateMachine()
  253. // We can't have multiple encodings.
  254. let action = machine.receive(
  255. headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
  256. eventLoop: self.eventLoop,
  257. errorDelegate: nil,
  258. remoteAddress: nil,
  259. logger: self.logger,
  260. allocator: ByteBufferAllocator(),
  261. responseWriter: NoOpResponseWriter(),
  262. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  263. services: self.services,
  264. encoding: .disabled,
  265. normalizeHeaders: false
  266. )
  267. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .invalidArgument))))
  268. }
  269. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
  270. var machine = StateMachine()
  271. let action = machine.receive(
  272. headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
  273. eventLoop: self.eventLoop,
  274. errorDelegate: nil,
  275. remoteAddress: nil,
  276. logger: self.logger,
  277. allocator: ByteBufferAllocator(),
  278. responseWriter: NoOpResponseWriter(),
  279. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  280. services: self.services,
  281. encoding: .enabled(.deflate, .identity),
  282. normalizeHeaders: false
  283. )
  284. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  285. assertThat(
  286. action,
  287. .is(.rejectRPC(.contains("grpc-accept-encoding", ["deflate", "identity"])))
  288. )
  289. }
  290. func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
  291. var machine = StateMachine()
  292. // We didn't advertise gzip, but we do support it.
  293. let action = machine.receive(
  294. headers: self.makeHeaders(encoding: .gzip),
  295. eventLoop: self.eventLoop,
  296. errorDelegate: nil,
  297. remoteAddress: nil,
  298. logger: self.logger,
  299. allocator: ByteBufferAllocator(),
  300. responseWriter: NoOpResponseWriter(),
  301. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  302. services: self.services,
  303. encoding: .enabled(.deflate, .identity),
  304. normalizeHeaders: false
  305. )
  306. // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
  307. // metadata. Send back headers to test this.
  308. assertThat(action, .is(.configure()))
  309. let sendAction = machine.send(headers: [:])
  310. assertThat(sendAction, .success(.contains(
  311. "grpc-accept-encoding",
  312. ["deflate", "identity", "gzip"]
  313. )))
  314. }
  315. func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
  316. var machine = StateMachine()
  317. // Identity is always supported, even if compression is disabled.
  318. let action = machine.receive(
  319. headers: self.makeHeaders(encoding: .identity),
  320. eventLoop: self.eventLoop,
  321. errorDelegate: nil,
  322. remoteAddress: nil,
  323. logger: self.logger,
  324. allocator: ByteBufferAllocator(),
  325. responseWriter: NoOpResponseWriter(),
  326. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  327. services: self.services,
  328. encoding: .disabled,
  329. normalizeHeaders: false
  330. )
  331. assertThat(action, .is(.configure()))
  332. }
  333. func testReceiveHeadersNegotiatesResponseEncoding() {
  334. var machine = StateMachine()
  335. let action = machine.receive(
  336. headers: self.makeHeaders(acceptEncoding: [.deflate]),
  337. eventLoop: self.eventLoop,
  338. errorDelegate: nil,
  339. remoteAddress: nil,
  340. logger: self.logger,
  341. allocator: ByteBufferAllocator(),
  342. responseWriter: NoOpResponseWriter(),
  343. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  344. services: self.services,
  345. encoding: .enabled(.gzip, .deflate),
  346. normalizeHeaders: false
  347. )
  348. // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
  349. assertThat(action, .is(.configure()))
  350. let sendAction = machine.send(headers: [:])
  351. assertThat(sendAction, .success(.contains("grpc-encoding", ["deflate"])))
  352. }
  353. // MARK: Receive Data Tests
  354. func testReceiveDataBeforePipelineIsConfigured() {
  355. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  356. let buffer = self.makeLengthPrefixedBytes(1024)
  357. // Receive a request. The pipeline isn't configured so no action.
  358. var buffer1 = buffer
  359. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  360. assertThat(action1, .is(.nothing))
  361. // Receive another request, still not configured so no action.
  362. var buffer2 = buffer
  363. let action2 = machine.receive(buffer: &buffer2, endStream: false)
  364. assertThat(action2, .is(.nothing))
  365. // Configure the pipeline. We'll have headers to forward and messages to read.
  366. let action3 = machine.pipelineConfigured()
  367. assertThat(action3, .is(.forwardHeadersThenRead()))
  368. // Do the first read.
  369. let action4 = machine.readNextRequest()
  370. assertThat(action4, .is(.forwardMessageThenRead()))
  371. // Do the second and final read.
  372. let action5 = machine.readNextRequest()
  373. assertThat(action5, .is(.forwardMessage()))
  374. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  375. // after receiving.
  376. var emptyBuffer = ByteBuffer()
  377. let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
  378. assertThat(action6, .is(.tryReading))
  379. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  380. let action7 = machine.readNextRequest()
  381. assertThat(action7, .is(.forwardEnd()))
  382. }
  383. func testReceiveDataWhenPipelineIsConfigured() {
  384. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  385. let buffer = self.makeLengthPrefixedBytes(1024)
  386. // Receive a request. The pipeline is configured, so we should try reading.
  387. var buffer1 = buffer
  388. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  389. assertThat(action1, .is(.tryReading))
  390. // Read the message, consuming all bytes.
  391. let action2 = machine.readNextRequest()
  392. assertThat(action2, .is(.forwardMessage()))
  393. // Receive another request, we'll split buffer into two parts.
  394. var buffer3 = buffer
  395. var buffer2 = buffer3.readSlice(length: 20)!
  396. // Not enough bytes to form a message, so read won't result in anything.
  397. let action4 = machine.receive(buffer: &buffer2, endStream: false)
  398. assertThat(action4, .is(.tryReading))
  399. let action5 = machine.readNextRequest()
  400. assertThat(action5, .is(.none()))
  401. // Now the rest of the message.
  402. let action6 = machine.receive(buffer: &buffer3, endStream: false)
  403. assertThat(action6, .is(.tryReading))
  404. let action7 = machine.readNextRequest()
  405. assertThat(action7, .is(.forwardMessage()))
  406. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  407. // after receiving.
  408. var emptyBuffer = ByteBuffer()
  409. let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
  410. assertThat(action8, .is(.tryReading))
  411. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  412. let action9 = machine.readNextRequest()
  413. assertThat(action9, .is(.forwardEnd()))
  414. }
  415. func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
  416. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  417. let buffer = self.makeLengthPrefixedBytes(1024)
  418. // No action: the pipeline isn't configured.
  419. var buffer1 = buffer
  420. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  421. assertThat(action1, .is(.nothing))
  422. // Still no action.
  423. var buffer2 = buffer
  424. let action2 = machine.receive(buffer: &buffer2, endStream: true)
  425. assertThat(action2, .is(.nothing))
  426. // Configure the pipeline. We have headers to forward and messages to read.
  427. let action3 = machine.pipelineConfigured()
  428. assertThat(action3, .is(.forwardHeadersThenRead()))
  429. // Read the first message.
  430. let action4 = machine.readNextRequest()
  431. assertThat(action4, .is(.forwardMessageThenRead()))
  432. // Read the second and final message.
  433. let action5 = machine.readNextRequest()
  434. assertThat(action5, .is(.forwardMessageThenRead()))
  435. let action6 = machine.readNextRequest()
  436. assertThat(action6, .is(.forwardEnd()))
  437. }
  438. func testReceiveDataAfterPipelineIsConfigured() {
  439. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  440. let buffer = self.makeLengthPrefixedBytes(1024)
  441. // Pipeline is configured, we should be able to read then forward the message.
  442. var buffer1 = buffer
  443. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  444. assertThat(action1, .is(.tryReading))
  445. let action2 = machine.readNextRequest()
  446. assertThat(action2, .is(.forwardMessage()))
  447. // Receive another message with end stream set.
  448. // Still no action.
  449. var buffer2 = buffer
  450. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  451. assertThat(action3, .is(.tryReading))
  452. let action4 = machine.readNextRequest()
  453. assertThat(action4, .is(.forwardMessageThenRead()))
  454. let action5 = machine.readNextRequest()
  455. assertThat(action5, .is(.forwardEnd()))
  456. }
  457. func testReceiveDataWhenResponseStreamIsOpen() {
  458. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  459. let buffer = self.makeLengthPrefixedBytes(1024)
  460. // Receive a message. We should read and forward it.
  461. var buffer1 = buffer
  462. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  463. assertThat(action1, .is(.tryReading))
  464. let action2 = machine.readNextRequest()
  465. assertThat(action2, .is(.forwardMessage()))
  466. // Receive a message and end stream. We should read it then forward message and end.
  467. var buffer2 = buffer
  468. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  469. assertThat(action3, .is(.tryReading))
  470. let action4 = machine.readNextRequest()
  471. assertThat(action4, .is(.forwardMessageThenRead()))
  472. let action5 = machine.readNextRequest()
  473. assertThat(action5, .is(.forwardEnd()))
  474. }
  475. func testReceiveCompressedMessageWhenCompressionIsDisabled() {
  476. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  477. var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
  478. let action1 = machine.receive(buffer: &buffer, endStream: false)
  479. assertThat(action1, .is(.tryReading))
  480. let action2 = machine.readNextRequest()
  481. assertThat(action2, .is(.errorCaught()))
  482. }
  483. func testReceiveDataWhenClosed() {
  484. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  485. // Close while the request stream is still open.
  486. let action1 = machine.send(
  487. status: GRPCStatus(code: .ok, message: "ok"),
  488. trailers: [:]
  489. )
  490. assertThat(action1, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
  491. // Now receive end of request stream: tear down the handler, we're closed
  492. var emptyBuffer = ByteBuffer()
  493. let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
  494. assertThat(action2, .is(.finishHandler))
  495. }
  496. // MARK: Send Metadata Tests
  497. func testSendMetadataRequestStreamOpen() {
  498. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  499. // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
  500. // We'll just validate more 'normal' things here.
  501. let action1 = machine.send(headers: [:])
  502. assertThat(action1, .is(.success(.contains(":status", ["200"]))))
  503. let action2 = machine.send(headers: [:])
  504. assertThat(action2, .is(.failure()))
  505. }
  506. func testSendMetadataRequestStreamClosed() {
  507. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  508. var buffer = ByteBuffer()
  509. let action1 = machine.receive(buffer: &buffer, endStream: true)
  510. assertThat(action1, .is(.tryReading))
  511. let action2 = machine.readNextRequest()
  512. assertThat(action2, .is(.forwardEnd()))
  513. // Write some headers back.
  514. let action3 = machine.send(headers: [:])
  515. assertThat(action3, .is(.success(.contains(":status", ["200"]))))
  516. }
  517. func testSendMetadataWhenOpen() {
  518. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  519. // Response stream is already open.
  520. let action = machine.send(headers: [:])
  521. assertThat(action, .is(.failure()))
  522. }
  523. func testSendMetadataNormalizesUserProvidedMetadata() {
  524. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  525. let action = machine.send(headers: ["FOO": "bar"])
  526. assertThat(action, .success(.contains(caseSensitive: "foo")))
  527. }
  528. // MARK: Send Data Tests
  529. func testSendData() {
  530. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  531. var machine = self.makeStateMachine(state: startingState)
  532. let buffer = ByteBuffer(repeating: 0, count: 1024)
  533. // We should be able to do this multiple times.
  534. for _ in 0 ..< 5 {
  535. let action = machine.send(
  536. buffer: buffer,
  537. compress: false,
  538. promise: nil
  539. )
  540. assertThat(action, .is(.success()))
  541. }
  542. // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
  543. // write as normal.
  544. let action = machine.send(
  545. buffer: buffer,
  546. compress: true,
  547. promise: nil
  548. )
  549. assertThat(action, .is(.success()))
  550. }
  551. }
  552. func testSendDataAfterClose() {
  553. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  554. let action1 = machine.send(status: .ok, trailers: [:])
  555. assertThat(action1, .is(.sendTrailersAndFinish(.contains("grpc-status", ["0"]))))
  556. // We're already closed, this should fail.
  557. let buffer = ByteBuffer(repeating: 0, count: 1024)
  558. let action2 = machine.send(
  559. buffer: buffer,
  560. compress: false,
  561. promise: nil
  562. )
  563. assertThat(action2, .is(.failure()))
  564. }
  565. func testSendDataBeforeMetadata() {
  566. var machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
  567. // Response stream is still idle, so this should fail.
  568. let buffer = ByteBuffer(repeating: 0, count: 1024)
  569. let action2 = machine.send(
  570. buffer: buffer,
  571. compress: false,
  572. promise: nil
  573. )
  574. assertThat(action2, .is(.failure()))
  575. }
  576. // MARK: Next Response
  577. func testNextResponseBeforeMetadata() {
  578. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  579. XCTAssertNil(machine.nextResponse())
  580. }
  581. func testNextResponseWhenOpen() throws {
  582. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  583. var machine = self.makeStateMachine(state: startingState)
  584. // No response buffered yet.
  585. XCTAssertNil(machine.nextResponse())
  586. let buffer = ByteBuffer(repeating: 0, count: 1024)
  587. machine.send(buffer: buffer, compress: false, promise: nil).assertSuccess()
  588. let (framedBuffer, promise) = try XCTUnwrap(machine.nextResponse())
  589. XCTAssertNil(promise) // Didn't provide a promise.
  590. framedBuffer.assertSuccess()
  591. // No more responses.
  592. XCTAssertNil(machine.nextResponse())
  593. }
  594. }
  595. func testNextResponseWhenClosed() throws {
  596. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  597. let action = machine.send(status: .ok, trailers: [:])
  598. switch action {
  599. case .sendTrailersAndFinish:
  600. ()
  601. default:
  602. XCTFail("Expected 'sendTrailersAndFinish' but got \(action)")
  603. }
  604. XCTAssertNil(machine.nextResponse())
  605. }
  606. // MARK: Send End
  607. func testSendEndWhenResponseStreamIsIdle() {
  608. for (state, closed) in zip([
  609. DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
  610. DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
  611. ], [false, true]) {
  612. var machine = self.makeStateMachine(state: state)
  613. let action1 = machine.send(status: .ok, trailers: [:])
  614. // This'll be a trailers-only response.
  615. if closed {
  616. assertThat(action1, .is(.sendTrailersAndFinish(.trailersOnly(code: .ok))))
  617. } else {
  618. assertThat(action1, .is(.sendTrailers(.trailersOnly(code: .ok))))
  619. }
  620. // Already closed.
  621. let action2 = machine.send(status: .ok, trailers: [:])
  622. assertThat(action2, .is(.failure()))
  623. }
  624. }
  625. func testSendEndWhenResponseStreamIsOpen() {
  626. for (state, closed) in zip([
  627. DesiredState.requestOpenResponseOpen,
  628. DesiredState.requestClosedResponseOpen,
  629. ], [false, true]) {
  630. var machine = self.makeStateMachine(state: state)
  631. let action = machine.send(
  632. status: GRPCStatus(code: .ok, message: "ok"),
  633. trailers: [:]
  634. )
  635. if closed {
  636. assertThat(action, .is(.sendTrailersAndFinish(.trailers(code: .ok, message: "ok"))))
  637. } else {
  638. assertThat(action, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
  639. }
  640. // Already closed.
  641. let action2 = machine.send(status: .ok, trailers: [:])
  642. assertThat(action2, .is(.failure()))
  643. }
  644. }
  645. }
  646. extension ServerMessageEncoding {
  647. fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
  648. return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
  649. }
  650. }
  651. class NoOpResponseWriter: GRPCServerResponseWriter {
  652. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  653. promise?.succeed(())
  654. }
  655. func sendMessage(
  656. _ bytes: ByteBuffer,
  657. metadata: MessageMetadata,
  658. promise: EventLoopPromise<Void>?
  659. ) {
  660. promise?.succeed(())
  661. }
  662. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  663. promise?.succeed(())
  664. }
  665. }
  666. extension HTTP2ToRawGRPCStateMachine {
  667. fileprivate mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  668. return self.readNextRequest(maxLength: .max)
  669. }
  670. }