2
0

GRPCServerStreamHandlerTests.swift 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let handler = GRPCServerStreamHandler(
  27. scheme: .http,
  28. acceptedEncodings: [],
  29. maximumPayloadSize: 1
  30. )
  31. let channel = EmbeddedChannel(handler: handler)
  32. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  33. .ping(.init(), ack: false),
  34. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  35. // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
  36. // initialiser is internal, so I can't create one of these frames.
  37. .rstStream(.cancel),
  38. .settings(.ack),
  39. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  40. .windowUpdate(windowSizeIncrement: 4),
  41. .alternativeService(origin: nil, field: nil),
  42. .origin([]),
  43. ]
  44. for toBeIgnored in framesToBeIgnored {
  45. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  46. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  47. }
  48. }
  49. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  50. let handler = GRPCServerStreamHandler(
  51. scheme: .http,
  52. acceptedEncodings: [],
  53. maximumPayloadSize: 1
  54. )
  55. let channel = EmbeddedChannel(handler: handler)
  56. // Receive client's initial metadata without content-type
  57. let clientInitialMetadata: HPACKHeaders = [
  58. GRPCHTTP2Keys.path.rawValue: "test/test",
  59. GRPCHTTP2Keys.scheme.rawValue: "http",
  60. GRPCHTTP2Keys.method.rawValue: "POST",
  61. GRPCHTTP2Keys.te.rawValue: "trailers",
  62. ]
  63. XCTAssertNoThrow(
  64. try channel.writeInbound(
  65. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  66. )
  67. )
  68. // Make sure we have sent a trailers-only response
  69. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  70. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  71. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  72. }
  73. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  74. let handler = GRPCServerStreamHandler(
  75. scheme: .http,
  76. acceptedEncodings: [],
  77. maximumPayloadSize: 1
  78. )
  79. let channel = EmbeddedChannel(handler: handler)
  80. // Receive client's initial metadata without :method
  81. let clientInitialMetadata: HPACKHeaders = [
  82. GRPCHTTP2Keys.path.rawValue: "test/test",
  83. GRPCHTTP2Keys.scheme.rawValue: "http",
  84. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  85. GRPCHTTP2Keys.te.rawValue: "trailers",
  86. ]
  87. XCTAssertNoThrow(
  88. try channel.writeInbound(
  89. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  90. )
  91. )
  92. // Make sure we have sent a trailers-only response
  93. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  94. XCTAssertEqual(
  95. writtenTrailersOnlyResponse.headers,
  96. [
  97. GRPCHTTP2Keys.status.rawValue: "200",
  98. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  99. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  100. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  101. ":method header is expected to be present and have a value of \"POST\".",
  102. ]
  103. )
  104. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  105. }
  106. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  107. let handler = GRPCServerStreamHandler(
  108. scheme: .http,
  109. acceptedEncodings: [],
  110. maximumPayloadSize: 1
  111. )
  112. let channel = EmbeddedChannel(handler: handler)
  113. // Receive client's initial metadata without :scheme
  114. let clientInitialMetadata: HPACKHeaders = [
  115. GRPCHTTP2Keys.path.rawValue: "test/test",
  116. GRPCHTTP2Keys.method.rawValue: "POST",
  117. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  118. GRPCHTTP2Keys.te.rawValue: "trailers",
  119. ]
  120. XCTAssertNoThrow(
  121. try channel.writeInbound(
  122. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  123. )
  124. )
  125. // Make sure we have sent a trailers-only response
  126. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  127. XCTAssertEqual(
  128. writtenTrailersOnlyResponse.headers,
  129. [
  130. GRPCHTTP2Keys.status.rawValue: "200",
  131. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  132. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  133. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  134. ":scheme header must be present and one of \"http\" or \"https\".",
  135. ]
  136. )
  137. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  138. }
  139. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  140. let handler = GRPCServerStreamHandler(
  141. scheme: .http,
  142. acceptedEncodings: [],
  143. maximumPayloadSize: 1
  144. )
  145. let channel = EmbeddedChannel(handler: handler)
  146. // Receive client's initial metadata without :path
  147. let clientInitialMetadata: HPACKHeaders = [
  148. GRPCHTTP2Keys.scheme.rawValue: "http",
  149. GRPCHTTP2Keys.method.rawValue: "POST",
  150. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  151. GRPCHTTP2Keys.te.rawValue: "trailers",
  152. ]
  153. XCTAssertNoThrow(
  154. try channel.writeInbound(
  155. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  156. )
  157. )
  158. // Make sure we have sent a trailers-only response
  159. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  160. XCTAssertEqual(
  161. writtenTrailersOnlyResponse.headers,
  162. [
  163. GRPCHTTP2Keys.status.rawValue: "200",
  164. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  165. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  166. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  167. ]
  168. )
  169. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  170. }
  171. func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
  172. let handler = GRPCServerStreamHandler(
  173. scheme: .http,
  174. acceptedEncodings: [],
  175. maximumPayloadSize: 1
  176. )
  177. let channel = EmbeddedChannel(handler: handler)
  178. // Receive client's initial metadata without TE
  179. let clientInitialMetadata: HPACKHeaders = [
  180. GRPCHTTP2Keys.path.rawValue: "test/test",
  181. GRPCHTTP2Keys.scheme.rawValue: "http",
  182. GRPCHTTP2Keys.method.rawValue: "POST",
  183. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  184. ]
  185. XCTAssertNoThrow(
  186. try channel.writeInbound(
  187. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  188. )
  189. )
  190. // Make sure we have sent a trailers-only response
  191. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  192. XCTAssertEqual(
  193. writtenTrailersOnlyResponse.headers,
  194. [
  195. GRPCHTTP2Keys.status.rawValue: "200",
  196. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  197. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  198. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  199. "\"te\" header is expected to be present and have a value of \"trailers\".",
  200. ]
  201. )
  202. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  203. }
  204. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  205. let handler = GRPCServerStreamHandler(
  206. scheme: .http,
  207. acceptedEncodings: [],
  208. maximumPayloadSize: 100
  209. )
  210. let channel = EmbeddedChannel(handler: handler)
  211. // Receive client's initial metadata
  212. let clientInitialMetadata: HPACKHeaders = [
  213. GRPCHTTP2Keys.path.rawValue: "test/test",
  214. GRPCHTTP2Keys.scheme.rawValue: "http",
  215. GRPCHTTP2Keys.method.rawValue: "POST",
  216. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  217. GRPCHTTP2Keys.te.rawValue: "trailers",
  218. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  219. ]
  220. XCTAssertNoThrow(
  221. try channel.writeInbound(
  222. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  223. )
  224. )
  225. // Make sure we have sent a trailers-only response
  226. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  227. XCTAssertEqual(
  228. writtenTrailersOnlyResponse.headers,
  229. [
  230. GRPCHTTP2Keys.status.rawValue: "200",
  231. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  232. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  233. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  234. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  235. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  236. ]
  237. )
  238. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  239. }
  240. func testOverMaximumPayloadSize() throws {
  241. let handler = GRPCServerStreamHandler(
  242. scheme: .http,
  243. acceptedEncodings: [],
  244. maximumPayloadSize: 1
  245. )
  246. let channel = EmbeddedChannel(handler: handler)
  247. // Receive client's initial metadata
  248. let clientInitialMetadata: HPACKHeaders = [
  249. GRPCHTTP2Keys.path.rawValue: "test/test",
  250. GRPCHTTP2Keys.scheme.rawValue: "http",
  251. GRPCHTTP2Keys.method.rawValue: "POST",
  252. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  253. GRPCHTTP2Keys.te.rawValue: "trailers",
  254. ]
  255. XCTAssertNoThrow(
  256. try channel.writeInbound(
  257. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  258. )
  259. )
  260. // Make sure we haven't sent back an error response, and that we read the initial metadata
  261. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  262. XCTAssertEqual(
  263. try channel.readInbound(as: RPCRequestPart.self),
  264. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  265. )
  266. // Write back server's initial metadata
  267. let headers: HPACKHeaders = [
  268. "some-custom-header": "some-custom-value"
  269. ]
  270. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  271. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  272. // Make sure we wrote back the initial metadata
  273. let writtenHeaders = try channel.assertReadHeadersOutbound()
  274. XCTAssertEqual(
  275. writtenHeaders.headers,
  276. [
  277. GRPCHTTP2Keys.status.rawValue: "200",
  278. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  279. "some-custom-header": "some-custom-value",
  280. ]
  281. )
  282. // Receive client's message
  283. var buffer = ByteBuffer()
  284. buffer.writeInteger(UInt8(0)) // not compressed
  285. buffer.writeInteger(UInt32(42)) // message length
  286. buffer.writeRepeatingByte(0, count: 42) // message
  287. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  288. XCTAssertThrowsError(
  289. ofType: RPCError.self,
  290. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  291. ) { error in
  292. XCTAssertEqual(error.code, .resourceExhausted)
  293. XCTAssertEqual(
  294. error.message,
  295. "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
  296. )
  297. }
  298. // Make sure we haven't sent a response back and that we didn't read the received message
  299. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  300. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  301. }
  302. func testClientEndsStream() throws {
  303. let handler = GRPCServerStreamHandler(
  304. scheme: .http,
  305. acceptedEncodings: [],
  306. maximumPayloadSize: 100,
  307. skipStateMachineAssertions: true
  308. )
  309. let channel = EmbeddedChannel(handler: handler)
  310. // Receive client's initial metadata with end stream set
  311. let clientInitialMetadata: HPACKHeaders = [
  312. GRPCHTTP2Keys.path.rawValue: "test/test",
  313. GRPCHTTP2Keys.scheme.rawValue: "http",
  314. GRPCHTTP2Keys.method.rawValue: "POST",
  315. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  316. GRPCHTTP2Keys.te.rawValue: "trailers",
  317. ]
  318. XCTAssertNoThrow(
  319. try channel.writeInbound(
  320. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  321. )
  322. )
  323. // Make sure we haven't sent back an error response, and that we read the initial metadata
  324. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  325. XCTAssertEqual(
  326. try channel.readInbound(as: RPCRequestPart.self),
  327. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  328. )
  329. // Write back server's initial metadata
  330. let headers: HPACKHeaders = [
  331. "some-custom-header": "some-custom-value"
  332. ]
  333. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  334. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  335. // Make sure we wrote back the initial metadata
  336. let writtenHeaders = try channel.assertReadHeadersOutbound()
  337. XCTAssertEqual(
  338. writtenHeaders.headers,
  339. [
  340. GRPCHTTP2Keys.status.rawValue: "200",
  341. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  342. "some-custom-header": "some-custom-value",
  343. ]
  344. )
  345. // We should throw if the client sends another message, since it's closed the stream already.
  346. var buffer = ByteBuffer()
  347. buffer.writeInteger(UInt8(0)) // not compressed
  348. buffer.writeInteger(UInt32(42)) // message length
  349. buffer.writeRepeatingByte(0, count: 42) // message
  350. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  351. XCTAssertThrowsError(
  352. ofType: RPCError.self,
  353. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  354. ) { error in
  355. XCTAssertEqual(error.code, .internalError)
  356. XCTAssertEqual(error.message, "Client can't send a message if closed.")
  357. }
  358. }
  359. func testNormalFlow() throws {
  360. let handler = GRPCServerStreamHandler(
  361. scheme: .http,
  362. acceptedEncodings: [],
  363. maximumPayloadSize: 100,
  364. skipStateMachineAssertions: true
  365. )
  366. let channel = EmbeddedChannel(handler: handler)
  367. // Receive client's initial metadata
  368. let clientInitialMetadata: HPACKHeaders = [
  369. GRPCHTTP2Keys.path.rawValue: "test/test",
  370. GRPCHTTP2Keys.scheme.rawValue: "http",
  371. GRPCHTTP2Keys.method.rawValue: "POST",
  372. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  373. GRPCHTTP2Keys.te.rawValue: "trailers",
  374. ]
  375. XCTAssertNoThrow(
  376. try channel.writeInbound(
  377. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  378. )
  379. )
  380. // Make sure we haven't sent back an error response, and that we read the initial metadata
  381. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  382. XCTAssertEqual(
  383. try channel.readInbound(as: RPCRequestPart.self),
  384. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  385. )
  386. // Write back server's initial metadata
  387. let headers: HPACKHeaders = [
  388. "some-custom-header": "some-custom-value"
  389. ]
  390. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  391. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  392. // Make sure we wrote back the initial metadata
  393. let writtenHeaders = try channel.assertReadHeadersOutbound()
  394. XCTAssertEqual(
  395. writtenHeaders.headers,
  396. [
  397. GRPCHTTP2Keys.status.rawValue: "200",
  398. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  399. "some-custom-header": "some-custom-value",
  400. ]
  401. )
  402. // Receive client's message
  403. var buffer = ByteBuffer()
  404. buffer.writeInteger(UInt8(0)) // not compressed
  405. buffer.writeInteger(UInt32(42)) // message length
  406. buffer.writeRepeatingByte(0, count: 42) // message
  407. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  408. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  409. // Make sure we haven't sent back an error response, and that we read the message properly
  410. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  411. XCTAssertEqual(
  412. try channel.readInbound(as: RPCRequestPart.self),
  413. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  414. )
  415. // Write back response
  416. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  417. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  418. // Make sure we wrote back the right message
  419. let writtenMessage = try channel.assertReadDataOutbound()
  420. var expectedBuffer = ByteBuffer()
  421. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  422. expectedBuffer.writeInteger(UInt32(42)) // message length
  423. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  424. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  425. // Send back status to end RPC
  426. let trailers = RPCResponsePart.status(
  427. .init(code: .dataLoss, message: "Test data loss"),
  428. ["custom-header": "custom-value"]
  429. )
  430. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  431. // Make sure we wrote back the status and trailers
  432. let writtenStatus = try channel.assertReadHeadersOutbound()
  433. XCTAssertTrue(writtenStatus.endStream)
  434. XCTAssertEqual(
  435. writtenStatus.headers,
  436. [
  437. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  438. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  439. "custom-header": "custom-value",
  440. ]
  441. )
  442. // Try writing and assert it throws to make sure we don't allow writes
  443. // after closing.
  444. XCTAssertThrowsError(
  445. ofType: RPCError.self,
  446. try channel.writeOutbound(trailers)
  447. ) { error in
  448. XCTAssertEqual(error.code, .internalError)
  449. XCTAssertEqual(error.message, "Server can't send anything if closed.")
  450. }
  451. }
  452. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  453. let handler = GRPCServerStreamHandler(
  454. scheme: .http,
  455. acceptedEncodings: [],
  456. maximumPayloadSize: 100
  457. )
  458. let channel = EmbeddedChannel(handler: handler)
  459. // Receive client's initial metadata
  460. let clientInitialMetadata: HPACKHeaders = [
  461. GRPCHTTP2Keys.path.rawValue: "test/test",
  462. GRPCHTTP2Keys.scheme.rawValue: "http",
  463. GRPCHTTP2Keys.method.rawValue: "POST",
  464. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  465. GRPCHTTP2Keys.te.rawValue: "trailers",
  466. ]
  467. XCTAssertNoThrow(
  468. try channel.writeInbound(
  469. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  470. )
  471. )
  472. // Make sure we haven't sent back an error response, and that we read the initial metadata
  473. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  474. XCTAssertEqual(
  475. try channel.readInbound(as: RPCRequestPart.self),
  476. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  477. )
  478. // Write back server's initial metadata
  479. let headers: HPACKHeaders = [
  480. "some-custom-header": "some-custom-value"
  481. ]
  482. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  483. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  484. // Make sure we wrote back the initial metadata
  485. let writtenHeaders = try channel.assertReadHeadersOutbound()
  486. XCTAssertEqual(
  487. writtenHeaders.headers,
  488. [
  489. GRPCHTTP2Keys.status.rawValue: "200",
  490. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  491. "some-custom-header": "some-custom-value",
  492. ]
  493. )
  494. // Receive client's first message
  495. var buffer = ByteBuffer()
  496. buffer.writeInteger(UInt8(0)) // not compressed
  497. XCTAssertNoThrow(
  498. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  499. )
  500. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  501. buffer.clear()
  502. buffer.writeInteger(UInt32(30)) // message length
  503. XCTAssertNoThrow(
  504. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  505. )
  506. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  507. buffer.clear()
  508. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  509. XCTAssertNoThrow(
  510. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  511. )
  512. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  513. buffer.clear()
  514. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  515. XCTAssertNoThrow(
  516. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  517. )
  518. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  519. buffer.clear()
  520. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  521. XCTAssertNoThrow(
  522. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  523. )
  524. // Make sure we haven't sent back an error response, and that we read the message properly
  525. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  526. XCTAssertEqual(
  527. try channel.readInbound(as: RPCRequestPart.self),
  528. RPCRequestPart.message(
  529. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  530. + [UInt8](repeating: 2, count: 10)
  531. )
  532. )
  533. }
  534. func testSendMultipleMessagesInSingleBuffer() throws {
  535. let handler = GRPCServerStreamHandler(
  536. scheme: .http,
  537. acceptedEncodings: [],
  538. maximumPayloadSize: 100
  539. )
  540. let channel = EmbeddedChannel(handler: handler)
  541. // Receive client's initial metadata
  542. let clientInitialMetadata: HPACKHeaders = [
  543. GRPCHTTP2Keys.path.rawValue: "test/test",
  544. GRPCHTTP2Keys.scheme.rawValue: "http",
  545. GRPCHTTP2Keys.method.rawValue: "POST",
  546. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  547. GRPCHTTP2Keys.te.rawValue: "trailers",
  548. ]
  549. XCTAssertNoThrow(
  550. try channel.writeInbound(
  551. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  552. )
  553. )
  554. // Make sure we haven't sent back an error response, and that we read the initial metadata
  555. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  556. XCTAssertEqual(
  557. try channel.readInbound(as: RPCRequestPart.self),
  558. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  559. )
  560. // Write back server's initial metadata
  561. let headers: HPACKHeaders = [
  562. "some-custom-header": "some-custom-value"
  563. ]
  564. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  565. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  566. // Read out the metadata
  567. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  568. // This is where this test actually begins. We want to write two messages
  569. // without flushing, and make sure that no messages are sent down the pipeline
  570. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  571. // Write back first message and make sure nothing's written in the channel.
  572. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  573. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  574. // Write back second message and make sure nothing's written in the channel.
  575. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  576. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  577. // Now flush and check we *do* write the data.
  578. channel.flush()
  579. let writtenMessage = try channel.assertReadDataOutbound()
  580. // Make sure both messages have been framed together in the ByteBuffer.
  581. XCTAssertEqual(
  582. writtenMessage.data,
  583. .byteBuffer(
  584. .init(bytes: [
  585. // First message
  586. 0, // Compression disabled
  587. 0, 0, 0, 4, // Message length
  588. 1, 1, 1, 1, // First message data
  589. // Second message
  590. 0, // Compression disabled
  591. 0, 0, 0, 4, // Message length
  592. 2, 2, 2, 2, // Second message data
  593. ])
  594. )
  595. )
  596. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  597. }
  598. func testMessageAndStatusAreNotReordered() throws {
  599. let handler = GRPCServerStreamHandler(
  600. scheme: .http,
  601. acceptedEncodings: [],
  602. maximumPayloadSize: 100
  603. )
  604. let channel = EmbeddedChannel(handler: handler)
  605. // Receive client's initial metadata
  606. let clientInitialMetadata: HPACKHeaders = [
  607. GRPCHTTP2Keys.path.rawValue: "test/test",
  608. GRPCHTTP2Keys.scheme.rawValue: "http",
  609. GRPCHTTP2Keys.method.rawValue: "POST",
  610. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  611. GRPCHTTP2Keys.te.rawValue: "trailers",
  612. ]
  613. XCTAssertNoThrow(
  614. try channel.writeInbound(
  615. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  616. )
  617. )
  618. // Make sure we haven't sent back an error response, and that we read the initial metadata
  619. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  620. XCTAssertEqual(
  621. try channel.readInbound(as: RPCRequestPart.self),
  622. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  623. )
  624. // Write back server's initial metadata
  625. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  626. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  627. // Read out the metadata
  628. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  629. // This is where this test actually begins. We want to write a message followed
  630. // by status and trailers, and only flush after both writes.
  631. // Because messages are buffered and potentially bundled together in a single
  632. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  633. // and trailers won't be written before the messages.
  634. // Write back message and make sure nothing's written in the channel.
  635. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  636. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  637. // Write status + metadata and make sure nothing's written.
  638. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  639. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  640. // Now flush and check we *do* write the data in the right order: message first,
  641. // trailers second.
  642. channel.flush()
  643. let writtenMessage = try channel.assertReadDataOutbound()
  644. // Make sure we first get message.
  645. XCTAssertEqual(
  646. writtenMessage.data,
  647. .byteBuffer(
  648. .init(bytes: [
  649. // First message
  650. 0, // Compression disabled
  651. 0, 0, 0, 4, // Message length
  652. 1, 1, 1, 1, // First message data
  653. ])
  654. )
  655. )
  656. XCTAssertFalse(writtenMessage.endStream)
  657. // Make sure we get trailers.
  658. let writtenTrailers = try channel.assertReadHeadersOutbound()
  659. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  660. XCTAssertTrue(writtenTrailers.endStream)
  661. // Make sure we get nothing else.
  662. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  663. }
  664. }
  665. extension EmbeddedChannel {
  666. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  667. guard
  668. case .headers(let writtenHeaders) = try XCTUnwrap(
  669. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  670. )
  671. else {
  672. throw TestError.assertionFailure("Expected to write headers")
  673. }
  674. return writtenHeaders
  675. }
  676. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  677. guard
  678. case .data(let writtenMessage) = try XCTUnwrap(
  679. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  680. )
  681. else {
  682. throw TestError.assertionFailure("Expected to write data")
  683. }
  684. return writtenMessage
  685. }
  686. }
  687. private enum TestError: Error {
  688. case assertionFailure(String)
  689. }