HTTP2ToRawGRPCStateMachineTests.swift 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  23. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  24. typealias State = StateMachine.State
  25. // An event loop gets passed to any service handler that's created, we don't actually use it here.
  26. private var eventLoop: EventLoop {
  27. return EmbeddedEventLoop()
  28. }
  29. /// An allocator, just here for convenience.
  30. private let allocator = ByteBufferAllocator()
  31. private func makeHeaders(
  32. path: String = "/echo.Echo/Get",
  33. contentType: String?,
  34. encoding: String? = nil,
  35. acceptEncoding: [String]? = nil
  36. ) -> HPACKHeaders {
  37. var headers = HPACKHeaders()
  38. headers.add(name: ":path", value: path)
  39. if let contentType = contentType {
  40. headers.add(name: GRPCHeaderName.contentType, value: contentType)
  41. }
  42. if let encoding = encoding {
  43. headers.add(name: GRPCHeaderName.encoding, value: encoding)
  44. }
  45. if let acceptEncoding = acceptEncoding {
  46. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
  47. }
  48. return headers
  49. }
  50. private func makeHeaders(
  51. path: String = "/echo.Echo/Get",
  52. contentType: ContentType? = .protobuf,
  53. encoding: CompressionAlgorithm? = nil,
  54. acceptEncoding: [CompressionAlgorithm]? = nil
  55. ) -> HPACKHeaders {
  56. return self.makeHeaders(
  57. path: path,
  58. contentType: contentType?.canonicalValue,
  59. encoding: encoding?.name,
  60. acceptEncoding: acceptEncoding?.map { $0.name }
  61. )
  62. }
  63. /// A minimum set of viable request headers for the service providers we register by default.
  64. private var viableHeaders: HPACKHeaders {
  65. return self.makeHeaders(
  66. path: "/echo.Echo/Get",
  67. contentType: "application/grpc"
  68. )
  69. }
  70. /// Just the echo service.
  71. private var services: [Substring: CallHandlerProvider] {
  72. let provider = EchoProvider()
  73. return [provider.serviceName: provider]
  74. }
  75. private enum DesiredState {
  76. case requestOpenResponseIdle(pipelineConfigured: Bool)
  77. case requestOpenResponseOpen
  78. case requestClosedResponseIdle(pipelineConfigured: Bool)
  79. case requestClosedResponseOpen
  80. }
  81. /// Makes a state machine in the desired state.
  82. private func makeStateMachine(
  83. services: [Substring: CallHandlerProvider]? = nil,
  84. encoding: ServerMessageEncoding = .disabled,
  85. state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  86. ) -> StateMachine {
  87. var machine = StateMachine()
  88. let receiveHeadersAction = machine.receive(
  89. headers: self.viableHeaders,
  90. eventLoop: self.eventLoop,
  91. errorDelegate: nil,
  92. remoteAddress: nil,
  93. logger: self.logger,
  94. allocator: ByteBufferAllocator(),
  95. responseWriter: NoOpResponseWriter(),
  96. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  97. services: services ?? self.services,
  98. encoding: encoding,
  99. normalizeHeaders: true
  100. )
  101. assertThat(receiveHeadersAction, .is(.configure()))
  102. switch state {
  103. case .requestOpenResponseIdle(pipelineConfigured: false):
  104. ()
  105. case .requestOpenResponseIdle(pipelineConfigured: true):
  106. let configuredAction = machine.pipelineConfigured()
  107. assertThat(configuredAction, .is(.forwardHeaders()))
  108. case .requestOpenResponseOpen:
  109. let configuredAction = machine.pipelineConfigured()
  110. assertThat(configuredAction, .is(.forwardHeaders()))
  111. let sendHeadersAction = machine.send(headers: [:])
  112. assertThat(sendHeadersAction, .is(.success()))
  113. case .requestClosedResponseIdle(pipelineConfigured: false):
  114. var emptyBuffer = ByteBuffer()
  115. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  116. assertThat(receiveEnd, .is(.nothing))
  117. case .requestClosedResponseIdle(pipelineConfigured: true):
  118. let configuredAction = machine.pipelineConfigured()
  119. assertThat(configuredAction, .is(.forwardHeaders()))
  120. var emptyBuffer = ByteBuffer()
  121. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  122. assertThat(receiveEnd, .is(.tryReading))
  123. case .requestClosedResponseOpen:
  124. let configuredAction = machine.pipelineConfigured()
  125. assertThat(configuredAction, .is(.forwardHeaders()))
  126. var emptyBuffer = ByteBuffer()
  127. let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
  128. assertThat(receiveEndAction, .is(.tryReading))
  129. let readAction = machine.readNextRequest()
  130. assertThat(readAction, .is(.forwardEnd()))
  131. let sendHeadersAction = machine.send(headers: [:])
  132. assertThat(sendHeadersAction, .is(.success()))
  133. }
  134. return machine
  135. }
  136. /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
  137. /// message bytes (UInt8 ⨉ message length).
  138. private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
  139. var buffer = ByteBuffer()
  140. buffer.reserveCapacity(count + 5)
  141. buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
  142. buffer.writeInteger(UInt32(count))
  143. buffer.writeRepeatingByte(0, count: count)
  144. return buffer
  145. }
  146. // MARK: Receive Headers Tests
  147. func testReceiveValidHeaders() {
  148. var machine = StateMachine()
  149. let action = machine.receive(
  150. headers: self.viableHeaders,
  151. eventLoop: self.eventLoop,
  152. errorDelegate: nil,
  153. remoteAddress: nil,
  154. logger: self.logger,
  155. allocator: ByteBufferAllocator(),
  156. responseWriter: NoOpResponseWriter(),
  157. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  158. services: self.services,
  159. encoding: .disabled,
  160. normalizeHeaders: false
  161. )
  162. assertThat(action, .is(.configure()))
  163. }
  164. func testReceiveInvalidContentType() {
  165. var machine = StateMachine()
  166. let action = machine.receive(
  167. headers: self.makeHeaders(contentType: "application/json"),
  168. eventLoop: self.eventLoop,
  169. errorDelegate: nil,
  170. remoteAddress: nil,
  171. logger: self.logger,
  172. allocator: ByteBufferAllocator(),
  173. responseWriter: NoOpResponseWriter(),
  174. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  175. services: self.services,
  176. encoding: .disabled,
  177. normalizeHeaders: false
  178. )
  179. assertThat(action, .is(.rejectRPC(.contains(":status", ["415"]))))
  180. }
  181. func testReceiveValidHeadersForUnknownService() {
  182. var machine = StateMachine()
  183. let action = machine.receive(
  184. headers: self.makeHeaders(path: "/foo.Foo/Get"),
  185. eventLoop: self.eventLoop,
  186. errorDelegate: nil,
  187. remoteAddress: nil,
  188. logger: self.logger,
  189. allocator: ByteBufferAllocator(),
  190. responseWriter: NoOpResponseWriter(),
  191. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  192. services: self.services,
  193. encoding: .disabled,
  194. normalizeHeaders: false
  195. )
  196. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  197. }
  198. func testReceiveValidHeadersForUnknownMethod() {
  199. var machine = StateMachine()
  200. let action = machine.receive(
  201. headers: self.makeHeaders(path: "/echo.Echo/Foo"),
  202. eventLoop: self.eventLoop,
  203. errorDelegate: nil,
  204. remoteAddress: nil,
  205. logger: self.logger,
  206. allocator: ByteBufferAllocator(),
  207. responseWriter: NoOpResponseWriter(),
  208. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  209. services: self.services,
  210. encoding: .disabled,
  211. normalizeHeaders: false
  212. )
  213. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  214. }
  215. func testReceiveValidHeadersForInvalidPath() {
  216. var machine = StateMachine()
  217. let action = machine.receive(
  218. headers: self.makeHeaders(path: "nope"),
  219. eventLoop: self.eventLoop,
  220. errorDelegate: nil,
  221. remoteAddress: nil,
  222. logger: self.logger,
  223. allocator: ByteBufferAllocator(),
  224. responseWriter: NoOpResponseWriter(),
  225. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  226. services: self.services,
  227. encoding: .disabled,
  228. normalizeHeaders: false
  229. )
  230. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  231. }
  232. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
  233. var machine = StateMachine()
  234. let action = machine.receive(
  235. headers: self.makeHeaders(encoding: .gzip),
  236. eventLoop: self.eventLoop,
  237. errorDelegate: nil,
  238. remoteAddress: nil,
  239. logger: self.logger,
  240. allocator: ByteBufferAllocator(),
  241. responseWriter: NoOpResponseWriter(),
  242. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  243. services: self.services,
  244. encoding: .disabled,
  245. normalizeHeaders: false
  246. )
  247. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  248. }
  249. func testReceiveHeadersWithMultipleEncodings() {
  250. var machine = StateMachine()
  251. // We can't have multiple encodings.
  252. let action = machine.receive(
  253. headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
  254. eventLoop: self.eventLoop,
  255. errorDelegate: nil,
  256. remoteAddress: nil,
  257. logger: self.logger,
  258. allocator: ByteBufferAllocator(),
  259. responseWriter: NoOpResponseWriter(),
  260. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  261. services: self.services,
  262. encoding: .disabled,
  263. normalizeHeaders: false
  264. )
  265. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .invalidArgument))))
  266. }
  267. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
  268. var machine = StateMachine()
  269. let action = machine.receive(
  270. headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
  271. eventLoop: self.eventLoop,
  272. errorDelegate: nil,
  273. remoteAddress: nil,
  274. logger: self.logger,
  275. allocator: ByteBufferAllocator(),
  276. responseWriter: NoOpResponseWriter(),
  277. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  278. services: self.services,
  279. encoding: .enabled(.deflate, .identity),
  280. normalizeHeaders: false
  281. )
  282. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  283. assertThat(
  284. action,
  285. .is(.rejectRPC(.contains("grpc-accept-encoding", ["deflate", "identity"])))
  286. )
  287. }
  288. func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
  289. var machine = StateMachine()
  290. // We didn't advertise gzip, but we do support it.
  291. let action = machine.receive(
  292. headers: self.makeHeaders(encoding: .gzip),
  293. eventLoop: self.eventLoop,
  294. errorDelegate: nil,
  295. remoteAddress: nil,
  296. logger: self.logger,
  297. allocator: ByteBufferAllocator(),
  298. responseWriter: NoOpResponseWriter(),
  299. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  300. services: self.services,
  301. encoding: .enabled(.deflate, .identity),
  302. normalizeHeaders: false
  303. )
  304. // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
  305. // metadata. Send back headers to test this.
  306. assertThat(action, .is(.configure()))
  307. let sendAction = machine.send(headers: [:])
  308. assertThat(sendAction, .success(.contains(
  309. "grpc-accept-encoding",
  310. ["deflate", "identity", "gzip"]
  311. )))
  312. }
  313. func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
  314. var machine = StateMachine()
  315. // Identity is always supported, even if compression is disabled.
  316. let action = machine.receive(
  317. headers: self.makeHeaders(encoding: .identity),
  318. eventLoop: self.eventLoop,
  319. errorDelegate: nil,
  320. remoteAddress: nil,
  321. logger: self.logger,
  322. allocator: ByteBufferAllocator(),
  323. responseWriter: NoOpResponseWriter(),
  324. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  325. services: self.services,
  326. encoding: .disabled,
  327. normalizeHeaders: false
  328. )
  329. assertThat(action, .is(.configure()))
  330. }
  331. func testReceiveHeadersNegotiatesResponseEncoding() {
  332. var machine = StateMachine()
  333. let action = machine.receive(
  334. headers: self.makeHeaders(acceptEncoding: [.deflate]),
  335. eventLoop: self.eventLoop,
  336. errorDelegate: nil,
  337. remoteAddress: nil,
  338. logger: self.logger,
  339. allocator: ByteBufferAllocator(),
  340. responseWriter: NoOpResponseWriter(),
  341. closeFuture: self.eventLoop.makeSucceededVoidFuture(),
  342. services: self.services,
  343. encoding: .enabled(.gzip, .deflate),
  344. normalizeHeaders: false
  345. )
  346. // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
  347. assertThat(action, .is(.configure()))
  348. let sendAction = machine.send(headers: [:])
  349. assertThat(sendAction, .success(.contains("grpc-encoding", ["deflate"])))
  350. }
  351. // MARK: Receive Data Tests
  352. func testReceiveDataBeforePipelineIsConfigured() {
  353. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  354. let buffer = self.makeLengthPrefixedBytes(1024)
  355. // Receive a request. The pipeline isn't configured so no action.
  356. var buffer1 = buffer
  357. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  358. assertThat(action1, .is(.nothing))
  359. // Receive another request, still not configured so no action.
  360. var buffer2 = buffer
  361. let action2 = machine.receive(buffer: &buffer2, endStream: false)
  362. assertThat(action2, .is(.nothing))
  363. // Configure the pipeline. We'll have headers to forward and messages to read.
  364. let action3 = machine.pipelineConfigured()
  365. assertThat(action3, .is(.forwardHeadersThenRead()))
  366. // Do the first read.
  367. let action4 = machine.readNextRequest()
  368. assertThat(action4, .is(.forwardMessageThenRead()))
  369. // Do the second and final read.
  370. let action5 = machine.readNextRequest()
  371. assertThat(action5, .is(.forwardMessage()))
  372. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  373. // after receiving.
  374. var emptyBuffer = ByteBuffer()
  375. let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
  376. assertThat(action6, .is(.tryReading))
  377. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  378. let action7 = machine.readNextRequest()
  379. assertThat(action7, .is(.forwardEnd()))
  380. }
  381. func testReceiveDataWhenPipelineIsConfigured() {
  382. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  383. let buffer = self.makeLengthPrefixedBytes(1024)
  384. // Receive a request. The pipeline is configured, so we should try reading.
  385. var buffer1 = buffer
  386. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  387. assertThat(action1, .is(.tryReading))
  388. // Read the message, consuming all bytes.
  389. let action2 = machine.readNextRequest()
  390. assertThat(action2, .is(.forwardMessage()))
  391. // Receive another request, we'll split buffer into two parts.
  392. var buffer3 = buffer
  393. var buffer2 = buffer3.readSlice(length: 20)!
  394. // Not enough bytes to form a message, so read won't result in anything.
  395. let action4 = machine.receive(buffer: &buffer2, endStream: false)
  396. assertThat(action4, .is(.tryReading))
  397. let action5 = machine.readNextRequest()
  398. assertThat(action5, .is(.none()))
  399. // Now the rest of the message.
  400. let action6 = machine.receive(buffer: &buffer3, endStream: false)
  401. assertThat(action6, .is(.tryReading))
  402. let action7 = machine.readNextRequest()
  403. assertThat(action7, .is(.forwardMessage()))
  404. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  405. // after receiving.
  406. var emptyBuffer = ByteBuffer()
  407. let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
  408. assertThat(action8, .is(.tryReading))
  409. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  410. let action9 = machine.readNextRequest()
  411. assertThat(action9, .is(.forwardEnd()))
  412. }
  413. func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
  414. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  415. let buffer = self.makeLengthPrefixedBytes(1024)
  416. // No action: the pipeline isn't configured.
  417. var buffer1 = buffer
  418. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  419. assertThat(action1, .is(.nothing))
  420. // Still no action.
  421. var buffer2 = buffer
  422. let action2 = machine.receive(buffer: &buffer2, endStream: true)
  423. assertThat(action2, .is(.nothing))
  424. // Configure the pipeline. We have headers to forward and messages to read.
  425. let action3 = machine.pipelineConfigured()
  426. assertThat(action3, .is(.forwardHeadersThenRead()))
  427. // Read the first message.
  428. let action4 = machine.readNextRequest()
  429. assertThat(action4, .is(.forwardMessageThenRead()))
  430. // Read the second and final message.
  431. let action5 = machine.readNextRequest()
  432. assertThat(action5, .is(.forwardMessageThenRead()))
  433. let action6 = machine.readNextRequest()
  434. assertThat(action6, .is(.forwardEnd()))
  435. }
  436. func testReceiveDataAfterPipelineIsConfigured() {
  437. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  438. let buffer = self.makeLengthPrefixedBytes(1024)
  439. // Pipeline is configured, we should be able to read then forward the message.
  440. var buffer1 = buffer
  441. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  442. assertThat(action1, .is(.tryReading))
  443. let action2 = machine.readNextRequest()
  444. assertThat(action2, .is(.forwardMessage()))
  445. // Receive another message with end stream set.
  446. // Still no action.
  447. var buffer2 = buffer
  448. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  449. assertThat(action3, .is(.tryReading))
  450. let action4 = machine.readNextRequest()
  451. assertThat(action4, .is(.forwardMessageThenRead()))
  452. let action5 = machine.readNextRequest()
  453. assertThat(action5, .is(.forwardEnd()))
  454. }
  455. func testReceiveDataWhenResponseStreamIsOpen() {
  456. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  457. let buffer = self.makeLengthPrefixedBytes(1024)
  458. // Receive a message. We should read and forward it.
  459. var buffer1 = buffer
  460. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  461. assertThat(action1, .is(.tryReading))
  462. let action2 = machine.readNextRequest()
  463. assertThat(action2, .is(.forwardMessage()))
  464. // Receive a message and end stream. We should read it then forward message and end.
  465. var buffer2 = buffer
  466. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  467. assertThat(action3, .is(.tryReading))
  468. let action4 = machine.readNextRequest()
  469. assertThat(action4, .is(.forwardMessageThenRead()))
  470. let action5 = machine.readNextRequest()
  471. assertThat(action5, .is(.forwardEnd()))
  472. }
  473. func testReceiveCompressedMessageWhenCompressionIsDisabled() {
  474. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  475. var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
  476. let action1 = machine.receive(buffer: &buffer, endStream: false)
  477. assertThat(action1, .is(.tryReading))
  478. let action2 = machine.readNextRequest()
  479. assertThat(action2, .is(.errorCaught()))
  480. }
  481. func testReceiveDataWhenClosed() {
  482. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  483. // Close while the request stream is still open.
  484. let action1 = machine.send(
  485. status: GRPCStatus(code: .ok, message: "ok"),
  486. trailers: [:]
  487. )
  488. assertThat(action1, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
  489. // Now receive end of request stream: tear down the handler, we're closed
  490. var emptyBuffer = ByteBuffer()
  491. let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
  492. assertThat(action2, .is(.finishHandler))
  493. }
  494. // MARK: Send Metadata Tests
  495. func testSendMetadataRequestStreamOpen() {
  496. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  497. // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
  498. // We'll just validate more 'normal' things here.
  499. let action1 = machine.send(headers: [:])
  500. assertThat(action1, .is(.success(.contains(":status", ["200"]))))
  501. let action2 = machine.send(headers: [:])
  502. assertThat(action2, .is(.failure()))
  503. }
  504. func testSendMetadataRequestStreamClosed() {
  505. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  506. var buffer = ByteBuffer()
  507. let action1 = machine.receive(buffer: &buffer, endStream: true)
  508. assertThat(action1, .is(.tryReading))
  509. let action2 = machine.readNextRequest()
  510. assertThat(action2, .is(.forwardEnd()))
  511. // Write some headers back.
  512. let action3 = machine.send(headers: [:])
  513. assertThat(action3, .is(.success(.contains(":status", ["200"]))))
  514. }
  515. func testSendMetadataWhenOpen() {
  516. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  517. // Response stream is already open.
  518. let action = machine.send(headers: [:])
  519. assertThat(action, .is(.failure()))
  520. }
  521. func testSendMetadataNormalizesUserProvidedMetadata() {
  522. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  523. let action = machine.send(headers: ["FOO": "bar"])
  524. assertThat(action, .success(.contains(caseSensitive: "foo")))
  525. }
  526. // MARK: Send Data Tests
  527. func testSendData() {
  528. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  529. let machine = self.makeStateMachine(state: startingState)
  530. let buffer = ByteBuffer(repeating: 0, count: 1024)
  531. // We should be able to do this multiple times.
  532. for _ in 0 ..< 5 {
  533. let action = machine.send(
  534. buffer: buffer,
  535. allocator: self.allocator,
  536. compress: false
  537. )
  538. assertThat(action, .is(.success()))
  539. }
  540. // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
  541. // write as normal.
  542. let action = machine.send(
  543. buffer: buffer,
  544. allocator: self.allocator,
  545. compress: true
  546. )
  547. assertThat(action, .is(.success()))
  548. }
  549. }
  550. func testSendDataAfterClose() {
  551. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  552. let action1 = machine.send(status: .ok, trailers: [:])
  553. assertThat(action1, .is(.sendTrailersAndFinish(.contains("grpc-status", ["0"]))))
  554. // We're already closed, this should fail.
  555. let buffer = ByteBuffer(repeating: 0, count: 1024)
  556. let action2 = machine.send(
  557. buffer: buffer,
  558. allocator: self.allocator,
  559. compress: false
  560. )
  561. assertThat(action2, .is(.failure()))
  562. }
  563. func testSendDataBeforeMetadata() {
  564. let machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
  565. // Response stream is still idle, so this should fail.
  566. let buffer = ByteBuffer(repeating: 0, count: 1024)
  567. let action2 = machine.send(
  568. buffer: buffer,
  569. allocator: self.allocator,
  570. compress: false
  571. )
  572. assertThat(action2, .is(.failure()))
  573. }
  574. // MARK: Send End
  575. func testSendEndWhenResponseStreamIsIdle() {
  576. for (state, closed) in zip([
  577. DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
  578. DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
  579. ], [false, true]) {
  580. var machine = self.makeStateMachine(state: state)
  581. let action1 = machine.send(status: .ok, trailers: [:])
  582. // This'll be a trailers-only response.
  583. if closed {
  584. assertThat(action1, .is(.sendTrailersAndFinish(.trailersOnly(code: .ok))))
  585. } else {
  586. assertThat(action1, .is(.sendTrailers(.trailersOnly(code: .ok))))
  587. }
  588. // Already closed.
  589. let action2 = machine.send(status: .ok, trailers: [:])
  590. assertThat(action2, .is(.failure()))
  591. }
  592. }
  593. func testSendEndWhenResponseStreamIsOpen() {
  594. for (state, closed) in zip([
  595. DesiredState.requestOpenResponseOpen,
  596. DesiredState.requestClosedResponseOpen,
  597. ], [false, true]) {
  598. var machine = self.makeStateMachine(state: state)
  599. let action = machine.send(
  600. status: GRPCStatus(code: .ok, message: "ok"),
  601. trailers: [:]
  602. )
  603. if closed {
  604. assertThat(action, .is(.sendTrailersAndFinish(.trailers(code: .ok, message: "ok"))))
  605. } else {
  606. assertThat(action, .is(.sendTrailers(.trailers(code: .ok, message: "ok"))))
  607. }
  608. // Already closed.
  609. let action2 = machine.send(status: .ok, trailers: [:])
  610. assertThat(action2, .is(.failure()))
  611. }
  612. }
  613. }
  614. extension ServerMessageEncoding {
  615. fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
  616. return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
  617. }
  618. }
  619. class NoOpResponseWriter: GRPCServerResponseWriter {
  620. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  621. promise?.succeed(())
  622. }
  623. func sendMessage(
  624. _ bytes: ByteBuffer,
  625. metadata: MessageMetadata,
  626. promise: EventLoopPromise<Void>?
  627. ) {
  628. promise?.succeed(())
  629. }
  630. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  631. promise?.succeed(())
  632. }
  633. }
  634. extension HTTP2ToRawGRPCStateMachine {
  635. fileprivate mutating func readNextRequest() -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
  636. return self.readNextRequest(maxLength: .max)
  637. }
  638. }