HTTP2ToRawGRPCStateMachineTests.swift 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import NIOPosix
  22. import XCTest
  23. @testable import GRPC
  24. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  25. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  26. typealias State = StateMachine.State
  /// The event loop handed to any service handler the state machine creates; the tests never
  /// run it.
  ///
  /// This is stored once per test case on purpose. As a computed property it produced a brand new
  /// `EmbeddedEventLoop` on every access, which meant the `eventLoop:` and `closeFuture:`
  /// arguments of a single `receive(headers:...)` call were backed by two unrelated loops.
  private let eventLoop: EventLoop = EmbeddedEventLoop()

  /// An allocator, just here for convenience.
  private let allocator = ByteBufferAllocator()
  /// Builds request headers from raw string values.
  ///
  /// `:path` is always present. The headers named by `GRPCHeaderName.contentType`,
  /// `GRPCHeaderName.encoding` and `GRPCHeaderName.acceptEncoding` are appended (in that order)
  /// only when the corresponding argument is non-`nil`.
  ///
  /// - Parameters:
  ///   - path: Value for the `:path` pseudo-header.
  ///   - contentType: Raw content-type value, or `nil` to omit the header.
  ///   - encoding: Raw request encoding value, or `nil` to omit the header.
  ///   - acceptEncoding: Accepted encodings; joined with "," into a single header value.
  /// - Returns: The assembled headers.
  private func makeHeaders(
    path: String = "/echo.Echo/Get",
    contentType: String?,
    encoding: String? = nil,
    acceptEncoding: [String]? = nil
  ) -> HPACKHeaders {
    var result = HPACKHeaders()
    result.add(name: ":path", value: path)

    // Optional headers, listed in the order they must be appended.
    let optionalFields: [(String, String?)] = [
      (GRPCHeaderName.contentType, contentType),
      (GRPCHeaderName.encoding, encoding),
      (GRPCHeaderName.acceptEncoding, acceptEncoding?.joined(separator: ",")),
    ]

    for case let (name, value?) in optionalFields {
      result.add(name: name, value: value)
    }

    return result
  }
  /// Builds request headers from typed values by lowering each one to its string form and
  /// delegating to the string-based `makeHeaders(path:contentType:encoding:acceptEncoding:)`.
  ///
  /// - Parameters:
  ///   - path: Value for the `:path` pseudo-header.
  ///   - contentType: Content type whose `canonicalValue` is sent; `nil` omits the header.
  ///   - encoding: Request compression algorithm whose `name` is sent; `nil` omits the header.
  ///   - acceptEncoding: Algorithms whose `name`s are advertised; `nil` omits the header.
  /// - Returns: The assembled headers.
  private func makeHeaders(
    path: String = "/echo.Echo/Get",
    contentType: ContentType? = .protobuf,
    encoding: CompressionAlgorithm? = nil,
    acceptEncoding: [CompressionAlgorithm]? = nil
  ) -> HPACKHeaders {
    let rawContentType: String? = contentType?.canonicalValue
    let rawEncoding: String? = encoding?.name
    let rawAcceptEncoding: [String]? = acceptEncoding?.map { algorithm in algorithm.name }

    return self.makeHeaders(
      path: path,
      contentType: rawContentType,
      encoding: rawEncoding,
      acceptEncoding: rawAcceptEncoding
    )
  }
  /// The smallest set of request headers accepted for the default service providers: the Echo
  /// `Get` path plus an "application/grpc" content type.
  private var viableHeaders: HPACKHeaders {
    let contentType: String? = "application/grpc"
    return self.makeHeaders(path: "/echo.Echo/Get", contentType: contentType)
  }
  /// The providers registered by default: only the echo service, keyed by its service name.
  private var services: [Substring: CallHandlerProvider] {
    let echo = EchoProvider()
    var registry: [Substring: CallHandlerProvider] = [:]
    registry[echo.serviceName] = echo
    return registry
  }
  /// The states `makeStateMachine(services:encoding:state:)` can leave a machine in.
  ///
  /// Where present, `pipelineConfigured` says whether `pipelineConfigured()` has been called on
  /// the machine as part of the setup.
  private enum DesiredState {
    /// Request headers received; no response headers sent yet.
    case requestOpenResponseIdle(pipelineConfigured: Bool)

    /// Pipeline configured and response headers sent; the request stream is still open.
    case requestOpenResponseOpen

    /// End of the request stream received; no response headers sent yet.
    case requestClosedResponseIdle(pipelineConfigured: Bool)

    /// Pipeline configured, end of request stream received and read, response headers sent.
    case requestClosedResponseOpen
  }
  /// Builds a state machine and drives it into `state`.
  ///
  /// The machine is always fed `viableHeaders` first (asserting that this yields `.configure`) and
  /// is then stepped through whichever further events the requested state calls for. Every
  /// intermediate action is asserted, so a broken setup surfaces as a test failure.
  ///
  /// - Parameters:
  ///   - services: Providers to route against; `self.services` is used when `nil`.
  ///   - encoding: The server message encoding given to the machine.
  ///   - state: The state the returned machine should be in.
  /// - Returns: The machine, in the requested state.
  private func makeStateMachine(
    services: [Substring: CallHandlerProvider]? = nil,
    encoding: ServerMessageEncoding = .disabled,
    state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  ) -> StateMachine {
    var stateMachine = StateMachine()

    let headersAction = stateMachine.receive(
      headers: self.viableHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: services ?? self.services,
      encoding: encoding,
      normalizeHeaders: true
    )
    assertThat(headersAction, .is(.configure()))

    // Steps shared by several of the target states.
    func configurePipeline() {
      let configured = stateMachine.pipelineConfigured()
      assertThat(configured, .is(.forwardHeaders()))
    }

    func openResponseStream() {
      let sent = stateMachine.send(headers: [:])
      assertThat(sent, .is(.success()))
    }

    switch state {
    case let .requestOpenResponseIdle(pipelineConfigured):
      if pipelineConfigured {
        configurePipeline()
      }

    case .requestOpenResponseOpen:
      configurePipeline()
      openResponseStream()

    case let .requestClosedResponseIdle(pipelineConfigured):
      if pipelineConfigured {
        configurePipeline()
        var noBytes = ByteBuffer()
        let endAction = stateMachine.receive(buffer: &noBytes, endStream: true)
        assertThat(endAction, .is(.tryReading))
      } else {
        // Without a configured pipeline, receiving end-stream yields no action.
        var noBytes = ByteBuffer()
        let endAction = stateMachine.receive(buffer: &noBytes, endStream: true)
        assertThat(endAction, .is(.nothing))
      }

    case .requestClosedResponseOpen:
      configurePipeline()
      var noBytes = ByteBuffer()
      let endAction = stateMachine.receive(buffer: &noBytes, endStream: true)
      assertThat(endAction, .is(.tryReading))
      let readAction = stateMachine.readNextRequest()
      assertThat(readAction, .is(.forwardEnd()))
      openResponseStream()
    }

    return stateMachine
  }
  /// Makes a gRPC framed message: a compression flag (`UInt8`), the message length (`UInt32`),
  /// then `count` zero bytes standing in for the message.
  ///
  /// - Parameters:
  ///   - count: The number of message bytes to write after the prefix.
  ///   - setCompressFlag: Whether the compression flag byte is written as 1 rather than 0.
  /// - Returns: A buffer holding the five byte prefix followed by the message bytes.
  private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
    let compressionFlag: UInt8 = setCompressFlag ? 1 : 0
    let prefixLength = MemoryLayout<UInt8>.size + MemoryLayout<UInt32>.size

    var framed = ByteBuffer()
    framed.reserveCapacity(count + prefixLength)
    framed.writeInteger(compressionFlag)
    framed.writeInteger(UInt32(count))
    framed.writeRepeatingByte(0, count: count)
    return framed
  }
  148. // MARK: Receive Headers Tests
  /// Viable headers for a registered service should ask for the pipeline to be configured.
  func testReceiveValidHeaders() {
    let requestHeaders = self.viableHeaders
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.configure()))
  }
  /// An "application/json" content type should be rejected with a ":status" of "415".
  func testReceiveInvalidContentType() {
    let requestHeaders = self.makeHeaders(contentType: "application/json")
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.rejectRPC(.contains(":status", ["415"]))))
  }
  /// A path naming a service which isn't registered should be rejected as unimplemented.
  func testReceiveValidHeadersForUnknownService() {
    let requestHeaders = self.makeHeaders(path: "/foo.Foo/Get")
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  }
  /// A path naming the echo service but a method ("Foo") used nowhere else in these tests
  /// should be rejected as unimplemented.
  func testReceiveValidHeadersForUnknownMethod() {
    let requestHeaders = self.makeHeaders(path: "/echo.Echo/Foo")
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  }
  /// A path which isn't of the "/service/method" form ("nope") should be rejected as
  /// unimplemented.
  func testReceiveValidHeadersForInvalidPath() {
    let requestHeaders = self.makeHeaders(path: "nope")
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  }
  /// A gzip-encoded request should be rejected as unimplemented when the server has message
  /// encoding disabled.
  func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
    let requestHeaders = self.makeHeaders(encoding: .gzip)
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  }
  /// A request may name only one encoding: "gzip,identity" should be rejected as an invalid
  /// argument.
  func testReceiveHeadersWithMultipleEncodings() {
    let requestHeaders = self.makeHeaders(
      contentType: "application/grpc",
      encoding: "gzip,identity"
    )
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.rejectRPC(.trailersOnly(code: .invalidArgument))))
  }
  /// An unknown encoding ("foozip") should be rejected as unimplemented, and the rejection should
  /// advertise the encodings the server does accept.
  func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
    let requestHeaders = self.makeHeaders(contentType: "application/grpc", encoding: "foozip")
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .enabled(.deflate, .identity),
      normalizeHeaders: false
    )

    // Rejected with the right status...
    assertThat(outcome, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
    // ...and with the list of encodings we enabled above.
    assertThat(
      outcome,
      .is(.rejectRPC(.contains("grpc-accept-encoding", ["deflate", "identity"])))
    )
  }
  /// gzip is supported but isn't one of the enabled (advertised) algorithms. The request should
  /// still be accepted, and the response headers should list gzip alongside the enabled
  /// algorithms in 'grpc-accept-encoding'.
  func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
    let requestHeaders = self.makeHeaders(encoding: .gzip)
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .enabled(.deflate, .identity),
      normalizeHeaders: false
    )
    assertThat(outcome, .is(.configure()))

    // The accepted encodings only show up in the response metadata, so send headers back and
    // inspect them.
    let responseHeaders = stateMachine.send(headers: [:])
    assertThat(
      responseHeaders,
      .success(.contains("grpc-accept-encoding", ["deflate", "identity", "gzip"]))
    )
  }
  /// The identity encoding should be accepted even when message encoding is disabled.
  func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
    let requestHeaders = self.makeHeaders(encoding: .identity)
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .disabled,
      normalizeHeaders: false
    )

    assertThat(outcome, .is(.configure()))
  }
  /// When the client accepts only deflate and the server enables gzip and deflate, the response
  /// headers should carry a 'grpc-encoding' of deflate.
  func testReceiveHeadersNegotiatesResponseEncoding() {
    let requestHeaders = self.makeHeaders(acceptEncoding: [.deflate])
    var stateMachine = StateMachine()

    let outcome = stateMachine.receive(
      headers: requestHeaders,
      eventLoop: self.eventLoop,
      errorDelegate: nil,
      remoteAddress: nil,
      logger: self.logger,
      allocator: ByteBufferAllocator(),
      responseWriter: NoOpResponseWriter(),
      closeFuture: self.eventLoop.makeSucceededVoidFuture(),
      services: self.services,
      encoding: .enabled(.gzip, .deflate),
      normalizeHeaders: false
    )
    assertThat(outcome, .is(.configure()))

    // The negotiated encoding is only visible in the response headers.
    let responseHeaders = stateMachine.send(headers: [:])
    assertThat(responseHeaders, .success(.contains("grpc-encoding", ["deflate"])))
  }
  358. // MARK: Receive Data Tests
  /// Messages received before the pipeline is configured produce no action; once configured,
  /// the headers are forwarded and the buffered messages are read out one at a time.
  func testReceiveDataBeforePipelineIsConfigured() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: false)
    )
    let message = self.makeLengthPrefixedBytes(1024)

    // Two messages arrive while the pipeline isn't configured: no action for either.
    for _ in 0 ..< 2 {
      var bytes = message
      let received = stateMachine.receive(buffer: &bytes, endStream: false)
      assertThat(received, .is(.nothing))
    }

    // Configuring the pipeline gives us headers to forward and messages to read.
    let configured = stateMachine.pipelineConfigured()
    assertThat(configured, .is(.forwardHeadersThenRead()))

    // First read: there's another message behind this one.
    let firstRead = stateMachine.readNextRequest()
    assertThat(firstRead, .is(.forwardMessageThenRead()))

    // Second read: the last buffered message.
    let secondRead = stateMachine.readNextRequest()
    assertThat(secondRead, .is(.forwardMessage()))

    // An empty buffer carrying end stream. The pipeline is configured so we're told to read.
    var noBytes = ByteBuffer()
    let receivedEnd = stateMachine.receive(buffer: &noBytes, endStream: true)
    assertThat(receivedEnd, .is(.tryReading))

    // Nothing is left to consume, but end stream was seen so the end must be forwarded.
    let finalRead = stateMachine.readNextRequest()
    assertThat(finalRead, .is(.forwardEnd()))
  }
  /// With a configured pipeline every receive asks for a read; a message split over two buffers
  /// is only forwarded once the second part has arrived.
  func testReceiveDataWhenPipelineIsConfigured() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: true)
    )
    let message = self.makeLengthPrefixedBytes(1024)

    // A whole message arrives: we should try reading, and the read consumes all of it.
    var wholeMessage = message
    let receivedWhole = stateMachine.receive(buffer: &wholeMessage, endStream: false)
    assertThat(receivedWhole, .is(.tryReading))
    let readWhole = stateMachine.readNextRequest()
    assertThat(readWhole, .is(.forwardMessage()))

    // The next message arrives in two parts: the first 20 bytes, then the remainder.
    var remainder = message
    var firstPart = remainder.readSlice(length: 20)!

    // The first part isn't enough to form a message, so reading yields nothing.
    let receivedFirstPart = stateMachine.receive(buffer: &firstPart, endStream: false)
    assertThat(receivedFirstPart, .is(.tryReading))
    let readFirstPart = stateMachine.readNextRequest()
    assertThat(readFirstPart, .is(.none()))

    // The remainder completes the message.
    let receivedRemainder = stateMachine.receive(buffer: &remainder, endStream: false)
    assertThat(receivedRemainder, .is(.tryReading))
    let readRemainder = stateMachine.readNextRequest()
    assertThat(readRemainder, .is(.forwardMessage()))

    // An empty buffer carrying end stream. The pipeline is configured so we're told to read.
    var noBytes = ByteBuffer()
    let receivedEnd = stateMachine.receive(buffer: &noBytes, endStream: true)
    assertThat(receivedEnd, .is(.tryReading))

    // Nothing is left to consume, but end stream was seen so the end must be forwarded.
    let finalRead = stateMachine.readNextRequest()
    assertThat(finalRead, .is(.forwardEnd()))
  }
  /// Two messages, the second carrying end stream, arrive before the pipeline is configured.
  /// After configuration both messages are read and then the end of the stream is forwarded.
  func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: false)
    )
    let message = self.makeLengthPrefixedBytes(1024)

    // The pipeline isn't configured, so neither receive results in an action -- not even the
    // one which ends the stream.
    for endStream in [false, true] {
      var bytes = message
      let received = stateMachine.receive(buffer: &bytes, endStream: endStream)
      assertThat(received, .is(.nothing))
    }

    // Configuring the pipeline gives us headers to forward and messages to read.
    let configured = stateMachine.pipelineConfigured()
    assertThat(configured, .is(.forwardHeadersThenRead()))

    // Both messages ask for a further read: the first because of the second message, the
    // second because the end of the stream is still to be forwarded.
    for _ in 0 ..< 2 {
      let read = stateMachine.readNextRequest()
      assertThat(read, .is(.forwardMessageThenRead()))
    }

    let finalRead = stateMachine.readNextRequest()
    assertThat(finalRead, .is(.forwardEnd()))
  }
  /// With a configured pipeline (and the response stream still idle), a message is read and
  /// forwarded; a message carrying end stream is forwarded and then followed by the end.
  func testReceiveDataAfterPipelineIsConfigured() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: true)
    )
    let message = self.makeLengthPrefixedBytes(1024)

    // The pipeline is configured: receive, read, forward.
    var first = message
    let receivedFirst = stateMachine.receive(buffer: &first, endStream: false)
    assertThat(receivedFirst, .is(.tryReading))
    let readFirst = stateMachine.readNextRequest()
    assertThat(readFirst, .is(.forwardMessage()))

    // A second message, this time with end stream set. We're asked to read again after
    // forwarding it so that the end of the stream can be forwarded too.
    var last = message
    let receivedLast = stateMachine.receive(buffer: &last, endStream: true)
    assertThat(receivedLast, .is(.tryReading))
    let readLast = stateMachine.readNextRequest()
    assertThat(readLast, .is(.forwardMessageThenRead()))

    let finalRead = stateMachine.readNextRequest()
    assertThat(finalRead, .is(.forwardEnd()))
  }
  /// Receiving requests works the same way once the response stream has been opened.
  func testReceiveDataWhenResponseStreamIsOpen() {
    var stateMachine = self.makeStateMachine(state: .requestOpenResponseOpen)
    let message = self.makeLengthPrefixedBytes(1024)

    // A message arrives: read it and forward it.
    var first = message
    let receivedFirst = stateMachine.receive(buffer: &first, endStream: false)
    assertThat(receivedFirst, .is(.tryReading))
    let readFirst = stateMachine.readNextRequest()
    assertThat(readFirst, .is(.forwardMessage()))

    // A message arrives with end stream: forward the message, read again, then forward the end.
    var last = message
    let receivedLast = stateMachine.receive(buffer: &last, endStream: true)
    assertThat(receivedLast, .is(.tryReading))
    let readLast = stateMachine.readNextRequest()
    assertThat(readLast, .is(.forwardMessageThenRead()))

    let finalRead = stateMachine.readNextRequest()
    assertThat(finalRead, .is(.forwardEnd()))
  }
  /// Reading a message whose compression flag is set, on a machine created with encoding
  /// disabled, should surface an error.
  func testReceiveCompressedMessageWhenCompressionIsDisabled() {
    var stateMachine = self.makeStateMachine(state: .requestOpenResponseOpen)
    var flaggedMessage = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)

    let received = stateMachine.receive(buffer: &flaggedMessage, endStream: false)
    assertThat(received, .is(.tryReading))

    let read = stateMachine.readNextRequest()
    assertThat(read, .is(.errorCaught()))
  }
  /// If the response is ended while the request stream is still open, the later end of the
  /// request stream should finish the handler.
  func testReceiveDataWhenClosed() {
    var stateMachine = self.makeStateMachine(state: .requestOpenResponseOpen)

    // End the response; the request stream hasn't ended yet.
    let status = GRPCStatus(code: .ok, message: "ok")
    let sentEnd = stateMachine.send(status: status, trailers: [:])
    assertThat(sentEnd, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))

    // The request stream now ends. We're closed, so the handler should be torn down.
    var noBytes = ByteBuffer()
    let receivedEnd = stateMachine.receive(buffer: &noBytes, endStream: true)
    assertThat(receivedEnd, .is(.finishHandler))
  }
  501. // MARK: Send Metadata Tests
  /// Response headers can be sent exactly once while the request stream is open: the first send
  /// succeeds with a ":status" of "200", the second fails.
  ///
  /// The more unusual cases (request encoding, negotiating the response encoding and so on) are
  /// covered by the receive-headers tests; this only checks the ordinary path.
  func testSendMetadataRequestStreamOpen() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: true)
    )

    let firstSend = stateMachine.send(headers: [:])
    assertThat(firstSend, .is(.success(.contains(":status", ["200"]))))

    let secondSend = stateMachine.send(headers: [:])
    assertThat(secondSend, .is(.failure()))
  }
  /// Response headers can still be sent after the request stream has ended and its end has been
  /// forwarded.
  func testSendMetadataRequestStreamClosed() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: true)
    )

    // End the request stream and read the end.
    var noBytes = ByteBuffer()
    let receivedEnd = stateMachine.receive(buffer: &noBytes, endStream: true)
    assertThat(receivedEnd, .is(.tryReading))
    let readEnd = stateMachine.readNextRequest()
    assertThat(readEnd, .is(.forwardEnd()))

    // Now write some headers back.
    let sentHeaders = stateMachine.send(headers: [:])
    assertThat(sentHeaders, .is(.success(.contains(":status", ["200"]))))
  }
  /// Sending headers fails when the response stream has already been opened.
  func testSendMetadataWhenOpen() {
    var stateMachine = self.makeStateMachine(state: .requestOpenResponseOpen)

    let resend = stateMachine.send(headers: [:])
    assertThat(resend, .is(.failure()))
  }
  /// User metadata named "FOO" should appear as "foo" in the headers which are sent (the machine
  /// made by `makeStateMachine` is created with `normalizeHeaders: true`).
  func testSendMetadataNormalizesUserProvidedMetadata() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: true)
    )

    let sentHeaders = stateMachine.send(headers: ["FOO": "bar"])
    assertThat(sentHeaders, .success(.contains(caseSensitive: "foo")))
  }
  533. // MARK: Send Data Tests
  /// Once the response stream is open, messages can be sent repeatedly whether or not the
  /// request stream has closed.
  func testSendData() {
    let startingStates: [DesiredState] = [.requestOpenResponseOpen, .requestClosedResponseOpen]

    // Five uncompressed sends (sending must work more than once) followed by one with the
    // compress flag set. We aren't set up to compress, so that flag is simply ignored and the
    // message is written as normal.
    let compressFlags = Array(repeating: false, count: 5) + [true]

    for startingState in startingStates {
      var stateMachine = self.makeStateMachine(state: startingState)
      let payload = ByteBuffer(repeating: 0, count: 1024)

      for compress in compressFlags {
        let sent = stateMachine.send(buffer: payload, compress: compress, promise: nil)
        assertThat(sent, .is(.success()))
      }
    }
  }
  /// Sending a message after the response has been ended should fail.
  func testSendDataAfterClose() {
    var stateMachine = self.makeStateMachine(state: .requestClosedResponseOpen)

    let sentEnd = stateMachine.send(status: .ok, trailers: [:])
    assertThat(sentEnd, .is(.sendTrailersAndFinish(.contains("grpc-status", ["0"]))))

    // The response is closed now, so no more messages may be sent.
    let payload = ByteBuffer(repeating: 0, count: 1024)
    let sentMessage = stateMachine.send(buffer: payload, compress: false, promise: nil)
    assertThat(sentMessage, .is(.failure()))
  }
  /// Sending a message before any response headers have been sent should fail.
  func testSendDataBeforeMetadata() {
    var stateMachine = self.makeStateMachine(
      state: .requestClosedResponseIdle(pipelineConfigured: true)
    )

    // The response stream is still idle.
    let payload = ByteBuffer(repeating: 0, count: 1024)
    let sentMessage = stateMachine.send(buffer: payload, compress: false, promise: nil)
    assertThat(sentMessage, .is(.failure()))
  }
  581. // MARK: Next Response
  /// There is no response to pull out while the response stream is idle.
  func testNextResponseBeforeMetadata() {
    var stateMachine = self.makeStateMachine(
      state: .requestOpenResponseIdle(pipelineConfigured: true)
    )

    let pending = stateMachine.nextResponse()
    XCTAssertNil(pending)
  }
  /// With the response stream open, a sent message becomes available from `nextResponse()`
  /// exactly once; before sending and after draining there is nothing to return.
  func testNextResponseWhenOpen() throws {
    let startingStates: [DesiredState] = [.requestOpenResponseOpen, .requestClosedResponseOpen]

    for startingState in startingStates {
      var stateMachine = self.makeStateMachine(state: startingState)

      // Nothing has been buffered yet.
      XCTAssertNil(stateMachine.nextResponse())

      let payload = ByteBuffer(repeating: 0, count: 1024)
      let sent = stateMachine.send(buffer: payload, compress: false, promise: nil)
      sent.assertSuccess()

      let (framedResult, writePromise) = try XCTUnwrap(stateMachine.nextResponse())
      // We sent without a promise, so there shouldn't be one here.
      XCTAssertNil(writePromise)
      framedResult.assertSuccess()

      // That was the only response.
      XCTAssertNil(stateMachine.nextResponse())
    }
  }
  /// After the response has been ended there is no further response to pull out.
  func testNextResponseWhenClosed() throws {
    var stateMachine = self.makeStateMachine(state: .requestClosedResponseOpen)

    let action = stateMachine.send(status: .ok, trailers: [:])
    if case .sendTrailersAndFinish = action {
      // This is the action we expect.
    } else {
      XCTFail("Expected 'sendTrailersAndFinish' but got \(action)")
    }

    XCTAssertNil(stateMachine.nextResponse())
  }
  611. // MARK: Send End
  /// Ending the response before any headers were sent produces a trailers-only response; the
  /// handler is also finished if the request stream had already closed. Ending twice fails.
  func testSendEndWhenResponseStreamIsIdle() {
    // Each starting state is paired with whether the request stream is closed in that state.
    let scenarios: [(DesiredState, Bool)] = [
      (.requestOpenResponseIdle(pipelineConfigured: true), false),
      (.requestClosedResponseIdle(pipelineConfigured: true), true),
    ]

    for (startingState, requestClosed) in scenarios {
      var stateMachine = self.makeStateMachine(state: startingState)

      // No headers were sent, so this will be a trailers-only response.
      let firstEnd = stateMachine.send(status: .ok, trailers: [:])
      if requestClosed {
        assertThat(firstEnd, .is(.sendTrailersAndFinish(.trailersOnly(code: .ok))))
      } else {
        assertThat(firstEnd, .is(.sendTrailers(.trailersOnly(code: .ok))))
      }

      // The response is closed now; ending it again must fail.
      let secondEnd = stateMachine.send(status: .ok, trailers: [:])
      assertThat(secondEnd, .is(.failure()))
    }
  }
  /// Ending the response after headers were sent produces trailers carrying the status; the
  /// handler is also finished if the request stream had already closed. Ending twice fails.
  func testSendEndWhenResponseStreamIsOpen() {
    // Each starting state is paired with whether the request stream is closed in that state.
    let scenarios: [(DesiredState, Bool)] = [
      (.requestOpenResponseOpen, false),
      (.requestClosedResponseOpen, true),
    ]

    for (startingState, requestClosed) in scenarios {
      var stateMachine = self.makeStateMachine(state: startingState)

      let status = GRPCStatus(code: .ok, message: "ok")
      let firstEnd = stateMachine.send(status: status, trailers: [:])
      if requestClosed {
        assertThat(firstEnd, .is(.sendTrailersAndFinish(.trailers(code: .ok, message: "ok"))))
      } else {
        assertThat(firstEnd, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
      }

      // The response is closed now; ending it again must fail.
      let secondEnd = stateMachine.send(status: .ok, trailers: [:])
      assertThat(secondEnd, .is(.failure()))
    }
  }
  656. }
  extension ServerMessageEncoding {
    /// Test convenience: message encoding enabled for exactly the listed `algorithms`, with the
    /// decompression limit set to `.absolute(.max)`.
    fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
      return .enabled(
        .init(
          enabledAlgorithms: algorithms,
          decompressionLimit: .absolute(.max)
        )
      )
    }
  }
  /// A response writer which discards whatever it is given and immediately succeeds the promise
  /// accompanying each call, if there is one.
  class NoOpResponseWriter: GRPCServerResponseWriter {
    func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
      self.complete(promise)
    }

    func sendMessage(
      _ bytes: ByteBuffer,
      metadata: MessageMetadata,
      promise: EventLoopPromise<Void>?
    ) {
      self.complete(promise)
    }

    func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
      self.complete(promise)
    }

    /// Succeeds `promise` when one was provided; does nothing otherwise.
    private func complete(_ promise: EventLoopPromise<Void>?) {
      if let promise = promise {
        promise.succeed(())
      }
    }
  }
  extension HTTP2ToRawGRPCStateMachine {
    /// Test convenience: reads the next request with `maxLength` set to `.max`, so the tests
    /// don't have to spell out a length limit on every read.
    fileprivate mutating func readNextRequest() -> ReadNextMessageAction {
      let action = self.readNextRequest(maxLength: .max)
      return action
    }
  }