GRPCServerStreamHandlerTests.swift 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let handler = GRPCServerStreamHandler(
  27. scheme: .http,
  28. acceptedEncodings: [],
  29. maximumPayloadSize: 1
  30. )
  31. let channel = EmbeddedChannel(handler: handler)
  32. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  33. .ping(.init(), ack: false),
  34. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  35. // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
  36. // initialiser is internal, so I can't create one of these frames.
  37. .rstStream(.cancel),
  38. .settings(.ack),
  39. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  40. .windowUpdate(windowSizeIncrement: 4),
  41. .alternativeService(origin: nil, field: nil),
  42. .origin([]),
  43. ]
  44. for toBeIgnored in framesToBeIgnored {
  45. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  46. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  47. }
  48. }
  49. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  50. let handler = GRPCServerStreamHandler(
  51. scheme: .http,
  52. acceptedEncodings: [],
  53. maximumPayloadSize: 1
  54. )
  55. let channel = EmbeddedChannel(handler: handler)
  56. // Receive client's initial metadata without content-type
  57. let clientInitialMetadata: HPACKHeaders = [
  58. GRPCHTTP2Keys.path.rawValue: "test/test",
  59. GRPCHTTP2Keys.scheme.rawValue: "http",
  60. GRPCHTTP2Keys.method.rawValue: "POST",
  61. GRPCHTTP2Keys.te.rawValue: "trailers",
  62. ]
  63. XCTAssertNoThrow(
  64. try channel.writeInbound(
  65. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  66. )
  67. )
  68. // Make sure we have sent a trailers-only response
  69. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  70. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  71. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  72. }
  73. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  74. let handler = GRPCServerStreamHandler(
  75. scheme: .http,
  76. acceptedEncodings: [],
  77. maximumPayloadSize: 1
  78. )
  79. let channel = EmbeddedChannel(handler: handler)
  80. // Receive client's initial metadata without :method
  81. let clientInitialMetadata: HPACKHeaders = [
  82. GRPCHTTP2Keys.path.rawValue: "test/test",
  83. GRPCHTTP2Keys.scheme.rawValue: "http",
  84. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  85. GRPCHTTP2Keys.te.rawValue: "trailers",
  86. ]
  87. XCTAssertNoThrow(
  88. try channel.writeInbound(
  89. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  90. )
  91. )
  92. // Make sure we have sent a trailers-only response
  93. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  94. XCTAssertEqual(
  95. writtenTrailersOnlyResponse.headers,
  96. [
  97. GRPCHTTP2Keys.status.rawValue: "200",
  98. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  99. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  100. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  101. ":method header is expected to be present and have a value of \"POST\".",
  102. ]
  103. )
  104. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  105. }
  106. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  107. let handler = GRPCServerStreamHandler(
  108. scheme: .http,
  109. acceptedEncodings: [],
  110. maximumPayloadSize: 1
  111. )
  112. let channel = EmbeddedChannel(handler: handler)
  113. // Receive client's initial metadata without :scheme
  114. let clientInitialMetadata: HPACKHeaders = [
  115. GRPCHTTP2Keys.path.rawValue: "test/test",
  116. GRPCHTTP2Keys.method.rawValue: "POST",
  117. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  118. GRPCHTTP2Keys.te.rawValue: "trailers",
  119. ]
  120. XCTAssertNoThrow(
  121. try channel.writeInbound(
  122. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  123. )
  124. )
  125. // Make sure we have sent a trailers-only response
  126. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  127. XCTAssertEqual(
  128. writtenTrailersOnlyResponse.headers,
  129. [
  130. GRPCHTTP2Keys.status.rawValue: "200",
  131. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  132. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  133. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  134. ":scheme header must be present and one of \"http\" or \"https\".",
  135. ]
  136. )
  137. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  138. }
  139. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  140. let handler = GRPCServerStreamHandler(
  141. scheme: .http,
  142. acceptedEncodings: [],
  143. maximumPayloadSize: 1
  144. )
  145. let channel = EmbeddedChannel(handler: handler)
  146. // Receive client's initial metadata without :path
  147. let clientInitialMetadata: HPACKHeaders = [
  148. GRPCHTTP2Keys.scheme.rawValue: "http",
  149. GRPCHTTP2Keys.method.rawValue: "POST",
  150. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  151. GRPCHTTP2Keys.te.rawValue: "trailers",
  152. ]
  153. XCTAssertNoThrow(
  154. try channel.writeInbound(
  155. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  156. )
  157. )
  158. // Make sure we have sent a trailers-only response
  159. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  160. XCTAssertEqual(
  161. writtenTrailersOnlyResponse.headers,
  162. [
  163. GRPCHTTP2Keys.status.rawValue: "200",
  164. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  165. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  166. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  167. ]
  168. )
  169. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  170. }
  171. func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
  172. let handler = GRPCServerStreamHandler(
  173. scheme: .http,
  174. acceptedEncodings: [],
  175. maximumPayloadSize: 1
  176. )
  177. let channel = EmbeddedChannel(handler: handler)
  178. // Receive client's initial metadata without TE
  179. let clientInitialMetadata: HPACKHeaders = [
  180. GRPCHTTP2Keys.path.rawValue: "test/test",
  181. GRPCHTTP2Keys.scheme.rawValue: "http",
  182. GRPCHTTP2Keys.method.rawValue: "POST",
  183. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  184. ]
  185. XCTAssertNoThrow(
  186. try channel.writeInbound(
  187. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  188. )
  189. )
  190. // Make sure we have sent a trailers-only response
  191. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  192. XCTAssertEqual(
  193. writtenTrailersOnlyResponse.headers,
  194. [
  195. GRPCHTTP2Keys.status.rawValue: "200",
  196. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  197. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  198. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  199. "\"te\" header is expected to be present and have a value of \"trailers\".",
  200. ]
  201. )
  202. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  203. }
  204. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  205. let handler = GRPCServerStreamHandler(
  206. scheme: .http,
  207. acceptedEncodings: [],
  208. maximumPayloadSize: 100
  209. )
  210. let channel = EmbeddedChannel(handler: handler)
  211. // Receive client's initial metadata
  212. let clientInitialMetadata: HPACKHeaders = [
  213. GRPCHTTP2Keys.path.rawValue: "test/test",
  214. GRPCHTTP2Keys.scheme.rawValue: "http",
  215. GRPCHTTP2Keys.method.rawValue: "POST",
  216. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  217. GRPCHTTP2Keys.te.rawValue: "trailers",
  218. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  219. ]
  220. XCTAssertNoThrow(
  221. try channel.writeInbound(
  222. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  223. )
  224. )
  225. // Make sure we haven't sent back an error response, and that we read the initial metadata
  226. // Make sure we have sent a trailers-only response
  227. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  228. XCTAssertEqual(
  229. writtenTrailersOnlyResponse.headers,
  230. [
  231. GRPCHTTP2Keys.status.rawValue: "200",
  232. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  233. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  234. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Compression is not supported",
  235. ]
  236. )
  237. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  238. }
  239. func testOverMaximumPayloadSize() throws {
  240. let handler = GRPCServerStreamHandler(
  241. scheme: .http,
  242. acceptedEncodings: [],
  243. maximumPayloadSize: 1
  244. )
  245. let channel = EmbeddedChannel(handler: handler)
  246. // Receive client's initial metadata
  247. let clientInitialMetadata: HPACKHeaders = [
  248. GRPCHTTP2Keys.path.rawValue: "test/test",
  249. GRPCHTTP2Keys.scheme.rawValue: "http",
  250. GRPCHTTP2Keys.method.rawValue: "POST",
  251. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  252. GRPCHTTP2Keys.te.rawValue: "trailers",
  253. ]
  254. XCTAssertNoThrow(
  255. try channel.writeInbound(
  256. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  257. )
  258. )
  259. // Make sure we haven't sent back an error response, and that we read the initial metadata
  260. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  261. XCTAssertEqual(
  262. try channel.readInbound(as: RPCRequestPart.self),
  263. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  264. )
  265. // Write back server's initial metadata
  266. let headers: HPACKHeaders = [
  267. "some-custom-header": "some-custom-value"
  268. ]
  269. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  270. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  271. // Make sure we wrote back the initial metadata
  272. let writtenHeaders = try channel.assertReadHeadersOutbound()
  273. XCTAssertEqual(
  274. writtenHeaders.headers,
  275. [
  276. GRPCHTTP2Keys.status.rawValue: "200",
  277. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  278. "some-custom-header": "some-custom-value",
  279. ]
  280. )
  281. // Receive client's message
  282. var buffer = ByteBuffer()
  283. buffer.writeInteger(UInt8(0)) // not compressed
  284. buffer.writeInteger(UInt32(42)) // message length
  285. buffer.writeRepeatingByte(0, count: 42) // message
  286. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  287. XCTAssertThrowsError(
  288. ofType: RPCError.self,
  289. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  290. ) { error in
  291. XCTAssertEqual(error.code, .resourceExhausted)
  292. XCTAssertEqual(
  293. error.message,
  294. "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
  295. )
  296. }
  297. // Make sure we haven't sent a response back and that we didn't read the received message
  298. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  299. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  300. }
  301. func testClientEndsStream() throws {
  302. let handler = GRPCServerStreamHandler(
  303. scheme: .http,
  304. acceptedEncodings: [],
  305. maximumPayloadSize: 100,
  306. skipStateMachineAssertions: true
  307. )
  308. let channel = EmbeddedChannel(handler: handler)
  309. // Receive client's initial metadata with end stream set
  310. let clientInitialMetadata: HPACKHeaders = [
  311. GRPCHTTP2Keys.path.rawValue: "test/test",
  312. GRPCHTTP2Keys.scheme.rawValue: "http",
  313. GRPCHTTP2Keys.method.rawValue: "POST",
  314. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  315. GRPCHTTP2Keys.te.rawValue: "trailers",
  316. ]
  317. XCTAssertNoThrow(
  318. try channel.writeInbound(
  319. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  320. )
  321. )
  322. // Make sure we haven't sent back an error response, and that we read the initial metadata
  323. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  324. XCTAssertEqual(
  325. try channel.readInbound(as: RPCRequestPart.self),
  326. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  327. )
  328. // Write back server's initial metadata
  329. let headers: HPACKHeaders = [
  330. "some-custom-header": "some-custom-value"
  331. ]
  332. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  333. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  334. // Make sure we wrote back the initial metadata
  335. let writtenHeaders = try channel.assertReadHeadersOutbound()
  336. XCTAssertEqual(
  337. writtenHeaders.headers,
  338. [
  339. GRPCHTTP2Keys.status.rawValue: "200",
  340. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  341. "some-custom-header": "some-custom-value",
  342. ]
  343. )
  344. // We should throw if the client sends another message, since it's closed the stream already.
  345. var buffer = ByteBuffer()
  346. buffer.writeInteger(UInt8(0)) // not compressed
  347. buffer.writeInteger(UInt32(42)) // message length
  348. buffer.writeRepeatingByte(0, count: 42) // message
  349. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  350. XCTAssertThrowsError(
  351. ofType: RPCError.self,
  352. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  353. ) { error in
  354. XCTAssertEqual(error.code, .internalError)
  355. XCTAssertEqual(error.message, "Client can't send a message if closed.")
  356. }
  357. }
  358. func testNormalFlow() throws {
  359. let handler = GRPCServerStreamHandler(
  360. scheme: .http,
  361. acceptedEncodings: [],
  362. maximumPayloadSize: 100
  363. )
  364. let channel = EmbeddedChannel(handler: handler)
  365. // Receive client's initial metadata
  366. let clientInitialMetadata: HPACKHeaders = [
  367. GRPCHTTP2Keys.path.rawValue: "test/test",
  368. GRPCHTTP2Keys.scheme.rawValue: "http",
  369. GRPCHTTP2Keys.method.rawValue: "POST",
  370. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  371. GRPCHTTP2Keys.te.rawValue: "trailers",
  372. ]
  373. XCTAssertNoThrow(
  374. try channel.writeInbound(
  375. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  376. )
  377. )
  378. // Make sure we haven't sent back an error response, and that we read the initial metadata
  379. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  380. XCTAssertEqual(
  381. try channel.readInbound(as: RPCRequestPart.self),
  382. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  383. )
  384. // Write back server's initial metadata
  385. let headers: HPACKHeaders = [
  386. "some-custom-header": "some-custom-value"
  387. ]
  388. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  389. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  390. // Make sure we wrote back the initial metadata
  391. let writtenHeaders = try channel.assertReadHeadersOutbound()
  392. XCTAssertEqual(
  393. writtenHeaders.headers,
  394. [
  395. GRPCHTTP2Keys.status.rawValue: "200",
  396. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  397. "some-custom-header": "some-custom-value",
  398. ]
  399. )
  400. // Receive client's message
  401. var buffer = ByteBuffer()
  402. buffer.writeInteger(UInt8(0)) // not compressed
  403. buffer.writeInteger(UInt32(42)) // message length
  404. buffer.writeRepeatingByte(0, count: 42) // message
  405. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  406. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  407. // Make sure we haven't sent back an error response, and that we read the message properly
  408. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  409. XCTAssertEqual(
  410. try channel.readInbound(as: RPCRequestPart.self),
  411. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  412. )
  413. // Write back response
  414. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  415. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  416. // Make sure we wrote back the right message
  417. let writtenMessage = try channel.assertReadDataOutbound()
  418. var expectedBuffer = ByteBuffer()
  419. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  420. expectedBuffer.writeInteger(UInt32(42)) // message length
  421. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  422. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  423. // Send back status to end RPC
  424. let trailers = RPCResponsePart.status(
  425. .init(code: .dataLoss, message: "Test data loss"),
  426. ["custom-header": "custom-value"]
  427. )
  428. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  429. // Make sure we wrote back the status and trailers
  430. let writtenStatus = try channel.assertReadHeadersOutbound()
  431. XCTAssertTrue(writtenStatus.endStream)
  432. XCTAssertEqual(
  433. writtenStatus.headers,
  434. [
  435. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  436. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  437. "custom-header": "custom-value",
  438. ]
  439. )
  440. }
  441. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  442. let handler = GRPCServerStreamHandler(
  443. scheme: .http,
  444. acceptedEncodings: [],
  445. maximumPayloadSize: 100
  446. )
  447. let channel = EmbeddedChannel(handler: handler)
  448. // Receive client's initial metadata
  449. let clientInitialMetadata: HPACKHeaders = [
  450. GRPCHTTP2Keys.path.rawValue: "test/test",
  451. GRPCHTTP2Keys.scheme.rawValue: "http",
  452. GRPCHTTP2Keys.method.rawValue: "POST",
  453. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  454. GRPCHTTP2Keys.te.rawValue: "trailers",
  455. ]
  456. XCTAssertNoThrow(
  457. try channel.writeInbound(
  458. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  459. )
  460. )
  461. // Make sure we haven't sent back an error response, and that we read the initial metadata
  462. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  463. XCTAssertEqual(
  464. try channel.readInbound(as: RPCRequestPart.self),
  465. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  466. )
  467. // Write back server's initial metadata
  468. let headers: HPACKHeaders = [
  469. "some-custom-header": "some-custom-value"
  470. ]
  471. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  472. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  473. // Make sure we wrote back the initial metadata
  474. let writtenHeaders = try channel.assertReadHeadersOutbound()
  475. XCTAssertEqual(
  476. writtenHeaders.headers,
  477. [
  478. GRPCHTTP2Keys.status.rawValue: "200",
  479. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  480. "some-custom-header": "some-custom-value",
  481. ]
  482. )
  483. // Receive client's first message
  484. var buffer = ByteBuffer()
  485. buffer.writeInteger(UInt8(0)) // not compressed
  486. XCTAssertNoThrow(
  487. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  488. )
  489. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  490. buffer.clear()
  491. buffer.writeInteger(UInt32(30)) // message length
  492. XCTAssertNoThrow(
  493. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  494. )
  495. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  496. buffer.clear()
  497. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  498. XCTAssertNoThrow(
  499. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  500. )
  501. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  502. buffer.clear()
  503. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  504. XCTAssertNoThrow(
  505. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  506. )
  507. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  508. buffer.clear()
  509. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  510. XCTAssertNoThrow(
  511. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  512. )
  513. // Make sure we haven't sent back an error response, and that we read the message properly
  514. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  515. XCTAssertEqual(
  516. try channel.readInbound(as: RPCRequestPart.self),
  517. RPCRequestPart.message(
  518. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  519. + [UInt8](repeating: 2, count: 10)
  520. )
  521. )
  522. }
  523. func testSendMultipleMessagesInSingleBuffer() throws {
  524. let handler = GRPCServerStreamHandler(
  525. scheme: .http,
  526. acceptedEncodings: [],
  527. maximumPayloadSize: 100
  528. )
  529. let channel = EmbeddedChannel(handler: handler)
  530. // Receive client's initial metadata
  531. let clientInitialMetadata: HPACKHeaders = [
  532. GRPCHTTP2Keys.path.rawValue: "test/test",
  533. GRPCHTTP2Keys.scheme.rawValue: "http",
  534. GRPCHTTP2Keys.method.rawValue: "POST",
  535. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  536. GRPCHTTP2Keys.te.rawValue: "trailers",
  537. ]
  538. XCTAssertNoThrow(
  539. try channel.writeInbound(
  540. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  541. )
  542. )
  543. // Make sure we haven't sent back an error response, and that we read the initial metadata
  544. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  545. XCTAssertEqual(
  546. try channel.readInbound(as: RPCRequestPart.self),
  547. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  548. )
  549. // Write back server's initial metadata
  550. let headers: HPACKHeaders = [
  551. "some-custom-header": "some-custom-value"
  552. ]
  553. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  554. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  555. // Read out the metadata
  556. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  557. // This is where this test actually begins. We want to write two messages
  558. // without flushing, and make sure that no messages are sent down the pipeline
  559. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  560. // Write back first message and make sure nothing's written in the channel.
  561. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  562. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  563. // Write back second message and make sure nothing's written in the channel.
  564. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  565. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  566. // Now flush and check we *do* write the data.
  567. channel.flush()
  568. let writtenMessage = try channel.assertReadDataOutbound()
  569. // Make sure both messages have been framed together in the ByteBuffer.
  570. XCTAssertEqual(
  571. writtenMessage.data,
  572. .byteBuffer(
  573. .init(bytes: [
  574. // First message
  575. 0, // Compression disabled
  576. 0, 0, 0, 4, // Message length
  577. 1, 1, 1, 1, // First message data
  578. // Second message
  579. 0, // Compression disabled
  580. 0, 0, 0, 4, // Message length
  581. 2, 2, 2, 2, // Second message data
  582. ])
  583. )
  584. )
  585. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  586. }
  587. func testMessageAndStatusAreNotReordered() throws {
  588. let handler = GRPCServerStreamHandler(
  589. scheme: .http,
  590. acceptedEncodings: [],
  591. maximumPayloadSize: 100
  592. )
  593. let channel = EmbeddedChannel(handler: handler)
  594. // Receive client's initial metadata
  595. let clientInitialMetadata: HPACKHeaders = [
  596. GRPCHTTP2Keys.path.rawValue: "test/test",
  597. GRPCHTTP2Keys.scheme.rawValue: "http",
  598. GRPCHTTP2Keys.method.rawValue: "POST",
  599. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  600. GRPCHTTP2Keys.te.rawValue: "trailers",
  601. ]
  602. XCTAssertNoThrow(
  603. try channel.writeInbound(
  604. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  605. )
  606. )
  607. // Make sure we haven't sent back an error response, and that we read the initial metadata
  608. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  609. XCTAssertEqual(
  610. try channel.readInbound(as: RPCRequestPart.self),
  611. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  612. )
  613. // Write back server's initial metadata
  614. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  615. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  616. // Read out the metadata
  617. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  618. // This is where this test actually begins. We want to write a message followed
  619. // by status and trailers, and only flush after both writes.
  620. // Because messages are buffered and potentially bundled together in a single
  621. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  622. // and trailers won't be written before the messages.
  623. // Write back message and make sure nothing's written in the channel.
  624. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  625. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  626. // Write status + metadata and make sure nothing's written.
  627. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  628. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  629. // Now flush and check we *do* write the data in the right order: message first,
  630. // trailers second.
  631. channel.flush()
  632. let writtenMessage = try channel.assertReadDataOutbound()
  633. // Make sure we first get message.
  634. XCTAssertEqual(
  635. writtenMessage.data,
  636. .byteBuffer(
  637. .init(bytes: [
  638. // First message
  639. 0, // Compression disabled
  640. 0, 0, 0, 4, // Message length
  641. 1, 1, 1, 1, // First message data
  642. ])
  643. )
  644. )
  645. XCTAssertFalse(writtenMessage.endStream)
  646. // Make sure we get trailers.
  647. let writtenTrailers = try channel.assertReadHeadersOutbound()
  648. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  649. XCTAssertTrue(writtenTrailers.endStream)
  650. // Make sure we get nothing else.
  651. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  652. }
  653. }
  654. extension EmbeddedChannel {
  655. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  656. guard
  657. case .headers(let writtenHeaders) = try XCTUnwrap(
  658. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  659. )
  660. else {
  661. throw TestError.assertionFailure("Expected to write headers")
  662. }
  663. return writtenHeaders
  664. }
  665. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  666. guard
  667. case .data(let writtenMessage) = try XCTUnwrap(
  668. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  669. )
  670. else {
  671. throw TestError.assertionFailure("Expected to write data")
  672. }
  673. return writtenMessage
  674. }
  675. }
  676. private enum TestError: Error {
  677. case assertionFailure(String)
  678. }