// GRPCServerStreamHandlerTests.swift
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let handler = GRPCServerStreamHandler(
  27. scheme: .http,
  28. acceptedEncodings: [],
  29. maximumPayloadSize: 1
  30. )
  31. let channel = EmbeddedChannel(handler: handler)
  32. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  33. .ping(.init(), ack: false),
  34. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  35. // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
  36. // initialiser is internal, so I can't create one of these frames.
  37. .rstStream(.cancel),
  38. .settings(.ack),
  39. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  40. .windowUpdate(windowSizeIncrement: 4),
  41. .alternativeService(origin: nil, field: nil),
  42. .origin([]),
  43. ]
  44. for toBeIgnored in framesToBeIgnored {
  45. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  46. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  47. }
  48. }
  49. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  50. let handler = GRPCServerStreamHandler(
  51. scheme: .http,
  52. acceptedEncodings: [],
  53. maximumPayloadSize: 1
  54. )
  55. let channel = EmbeddedChannel(handler: handler)
  56. // Receive client's initial metadata without content-type
  57. let clientInitialMetadata: HPACKHeaders = [
  58. GRPCHTTP2Keys.path.rawValue: "test/test",
  59. GRPCHTTP2Keys.scheme.rawValue: "http",
  60. GRPCHTTP2Keys.method.rawValue: "POST",
  61. GRPCHTTP2Keys.te.rawValue: "trailers",
  62. ]
  63. XCTAssertNoThrow(
  64. try channel.writeInbound(
  65. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  66. )
  67. )
  68. // Make sure we have sent a trailers-only response
  69. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  70. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  71. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  72. }
  73. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  74. let handler = GRPCServerStreamHandler(
  75. scheme: .http,
  76. acceptedEncodings: [],
  77. maximumPayloadSize: 1
  78. )
  79. let channel = EmbeddedChannel(handler: handler)
  80. // Receive client's initial metadata without :method
  81. let clientInitialMetadata: HPACKHeaders = [
  82. GRPCHTTP2Keys.path.rawValue: "test/test",
  83. GRPCHTTP2Keys.scheme.rawValue: "http",
  84. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  85. GRPCHTTP2Keys.te.rawValue: "trailers",
  86. ]
  87. XCTAssertNoThrow(
  88. try channel.writeInbound(
  89. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  90. )
  91. )
  92. // Make sure we have sent a trailers-only response
  93. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  94. XCTAssertEqual(
  95. writtenTrailersOnlyResponse.headers,
  96. [
  97. GRPCHTTP2Keys.status.rawValue: "200",
  98. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  99. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  100. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  101. ":method header is expected to be present and have a value of \"POST\".",
  102. ]
  103. )
  104. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  105. }
  106. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  107. let handler = GRPCServerStreamHandler(
  108. scheme: .http,
  109. acceptedEncodings: [],
  110. maximumPayloadSize: 1
  111. )
  112. let channel = EmbeddedChannel(handler: handler)
  113. // Receive client's initial metadata without :scheme
  114. let clientInitialMetadata: HPACKHeaders = [
  115. GRPCHTTP2Keys.path.rawValue: "test/test",
  116. GRPCHTTP2Keys.method.rawValue: "POST",
  117. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  118. GRPCHTTP2Keys.te.rawValue: "trailers",
  119. ]
  120. XCTAssertNoThrow(
  121. try channel.writeInbound(
  122. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  123. )
  124. )
  125. // Make sure we have sent a trailers-only response
  126. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  127. XCTAssertEqual(
  128. writtenTrailersOnlyResponse.headers,
  129. [
  130. GRPCHTTP2Keys.status.rawValue: "200",
  131. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  132. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  133. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  134. ":scheme header must be present and one of \"http\" or \"https\".",
  135. ]
  136. )
  137. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  138. }
  139. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  140. let handler = GRPCServerStreamHandler(
  141. scheme: .http,
  142. acceptedEncodings: [],
  143. maximumPayloadSize: 1
  144. )
  145. let channel = EmbeddedChannel(handler: handler)
  146. // Receive client's initial metadata without :path
  147. let clientInitialMetadata: HPACKHeaders = [
  148. GRPCHTTP2Keys.scheme.rawValue: "http",
  149. GRPCHTTP2Keys.method.rawValue: "POST",
  150. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  151. GRPCHTTP2Keys.te.rawValue: "trailers",
  152. ]
  153. XCTAssertNoThrow(
  154. try channel.writeInbound(
  155. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  156. )
  157. )
  158. // Make sure we have sent a trailers-only response
  159. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  160. XCTAssertEqual(
  161. writtenTrailersOnlyResponse.headers,
  162. [
  163. GRPCHTTP2Keys.status.rawValue: "200",
  164. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  165. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  166. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  167. ]
  168. )
  169. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  170. }
  171. func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
  172. let handler = GRPCServerStreamHandler(
  173. scheme: .http,
  174. acceptedEncodings: [],
  175. maximumPayloadSize: 1
  176. )
  177. let channel = EmbeddedChannel(handler: handler)
  178. // Receive client's initial metadata without TE
  179. let clientInitialMetadata: HPACKHeaders = [
  180. GRPCHTTP2Keys.path.rawValue: "test/test",
  181. GRPCHTTP2Keys.scheme.rawValue: "http",
  182. GRPCHTTP2Keys.method.rawValue: "POST",
  183. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  184. ]
  185. XCTAssertNoThrow(
  186. try channel.writeInbound(
  187. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  188. )
  189. )
  190. // Make sure we have sent a trailers-only response
  191. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  192. XCTAssertEqual(
  193. writtenTrailersOnlyResponse.headers,
  194. [
  195. GRPCHTTP2Keys.status.rawValue: "200",
  196. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  197. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  198. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  199. "\"te\" header is expected to be present and have a value of \"trailers\".",
  200. ]
  201. )
  202. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  203. }
  204. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  205. let handler = GRPCServerStreamHandler(
  206. scheme: .http,
  207. acceptedEncodings: [],
  208. maximumPayloadSize: 100
  209. )
  210. let channel = EmbeddedChannel(handler: handler)
  211. // Receive client's initial metadata
  212. let clientInitialMetadata: HPACKHeaders = [
  213. GRPCHTTP2Keys.path.rawValue: "test/test",
  214. GRPCHTTP2Keys.scheme.rawValue: "http",
  215. GRPCHTTP2Keys.method.rawValue: "POST",
  216. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  217. GRPCHTTP2Keys.te.rawValue: "trailers",
  218. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  219. ]
  220. XCTAssertNoThrow(
  221. try channel.writeInbound(
  222. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  223. )
  224. )
  225. // Make sure we have sent a trailers-only response
  226. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  227. XCTAssertEqual(
  228. writtenTrailersOnlyResponse.headers,
  229. [
  230. GRPCHTTP2Keys.status.rawValue: "200",
  231. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  232. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  233. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Compression is not supported",
  234. ]
  235. )
  236. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  237. }
  238. func testOverMaximumPayloadSize() throws {
  239. let handler = GRPCServerStreamHandler(
  240. scheme: .http,
  241. acceptedEncodings: [],
  242. maximumPayloadSize: 1
  243. )
  244. let channel = EmbeddedChannel(handler: handler)
  245. // Receive client's initial metadata
  246. let clientInitialMetadata: HPACKHeaders = [
  247. GRPCHTTP2Keys.path.rawValue: "test/test",
  248. GRPCHTTP2Keys.scheme.rawValue: "http",
  249. GRPCHTTP2Keys.method.rawValue: "POST",
  250. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  251. GRPCHTTP2Keys.te.rawValue: "trailers",
  252. ]
  253. XCTAssertNoThrow(
  254. try channel.writeInbound(
  255. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  256. )
  257. )
  258. // Make sure we haven't sent back an error response, and that we read the initial metadata
  259. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  260. XCTAssertEqual(
  261. try channel.readInbound(as: RPCRequestPart.self),
  262. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  263. )
  264. // Write back server's initial metadata
  265. let headers: HPACKHeaders = [
  266. "some-custom-header": "some-custom-value"
  267. ]
  268. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  269. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  270. // Make sure we wrote back the initial metadata
  271. let writtenHeaders = try channel.assertReadHeadersOutbound()
  272. XCTAssertEqual(
  273. writtenHeaders.headers,
  274. [
  275. GRPCHTTP2Keys.status.rawValue: "200",
  276. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  277. "some-custom-header": "some-custom-value",
  278. ]
  279. )
  280. // Receive client's message
  281. var buffer = ByteBuffer()
  282. buffer.writeInteger(UInt8(0)) // not compressed
  283. buffer.writeInteger(UInt32(42)) // message length
  284. buffer.writeRepeatingByte(0, count: 42) // message
  285. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  286. XCTAssertThrowsError(
  287. ofType: RPCError.self,
  288. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  289. ) { error in
  290. XCTAssertEqual(error.code, .resourceExhausted)
  291. XCTAssertEqual(
  292. error.message,
  293. "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
  294. )
  295. }
  296. // Make sure we haven't sent a response back and that we didn't read the received message
  297. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  298. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  299. }
  300. func testClientEndsStream() throws {
  301. let handler = GRPCServerStreamHandler(
  302. scheme: .http,
  303. acceptedEncodings: [],
  304. maximumPayloadSize: 100,
  305. skipStateMachineAssertions: true
  306. )
  307. let channel = EmbeddedChannel(handler: handler)
  308. // Receive client's initial metadata with end stream set
  309. let clientInitialMetadata: HPACKHeaders = [
  310. GRPCHTTP2Keys.path.rawValue: "test/test",
  311. GRPCHTTP2Keys.scheme.rawValue: "http",
  312. GRPCHTTP2Keys.method.rawValue: "POST",
  313. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  314. GRPCHTTP2Keys.te.rawValue: "trailers",
  315. ]
  316. XCTAssertNoThrow(
  317. try channel.writeInbound(
  318. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  319. )
  320. )
  321. // Make sure we haven't sent back an error response, and that we read the initial metadata
  322. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  323. XCTAssertEqual(
  324. try channel.readInbound(as: RPCRequestPart.self),
  325. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  326. )
  327. // Write back server's initial metadata
  328. let headers: HPACKHeaders = [
  329. "some-custom-header": "some-custom-value"
  330. ]
  331. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  332. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  333. // Make sure we wrote back the initial metadata
  334. let writtenHeaders = try channel.assertReadHeadersOutbound()
  335. XCTAssertEqual(
  336. writtenHeaders.headers,
  337. [
  338. GRPCHTTP2Keys.status.rawValue: "200",
  339. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  340. "some-custom-header": "some-custom-value",
  341. ]
  342. )
  343. // We should throw if the client sends another message, since it's closed the stream already.
  344. var buffer = ByteBuffer()
  345. buffer.writeInteger(UInt8(0)) // not compressed
  346. buffer.writeInteger(UInt32(42)) // message length
  347. buffer.writeRepeatingByte(0, count: 42) // message
  348. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  349. XCTAssertThrowsError(
  350. ofType: RPCError.self,
  351. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  352. ) { error in
  353. XCTAssertEqual(error.code, .internalError)
  354. XCTAssertEqual(error.message, "Client can't send a message if closed.")
  355. }
  356. }
  357. func testNormalFlow() throws {
  358. let handler = GRPCServerStreamHandler(
  359. scheme: .http,
  360. acceptedEncodings: [],
  361. maximumPayloadSize: 100,
  362. skipStateMachineAssertions: true
  363. )
  364. let channel = EmbeddedChannel(handler: handler)
  365. // Receive client's initial metadata
  366. let clientInitialMetadata: HPACKHeaders = [
  367. GRPCHTTP2Keys.path.rawValue: "test/test",
  368. GRPCHTTP2Keys.scheme.rawValue: "http",
  369. GRPCHTTP2Keys.method.rawValue: "POST",
  370. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  371. GRPCHTTP2Keys.te.rawValue: "trailers",
  372. ]
  373. XCTAssertNoThrow(
  374. try channel.writeInbound(
  375. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  376. )
  377. )
  378. // Make sure we haven't sent back an error response, and that we read the initial metadata
  379. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  380. XCTAssertEqual(
  381. try channel.readInbound(as: RPCRequestPart.self),
  382. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  383. )
  384. // Write back server's initial metadata
  385. let headers: HPACKHeaders = [
  386. "some-custom-header": "some-custom-value"
  387. ]
  388. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  389. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  390. // Make sure we wrote back the initial metadata
  391. let writtenHeaders = try channel.assertReadHeadersOutbound()
  392. XCTAssertEqual(
  393. writtenHeaders.headers,
  394. [
  395. GRPCHTTP2Keys.status.rawValue: "200",
  396. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  397. "some-custom-header": "some-custom-value",
  398. ]
  399. )
  400. // Receive client's message
  401. var buffer = ByteBuffer()
  402. buffer.writeInteger(UInt8(0)) // not compressed
  403. buffer.writeInteger(UInt32(42)) // message length
  404. buffer.writeRepeatingByte(0, count: 42) // message
  405. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  406. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  407. // Make sure we haven't sent back an error response, and that we read the message properly
  408. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  409. XCTAssertEqual(
  410. try channel.readInbound(as: RPCRequestPart.self),
  411. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  412. )
  413. // Write back response
  414. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  415. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  416. // Make sure we wrote back the right message
  417. let writtenMessage = try channel.assertReadDataOutbound()
  418. var expectedBuffer = ByteBuffer()
  419. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  420. expectedBuffer.writeInteger(UInt32(42)) // message length
  421. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  422. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  423. // Send back status to end RPC
  424. let trailers = RPCResponsePart.status(
  425. .init(code: .dataLoss, message: "Test data loss"),
  426. ["custom-header": "custom-value"]
  427. )
  428. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  429. // Make sure we wrote back the status and trailers
  430. let writtenStatus = try channel.assertReadHeadersOutbound()
  431. XCTAssertTrue(writtenStatus.endStream)
  432. XCTAssertEqual(
  433. writtenStatus.headers,
  434. [
  435. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  436. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  437. "custom-header": "custom-value",
  438. ]
  439. )
  440. // Try writing and assert it throws to make sure we don't allow writes
  441. // after closing.
  442. XCTAssertThrowsError(
  443. ofType: RPCError.self,
  444. try channel.writeOutbound(trailers)
  445. ) { error in
  446. XCTAssertEqual(error.code, .internalError)
  447. XCTAssertEqual(error.message, "Server can't send anything if closed.")
  448. }
  449. }
  450. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  451. let handler = GRPCServerStreamHandler(
  452. scheme: .http,
  453. acceptedEncodings: [],
  454. maximumPayloadSize: 100
  455. )
  456. let channel = EmbeddedChannel(handler: handler)
  457. // Receive client's initial metadata
  458. let clientInitialMetadata: HPACKHeaders = [
  459. GRPCHTTP2Keys.path.rawValue: "test/test",
  460. GRPCHTTP2Keys.scheme.rawValue: "http",
  461. GRPCHTTP2Keys.method.rawValue: "POST",
  462. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  463. GRPCHTTP2Keys.te.rawValue: "trailers",
  464. ]
  465. XCTAssertNoThrow(
  466. try channel.writeInbound(
  467. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  468. )
  469. )
  470. // Make sure we haven't sent back an error response, and that we read the initial metadata
  471. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  472. XCTAssertEqual(
  473. try channel.readInbound(as: RPCRequestPart.self),
  474. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  475. )
  476. // Write back server's initial metadata
  477. let headers: HPACKHeaders = [
  478. "some-custom-header": "some-custom-value"
  479. ]
  480. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  481. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  482. // Make sure we wrote back the initial metadata
  483. let writtenHeaders = try channel.assertReadHeadersOutbound()
  484. XCTAssertEqual(
  485. writtenHeaders.headers,
  486. [
  487. GRPCHTTP2Keys.status.rawValue: "200",
  488. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  489. "some-custom-header": "some-custom-value",
  490. ]
  491. )
  492. // Receive client's first message
  493. var buffer = ByteBuffer()
  494. buffer.writeInteger(UInt8(0)) // not compressed
  495. XCTAssertNoThrow(
  496. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  497. )
  498. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  499. buffer.clear()
  500. buffer.writeInteger(UInt32(30)) // message length
  501. XCTAssertNoThrow(
  502. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  503. )
  504. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  505. buffer.clear()
  506. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  507. XCTAssertNoThrow(
  508. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  509. )
  510. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  511. buffer.clear()
  512. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  513. XCTAssertNoThrow(
  514. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  515. )
  516. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  517. buffer.clear()
  518. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  519. XCTAssertNoThrow(
  520. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  521. )
  522. // Make sure we haven't sent back an error response, and that we read the message properly
  523. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  524. XCTAssertEqual(
  525. try channel.readInbound(as: RPCRequestPart.self),
  526. RPCRequestPart.message(
  527. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  528. + [UInt8](repeating: 2, count: 10)
  529. )
  530. )
  531. }
  532. func testSendMultipleMessagesInSingleBuffer() throws {
  533. let handler = GRPCServerStreamHandler(
  534. scheme: .http,
  535. acceptedEncodings: [],
  536. maximumPayloadSize: 100
  537. )
  538. let channel = EmbeddedChannel(handler: handler)
  539. // Receive client's initial metadata
  540. let clientInitialMetadata: HPACKHeaders = [
  541. GRPCHTTP2Keys.path.rawValue: "test/test",
  542. GRPCHTTP2Keys.scheme.rawValue: "http",
  543. GRPCHTTP2Keys.method.rawValue: "POST",
  544. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  545. GRPCHTTP2Keys.te.rawValue: "trailers",
  546. ]
  547. XCTAssertNoThrow(
  548. try channel.writeInbound(
  549. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  550. )
  551. )
  552. // Make sure we haven't sent back an error response, and that we read the initial metadata
  553. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  554. XCTAssertEqual(
  555. try channel.readInbound(as: RPCRequestPart.self),
  556. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  557. )
  558. // Write back server's initial metadata
  559. let headers: HPACKHeaders = [
  560. "some-custom-header": "some-custom-value"
  561. ]
  562. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  563. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  564. // Read out the metadata
  565. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  566. // This is where this test actually begins. We want to write two messages
  567. // without flushing, and make sure that no messages are sent down the pipeline
  568. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  569. // Write back first message and make sure nothing's written in the channel.
  570. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  571. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  572. // Write back second message and make sure nothing's written in the channel.
  573. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  574. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  575. // Now flush and check we *do* write the data.
  576. channel.flush()
  577. let writtenMessage = try channel.assertReadDataOutbound()
  578. // Make sure both messages have been framed together in the ByteBuffer.
  579. XCTAssertEqual(
  580. writtenMessage.data,
  581. .byteBuffer(
  582. .init(bytes: [
  583. // First message
  584. 0, // Compression disabled
  585. 0, 0, 0, 4, // Message length
  586. 1, 1, 1, 1, // First message data
  587. // Second message
  588. 0, // Compression disabled
  589. 0, 0, 0, 4, // Message length
  590. 2, 2, 2, 2, // Second message data
  591. ])
  592. )
  593. )
  594. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  595. }
  596. func testMessageAndStatusAreNotReordered() throws {
  597. let handler = GRPCServerStreamHandler(
  598. scheme: .http,
  599. acceptedEncodings: [],
  600. maximumPayloadSize: 100
  601. )
  602. let channel = EmbeddedChannel(handler: handler)
  603. // Receive client's initial metadata
  604. let clientInitialMetadata: HPACKHeaders = [
  605. GRPCHTTP2Keys.path.rawValue: "test/test",
  606. GRPCHTTP2Keys.scheme.rawValue: "http",
  607. GRPCHTTP2Keys.method.rawValue: "POST",
  608. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  609. GRPCHTTP2Keys.te.rawValue: "trailers",
  610. ]
  611. XCTAssertNoThrow(
  612. try channel.writeInbound(
  613. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  614. )
  615. )
  616. // Make sure we haven't sent back an error response, and that we read the initial metadata
  617. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  618. XCTAssertEqual(
  619. try channel.readInbound(as: RPCRequestPart.self),
  620. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  621. )
  622. // Write back server's initial metadata
  623. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  624. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  625. // Read out the metadata
  626. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  627. // This is where this test actually begins. We want to write a message followed
  628. // by status and trailers, and only flush after both writes.
  629. // Because messages are buffered and potentially bundled together in a single
  630. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  631. // and trailers won't be written before the messages.
  632. // Write back message and make sure nothing's written in the channel.
  633. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  634. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  635. // Write status + metadata and make sure nothing's written.
  636. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  637. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  638. // Now flush and check we *do* write the data in the right order: message first,
  639. // trailers second.
  640. channel.flush()
  641. let writtenMessage = try channel.assertReadDataOutbound()
  642. // Make sure we first get message.
  643. XCTAssertEqual(
  644. writtenMessage.data,
  645. .byteBuffer(
  646. .init(bytes: [
  647. // First message
  648. 0, // Compression disabled
  649. 0, 0, 0, 4, // Message length
  650. 1, 1, 1, 1, // First message data
  651. ])
  652. )
  653. )
  654. XCTAssertFalse(writtenMessage.endStream)
  655. // Make sure we get trailers.
  656. let writtenTrailers = try channel.assertReadHeadersOutbound()
  657. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  658. XCTAssertTrue(writtenTrailers.endStream)
  659. // Make sure we get nothing else.
  660. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  661. }
  662. }
  663. extension EmbeddedChannel {
  664. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  665. guard
  666. case .headers(let writtenHeaders) = try XCTUnwrap(
  667. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  668. )
  669. else {
  670. throw TestError.assertionFailure("Expected to write headers")
  671. }
  672. return writtenHeaders
  673. }
  674. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  675. guard
  676. case .data(let writtenMessage) = try XCTUnwrap(
  677. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  678. )
  679. else {
  680. throw TestError.assertionFailure("Expected to write data")
  681. }
  682. return writtenMessage
  683. }
  684. }
  685. private enum TestError: Error {
  686. case assertionFailure(String)
  687. }