HTTP2ToRawGRPCStateMachineTests.swift 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. @testable import GRPC
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. class HTTP2ToRawGRPCStateMachineTests: GRPCTestCase {
  23. typealias StateMachine = HTTP2ToRawGRPCStateMachine
  24. typealias State = StateMachine.State
  25. // An event loop gets passed to any service handler that's created, we don't actually use it here.
  26. private var eventLoop: EventLoop {
  27. return EmbeddedEventLoop()
  28. }
  29. /// An allocator, just here for convenience.
  30. private let allocator = ByteBufferAllocator()
  31. private func makeHeaders(
  32. path: String = "/echo.Echo/Get",
  33. contentType: String?,
  34. encoding: String? = nil,
  35. acceptEncoding: [String]? = nil
  36. ) -> HPACKHeaders {
  37. var headers = HPACKHeaders()
  38. headers.add(name: ":path", value: path)
  39. if let contentType = contentType {
  40. headers.add(name: GRPCHeaderName.contentType, value: contentType)
  41. }
  42. if let encoding = encoding {
  43. headers.add(name: GRPCHeaderName.encoding, value: encoding)
  44. }
  45. if let acceptEncoding = acceptEncoding {
  46. headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding.joined(separator: ","))
  47. }
  48. return headers
  49. }
  50. private func makeHeaders(
  51. path: String = "/echo.Echo/Get",
  52. contentType: ContentType? = .protobuf,
  53. encoding: CompressionAlgorithm? = nil,
  54. acceptEncoding: [CompressionAlgorithm]? = nil
  55. ) -> HPACKHeaders {
  56. return self.makeHeaders(
  57. path: path,
  58. contentType: contentType?.canonicalValue,
  59. encoding: encoding?.name,
  60. acceptEncoding: acceptEncoding?.map { $0.name }
  61. )
  62. }
  63. /// A minimum set of viable request headers for the service providers we register by default.
  64. private var viableHeaders: HPACKHeaders {
  65. return self.makeHeaders(
  66. path: "/echo.Echo/Get",
  67. contentType: "application/grpc"
  68. )
  69. }
  70. /// Just the echo service.
  71. private var services: [Substring: CallHandlerProvider] {
  72. let provider = EchoProvider()
  73. return [provider.serviceName: provider]
  74. }
  75. private enum DesiredState {
  76. case requestOpenResponseIdle(pipelineConfigured: Bool)
  77. case requestOpenResponseOpen
  78. case requestClosedResponseIdle(pipelineConfigured: Bool)
  79. case requestClosedResponseOpen
  80. }
  81. /// Makes a state machine in the desired state.
  82. private func makeStateMachine(
  83. services: [Substring: CallHandlerProvider]? = nil,
  84. encoding: ServerMessageEncoding = .disabled,
  85. state: DesiredState = .requestOpenResponseIdle(pipelineConfigured: true)
  86. ) -> StateMachine {
  87. var machine = StateMachine(services: services ?? self.services, encoding: encoding)
  88. let receiveHeadersAction = machine.receive(
  89. headers: self.viableHeaders,
  90. eventLoop: self.eventLoop,
  91. errorDelegate: nil,
  92. remoteAddress: nil,
  93. logger: self.logger,
  94. allocator: ByteBufferAllocator(),
  95. responseWriter: NoOpResponseWriter(),
  96. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  97. )
  98. assertThat(receiveHeadersAction, .is(.configure()))
  99. switch state {
  100. case .requestOpenResponseIdle(pipelineConfigured: false):
  101. ()
  102. case .requestOpenResponseIdle(pipelineConfigured: true):
  103. let configuredAction = machine.pipelineConfigured()
  104. assertThat(configuredAction, .is(.forwardHeaders()))
  105. case .requestOpenResponseOpen:
  106. let configuredAction = machine.pipelineConfigured()
  107. assertThat(configuredAction, .is(.forwardHeaders()))
  108. let sendHeadersAction = machine.send(headers: [:])
  109. assertThat(sendHeadersAction, .is(.success()))
  110. case .requestClosedResponseIdle(pipelineConfigured: false):
  111. var emptyBuffer = ByteBuffer()
  112. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  113. assertThat(receiveEnd, .is(false))
  114. case .requestClosedResponseIdle(pipelineConfigured: true):
  115. let configuredAction = machine.pipelineConfigured()
  116. assertThat(configuredAction, .is(.forwardHeaders()))
  117. var emptyBuffer = ByteBuffer()
  118. let receiveEnd = machine.receive(buffer: &emptyBuffer, endStream: true)
  119. assertThat(receiveEnd, .is(true))
  120. case .requestClosedResponseOpen:
  121. let configuredAction = machine.pipelineConfigured()
  122. assertThat(configuredAction, .is(.forwardHeaders()))
  123. var emptyBuffer = ByteBuffer()
  124. let receiveEndAction = machine.receive(buffer: &emptyBuffer, endStream: true)
  125. assertThat(receiveEndAction, .is(true))
  126. let readAction = machine.readNextRequest()
  127. assertThat(readAction, .is(.forwardEnd()))
  128. let sendHeadersAction = machine.send(headers: [:])
  129. assertThat(sendHeadersAction, .is(.success()))
  130. }
  131. return machine
  132. }
  133. /// Makes a gRPC framed message; i.e. a compression flag (UInt8), the message length (UIn32), the
  134. /// message bytes (UInt8 ⨉ message length).
  135. private func makeLengthPrefixedBytes(_ count: Int, setCompressFlag: Bool = false) -> ByteBuffer {
  136. var buffer = ByteBuffer()
  137. buffer.reserveCapacity(count + 5)
  138. buffer.writeInteger(UInt8(setCompressFlag ? 1 : 0))
  139. buffer.writeInteger(UInt32(count))
  140. buffer.writeRepeatingByte(0, count: count)
  141. return buffer
  142. }
  143. // MARK: Receive Headers Tests
  144. func testReceiveValidHeaders() {
  145. var machine = StateMachine(services: self.services, encoding: .disabled)
  146. let action = machine.receive(
  147. headers: self.viableHeaders,
  148. eventLoop: self.eventLoop,
  149. errorDelegate: nil,
  150. remoteAddress: nil,
  151. logger: self.logger,
  152. allocator: ByteBufferAllocator(),
  153. responseWriter: NoOpResponseWriter(),
  154. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  155. )
  156. assertThat(action, .is(.configure()))
  157. }
  158. func testReceiveInvalidContentType() {
  159. var machine = StateMachine(services: self.services, encoding: .disabled)
  160. let action = machine.receive(
  161. headers: self.makeHeaders(contentType: "application/json"),
  162. eventLoop: self.eventLoop,
  163. errorDelegate: nil,
  164. remoteAddress: nil,
  165. logger: self.logger,
  166. allocator: ByteBufferAllocator(),
  167. responseWriter: NoOpResponseWriter(),
  168. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  169. )
  170. assertThat(action, .is(.rejectRPC(.contains(":status", ["415"]))))
  171. }
  172. func testReceiveValidHeadersForUnknownService() {
  173. var machine = StateMachine(services: self.services, encoding: .disabled)
  174. let action = machine.receive(
  175. headers: self.makeHeaders(path: "/foo.Foo/Get"),
  176. eventLoop: self.eventLoop,
  177. errorDelegate: nil,
  178. remoteAddress: nil,
  179. logger: self.logger,
  180. allocator: ByteBufferAllocator(),
  181. responseWriter: NoOpResponseWriter(),
  182. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  183. )
  184. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  185. }
  186. func testReceiveValidHeadersForUnknownMethod() {
  187. var machine = StateMachine(services: self.services, encoding: .disabled)
  188. let action = machine.receive(
  189. headers: self.makeHeaders(path: "/echo.Echo/Foo"),
  190. eventLoop: self.eventLoop,
  191. errorDelegate: nil,
  192. remoteAddress: nil,
  193. logger: self.logger,
  194. allocator: ByteBufferAllocator(),
  195. responseWriter: NoOpResponseWriter(),
  196. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  197. )
  198. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  199. }
  200. func testReceiveValidHeadersForInvalidPath() {
  201. var machine = StateMachine(services: self.services, encoding: .disabled)
  202. let action = machine.receive(
  203. headers: self.makeHeaders(path: "nope"),
  204. eventLoop: self.eventLoop,
  205. errorDelegate: nil,
  206. remoteAddress: nil,
  207. logger: self.logger,
  208. allocator: ByteBufferAllocator(),
  209. responseWriter: NoOpResponseWriter(),
  210. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  211. )
  212. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  213. }
  214. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsDisabled() {
  215. var machine = StateMachine(services: self.services, encoding: .disabled)
  216. let action = machine.receive(
  217. headers: self.makeHeaders(encoding: .gzip),
  218. eventLoop: self.eventLoop,
  219. errorDelegate: nil,
  220. remoteAddress: nil,
  221. logger: self.logger,
  222. allocator: ByteBufferAllocator(),
  223. responseWriter: NoOpResponseWriter(),
  224. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  225. )
  226. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  227. }
  228. func testReceiveHeadersWithMultipleEncodings() {
  229. var machine = StateMachine(services: self.services, encoding: .disabled)
  230. // We can't have multiple encodings.
  231. let action = machine.receive(
  232. headers: self.makeHeaders(contentType: "application/grpc", encoding: "gzip,identity"),
  233. eventLoop: self.eventLoop,
  234. errorDelegate: nil,
  235. remoteAddress: nil,
  236. logger: self.logger,
  237. allocator: ByteBufferAllocator(),
  238. responseWriter: NoOpResponseWriter(),
  239. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  240. )
  241. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .invalidArgument))))
  242. }
  243. func testReceiveHeadersWithUnsupportedEncodingWhenCompressionIsEnabled() {
  244. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  245. let action = machine.receive(
  246. headers: self.makeHeaders(contentType: "application/grpc", encoding: "foozip"),
  247. eventLoop: self.eventLoop,
  248. errorDelegate: nil,
  249. remoteAddress: nil,
  250. logger: self.logger,
  251. allocator: ByteBufferAllocator(),
  252. responseWriter: NoOpResponseWriter(),
  253. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  254. )
  255. assertThat(action, .is(.rejectRPC(.trailersOnly(code: .unimplemented))))
  256. assertThat(
  257. action,
  258. .is(.rejectRPC(.contains("grpc-accept-encoding", ["deflate", "identity"])))
  259. )
  260. }
  261. func testReceiveHeadersWithSupportedButNotAdvertisedEncoding() {
  262. var machine = StateMachine(services: self.services, encoding: .enabled(.deflate, .identity))
  263. // We didn't advertise gzip, but we do support it.
  264. let action = machine.receive(
  265. headers: self.makeHeaders(encoding: .gzip),
  266. eventLoop: self.eventLoop,
  267. errorDelegate: nil,
  268. remoteAddress: nil,
  269. logger: self.logger,
  270. allocator: ByteBufferAllocator(),
  271. responseWriter: NoOpResponseWriter(),
  272. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  273. )
  274. // This is expected: however, we also expect 'grpc-accept-encoding' to be in the response
  275. // metadata. Send back headers to test this.
  276. assertThat(action, .is(.configure()))
  277. let sendAction = machine.send(headers: [:])
  278. assertThat(sendAction, .success(.contains(
  279. "grpc-accept-encoding",
  280. ["deflate", "identity", "gzip"]
  281. )))
  282. }
  283. func testReceiveHeadersWithIdentityCompressionWhenCompressionIsDisabled() {
  284. var machine = StateMachine(services: self.services, encoding: .disabled)
  285. // Identity is always supported, even if compression is disabled.
  286. let action = machine.receive(
  287. headers: self.makeHeaders(encoding: .identity),
  288. eventLoop: self.eventLoop,
  289. errorDelegate: nil,
  290. remoteAddress: nil,
  291. logger: self.logger,
  292. allocator: ByteBufferAllocator(),
  293. responseWriter: NoOpResponseWriter(),
  294. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  295. )
  296. assertThat(action, .is(.configure()))
  297. }
  298. func testReceiveHeadersNegotiatesResponseEncoding() {
  299. var machine = StateMachine(services: self.services, encoding: .enabled(.gzip, .deflate))
  300. let action = machine.receive(
  301. headers: self.makeHeaders(acceptEncoding: [.deflate]),
  302. eventLoop: self.eventLoop,
  303. errorDelegate: nil,
  304. remoteAddress: nil,
  305. logger: self.logger,
  306. allocator: ByteBufferAllocator(),
  307. responseWriter: NoOpResponseWriter(),
  308. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  309. )
  310. // This is expected, but we need to check the value of 'grpc-encoding' in the response headers.
  311. assertThat(action, .is(.configure()))
  312. let sendAction = machine.send(headers: [:])
  313. assertThat(sendAction, .success(.contains("grpc-encoding", ["deflate"])))
  314. }
  315. // MARK: Receive Data Tests
  316. func testReceiveDataBeforePipelineIsConfigured() {
  317. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  318. let buffer = self.makeLengthPrefixedBytes(1024)
  319. // Receive a request. The pipeline isn't configured so no action.
  320. var buffer1 = buffer
  321. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  322. assertThat(action1, .is(false))
  323. // Receive another request, still not configured so no action.
  324. var buffer2 = buffer
  325. let action2 = machine.receive(buffer: &buffer2, endStream: false)
  326. assertThat(action2, .is(false))
  327. // Configure the pipeline. We'll have headers to forward and messages to read.
  328. let action3 = machine.pipelineConfigured()
  329. assertThat(action3, .is(.forwardHeadersThenRead()))
  330. // Do the first read.
  331. let action4 = machine.readNextRequest()
  332. assertThat(action4, .is(.forwardMessageThenRead()))
  333. // Do the second and final read.
  334. let action5 = machine.readNextRequest()
  335. assertThat(action5, .is(.forwardMessage()))
  336. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  337. // after receiving.
  338. var emptyBuffer = ByteBuffer()
  339. let action6 = machine.receive(buffer: &emptyBuffer, endStream: true)
  340. assertThat(action6, .is(true))
  341. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  342. let action7 = machine.readNextRequest()
  343. assertThat(action7, .is(.forwardEnd()))
  344. }
  345. func testReceiveDataWhenPipelineIsConfigured() {
  346. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  347. let buffer = self.makeLengthPrefixedBytes(1024)
  348. // Receive a request. The pipeline is configured, so we should try reading.
  349. var buffer1 = buffer
  350. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  351. assertThat(action1, .is(true))
  352. // Read the message, consuming all bytes.
  353. let action2 = machine.readNextRequest()
  354. assertThat(action2, .is(.forwardMessage()))
  355. // Receive another request, we'll split buffer into two parts.
  356. var buffer3 = buffer
  357. var buffer2 = buffer3.readSlice(length: 20)!
  358. // Not enough bytes to form a message, so read won't result in anything.
  359. let action4 = machine.receive(buffer: &buffer2, endStream: false)
  360. assertThat(action4, .is(true))
  361. let action5 = machine.readNextRequest()
  362. assertThat(action5, .is(.none()))
  363. // Now the rest of the message.
  364. let action6 = machine.receive(buffer: &buffer3, endStream: false)
  365. assertThat(action6, .is(true))
  366. let action7 = machine.readNextRequest()
  367. assertThat(action7, .is(.forwardMessage()))
  368. // Receive an empty buffer with end stream. Since we're configured we'll always try to read
  369. // after receiving.
  370. var emptyBuffer = ByteBuffer()
  371. let action8 = machine.receive(buffer: &emptyBuffer, endStream: true)
  372. assertThat(action8, .is(true))
  373. // There's nothing in the reader to consume, but since we saw end stream we'll have to close.
  374. let action9 = machine.readNextRequest()
  375. assertThat(action9, .is(.forwardEnd()))
  376. }
  377. func testReceiveDataAndEndStreamBeforePipelineIsConfigured() {
  378. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: false))
  379. let buffer = self.makeLengthPrefixedBytes(1024)
  380. // No action: the pipeline isn't configured.
  381. var buffer1 = buffer
  382. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  383. assertThat(action1, .is(false))
  384. // Still no action.
  385. var buffer2 = buffer
  386. let action2 = machine.receive(buffer: &buffer2, endStream: true)
  387. assertThat(action2, .is(false))
  388. // Configure the pipeline. We have headers to forward and messages to read.
  389. let action3 = machine.pipelineConfigured()
  390. assertThat(action3, .is(.forwardHeadersThenRead()))
  391. // Read the first message.
  392. let action4 = machine.readNextRequest()
  393. assertThat(action4, .is(.forwardMessageThenRead()))
  394. // Read the second and final message.
  395. let action5 = machine.readNextRequest()
  396. assertThat(action5, .is(.forwardMessageAndEnd()))
  397. }
  398. func testReceiveDataAfterPipelineIsConfigured() {
  399. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  400. let buffer = self.makeLengthPrefixedBytes(1024)
  401. // Pipeline is configured, we should be able to read then forward the message.
  402. var buffer1 = buffer
  403. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  404. assertThat(action1, .is(true))
  405. let action2 = machine.readNextRequest()
  406. assertThat(action2, .is(.forwardMessage()))
  407. // Receive another message with end stream set.
  408. // Still no action.
  409. var buffer2 = buffer
  410. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  411. assertThat(action3, .is(true))
  412. let action4 = machine.readNextRequest()
  413. assertThat(action4, .is(.forwardMessageAndEnd()))
  414. }
  415. func testReceiveDataWhenResponseStreamIsOpen() {
  416. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  417. let buffer = self.makeLengthPrefixedBytes(1024)
  418. // Receive a message. We should read and forward it.
  419. var buffer1 = buffer
  420. let action1 = machine.receive(buffer: &buffer1, endStream: false)
  421. assertThat(action1, .is(true))
  422. let action2 = machine.readNextRequest()
  423. assertThat(action2, .is(.forwardMessage()))
  424. // Receive a message and end stream. We should read it then forward message and end.
  425. var buffer2 = buffer
  426. let action3 = machine.receive(buffer: &buffer2, endStream: true)
  427. assertThat(action3, .is(true))
  428. let action4 = machine.readNextRequest()
  429. assertThat(action4, .is(.forwardMessageAndEnd()))
  430. }
  431. func testReceiveCompressedMessageWhenCompressionIsDisabled() {
  432. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  433. var buffer = self.makeLengthPrefixedBytes(1024, setCompressFlag: true)
  434. let action1 = machine.receive(buffer: &buffer, endStream: false)
  435. assertThat(action1, .is(true))
  436. let action2 = machine.readNextRequest()
  437. assertThat(action2, .is(.errorCaught()))
  438. }
  439. func testReceiveDataWhenClosed() {
  440. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  441. // Close while the request stream is still open.
  442. let action1 = machine.send(
  443. status: GRPCStatus(code: .ok, message: "ok"),
  444. trailers: [:]
  445. )
  446. assertThat(action1, .is(.success(.trailers(code: .ok, message: "ok"))))
  447. // Now receive end of request stream: no action, we're closed.
  448. var emptyBuffer = ByteBuffer()
  449. let action2 = machine.receive(buffer: &emptyBuffer, endStream: true)
  450. assertThat(action2, .is(false))
  451. }
  452. // MARK: Send Metadata Tests
  453. func testSendMetadataRequestStreamOpen() {
  454. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  455. // We tested most of the weird (request encoding, negotiating response encoding etc.) above.
  456. // We'll just validate more 'normal' things here.
  457. let action1 = machine.send(headers: [:])
  458. assertThat(action1, .is(.success(.contains(":status", ["200"]))))
  459. let action2 = machine.send(headers: [:])
  460. assertThat(action2, .is(.failure()))
  461. }
  462. func testSendMetadataRequestStreamClosed() {
  463. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  464. var buffer = ByteBuffer()
  465. let action1 = machine.receive(buffer: &buffer, endStream: true)
  466. assertThat(action1, .is(true))
  467. let action2 = machine.readNextRequest()
  468. assertThat(action2, .is(.forwardEnd()))
  469. // Write some headers back.
  470. let action3 = machine.send(headers: [:])
  471. assertThat(action3, .is(.success(.contains(":status", ["200"]))))
  472. }
  473. func testSendMetadataWhenOpen() {
  474. var machine = self.makeStateMachine(state: .requestOpenResponseOpen)
  475. // Response stream is already open.
  476. let action = machine.send(headers: [:])
  477. assertThat(action, .is(.failure()))
  478. }
  479. func testSendMetadataNormalizesUserProvidedMetadata() {
  480. var machine = self.makeStateMachine(state: .requestOpenResponseIdle(pipelineConfigured: true))
  481. let action = machine.send(headers: ["FOO": "bar"])
  482. assertThat(action, .success(.contains(caseSensitive: "foo")))
  483. }
  484. // MARK: Send Data Tests
  485. func testSendData() {
  486. for startingState in [DesiredState.requestOpenResponseOpen, .requestClosedResponseOpen] {
  487. let machine = self.makeStateMachine(state: startingState)
  488. let buffer = ByteBuffer(repeating: 0, count: 1024)
  489. // We should be able to do this multiple times.
  490. for _ in 0 ..< 5 {
  491. let action = machine.send(
  492. buffer: buffer,
  493. allocator: self.allocator,
  494. compress: false
  495. )
  496. assertThat(action, .is(.success()))
  497. }
  498. // Set the compress flag, we're not setup to compress so the flag will just be ignored, we'll
  499. // write as normal.
  500. let action = machine.send(
  501. buffer: buffer,
  502. allocator: self.allocator,
  503. compress: true
  504. )
  505. assertThat(action, .is(.success()))
  506. }
  507. }
  508. func testSendDataAfterClose() {
  509. var machine = self.makeStateMachine(state: .requestClosedResponseOpen)
  510. let action1 = machine.send(status: .ok, trailers: [:])
  511. assertThat(action1, .is(.success(.contains("grpc-status", ["0"]))))
  512. // We're already closed, this should fail.
  513. let buffer = ByteBuffer(repeating: 0, count: 1024)
  514. let action2 = machine.send(
  515. buffer: buffer,
  516. allocator: self.allocator,
  517. compress: false
  518. )
  519. assertThat(action2, .is(.failure()))
  520. }
  521. func testSendDataBeforeMetadata() {
  522. let machine = self.makeStateMachine(state: .requestClosedResponseIdle(pipelineConfigured: true))
  523. // Response stream is still idle, so this should fail.
  524. let buffer = ByteBuffer(repeating: 0, count: 1024)
  525. let action2 = machine.send(
  526. buffer: buffer,
  527. allocator: self.allocator,
  528. compress: false
  529. )
  530. assertThat(action2, .is(.failure()))
  531. }
  532. // MARK: Send End
  533. func testSendEndWhenResponseStreamIsIdle() {
  534. for state in [
  535. DesiredState.requestOpenResponseIdle(pipelineConfigured: true),
  536. DesiredState.requestClosedResponseIdle(pipelineConfigured: true),
  537. ] {
  538. var machine = self.makeStateMachine(state: state)
  539. let action1 = machine.send(status: .ok, trailers: [:])
  540. // This'll be a trailers-only response.
  541. assertThat(action1, .is(.success(.trailersOnly(code: .ok))))
  542. // Already closed.
  543. let action2 = machine.send(status: .ok, trailers: [:])
  544. assertThat(action2, .is(.failure()))
  545. }
  546. }
  547. func testSendEndWhenResponseStreamIsOpen() {
  548. for state in [
  549. DesiredState.requestOpenResponseOpen,
  550. DesiredState.requestClosedResponseOpen,
  551. ] {
  552. var machine = self.makeStateMachine(state: state)
  553. let action = machine.send(
  554. status: GRPCStatus(code: .ok, message: "ok"),
  555. trailers: [:]
  556. )
  557. assertThat(action, .is(.success(.trailers(code: .ok, message: "ok"))))
  558. // Already closed.
  559. let action2 = machine.send(status: .ok, trailers: [:])
  560. assertThat(action2, .is(.failure()))
  561. }
  562. }
  563. }
  564. extension ServerMessageEncoding {
  565. fileprivate static func enabled(_ algorithms: CompressionAlgorithm...) -> ServerMessageEncoding {
  566. return .enabled(.init(enabledAlgorithms: algorithms, decompressionLimit: .absolute(.max)))
  567. }
  568. }
  569. class NoOpResponseWriter: GRPCServerResponseWriter {
  570. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  571. promise?.succeed(())
  572. }
  573. func sendMessage(
  574. _ bytes: ByteBuffer,
  575. metadata: MessageMetadata,
  576. promise: EventLoopPromise<Void>?
  577. ) {
  578. promise?.succeed(())
  579. }
  580. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  581. promise?.succeed(())
  582. }
  583. }