GRPCClientStreamHandlerTests.swift 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCHTTP2Core
  24. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  25. final class GRPCClientStreamHandlerTests: XCTestCase {
  /// Writes a selection of HTTP/2 frame payloads (ping, go-away, reset-stream, settings,
  /// push-promise, window-update, alt-svc and origin) inbound and checks that each write
  /// succeeds while leaving nothing readable on the channel's inbound side.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 1
      )
    )

    // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
    // initialiser is internal, so I can't create one of these frames.
    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      .rstStream(.cancel),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    for payload in ignoredPayloads {
      // Each payload must be accepted without error...
      XCTAssertNoThrow(try channel.writeInbound(payload))
      // ...and must not be forwarded further along the pipeline.
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Sends the client's initial metadata, then delivers server headers that carry no
  /// `:status`, and expects an `.unknown` status part whose metadata mirrors the
  /// received headers.
  func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Client side: open the RPC with empty initial metadata.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))

    // Server side: headers with a content-type but without :status.
    let headersWithoutStatus: HPACKHeaders = [
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: headersWithoutStatus))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      .init(code: .unknown, message: "HTTP Status Code is missing."),
      Metadata(headers: headersWithoutStatus)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Sends the client's initial metadata, then delivers server headers whose `:status`
  /// is a 1xx value ("104"), and expects that no response part becomes readable.
  func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Client side: open the RPC with empty initial metadata.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))

    // Server side: initial metadata carrying a 1xx status.
    let informationalHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "104",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: informationalHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing should have been surfaced to the reader.
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  }
  /// Sends the client's initial metadata, then delivers server headers whose `:status`
  /// is neither 200 nor 1xx (here: "too many requests"), and expects an `.unavailable`
  /// status part whose metadata mirrors the received headers.
  func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Client side: open the RPC with empty initial metadata.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))

    // Server side: initial metadata with a non-200, non-1xx :status.
    let rejectedStatusCode = String(HTTPResponseStatus.tooManyRequests.code)
    let rejectingHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: rejectedStatusCode,
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: rejectingHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
      Metadata(headers: rejectingHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Sends the client's initial metadata, then delivers server headers with a 200
  /// `:status` but no content-type, and expects an `.internalError` status part whose
  /// metadata mirrors the received headers.
  func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Client side: open the RPC with empty initial metadata.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))

    // Server side: initial metadata that omits content-type.
    let headersWithoutContentType: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200"
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: headersWithoutContentType)
    )
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      .init(code: .internalError, message: "Missing content-type header"),
      Metadata(headers: headersWithoutContentType)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Configures the handler with deflate as both the outbound and the only accepted
  /// encoding, checks the request headers that get written, then has the server answer
  /// with a "gzip" encoding and expects an `.internalError` status part.
  func testNotAcceptedEncodingResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .deflate,
        acceptedEncodings: [.deflate],
        maximumPayloadSize: 1
      )
    )

    // Client side: open the RPC and inspect the headers that were written.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    )
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // Server side: initial metadata advertising an encoding the client did not offer.
    let gzipHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.encoding.rawValue: "gzip",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: gzipHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      .init(
        code: .internalError,
        message:
          "The server picked a compression algorithm ('gzip') the client does not know about."
      ),
      Metadata(headers: gzipHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// With `maximumPayloadSize` set to 1, delivers a 42-byte message from the server and
  /// expects the inbound write to throw a `.resourceExhausted` `RPCError`, with no
  /// response part left to read afterwards.
  func testOverMaximumPayloadSize() throws {
    let handler = GRPCClientStreamHandler(
      methodDescriptor: .init(service: "test", method: "test"),
      scheme: .http,
      outboundEncoding: .identity,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: handler)

    // Send client's initial metadata
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    )

    // Make sure we have sent right metadata.
    let writtenMetadata = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      writtenMetadata.headers,
      [
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.path.rawValue: "test/test",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
      ]
    )

    // Server sends initial metadata
    let serverInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
      )
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .metadata(Metadata(headers: serverInitialMetadata))
    )

    // Server sends message over payload limit
    var buffer = ByteBuffer()
    buffer.writeInteger(UInt8(0))  // not compressed
    buffer.writeInteger(UInt32(42))  // message length
    buffer.writeRepeatingByte(0, count: 42)  // message
    // This payload travels server -> client, hence the name.
    let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
    ) { error in
      XCTAssertEqual(error.code, .resourceExhausted)
      XCTAssertEqual(
        error.message,
        "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
      )
    }

    // Make sure we didn't read the received message. The other tests in this file read
    // `RPCResponsePart` from this handler's inbound side, so that is the type to check
    // for here (not `RPCRequestPart`).
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  }
  /// Has the server answer the client's initial metadata with headers that carry a
  /// grpc-status of "0" and end the stream, expects an `.ok` status part, and then checks
  /// that a subsequent DATA frame from the server makes the inbound write throw.
  func testServerEndsStream() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Client side: open the RPC and verify the request headers that were written.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // Server side: a single HEADERS frame with end-stream set.
    let endingHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.grpcStatus.rawValue: "0",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    let endingFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: endingHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(endingFrame))

    // The expected status metadata does not include the grpc-status header.
    let expectedStatusMetadata: Metadata = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .status(.init(code: .ok, message: ""), expectedStatusMetadata)
    )

    // We should throw if the server sends another message, since it's closed the stream already.
    let lateMessage = ByteBuffer(
      bytes: [
        0,  // not compressed
        0, 0, 0, 42,  // message length
      ] + [UInt8](repeating: 0, count: 42)  // message
    )
    let lateDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(lateMessage),
      endStream: true
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(HTTP2Frame.FramePayload.data(lateDataPayload))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Cannot have received anything from a closed server.")
    }
  }
  /// Walks one RPC through the handler end to end: request headers out, server initial
  /// metadata in, one request message out, half-close producing an empty end-stream DATA
  /// frame, a rejected write after close, one response message in, and finally trailers
  /// surfacing as a `.dataLoss` status part.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // 1. Client's initial metadata goes out as a HEADERS frame; nothing comes back yet.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // 2. Server's initial metadata is surfaced as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // 3. A request message is written out with the 5-byte length prefix in front.
    let requestMessage = RPCRequestPart.message([UInt8](repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(requestMessage))
    let framedRequest = ByteBuffer(
      bytes: [
        0,  // not compressed
        0, 0, 0, 42,  // message length
      ] + [UInt8](repeating: 1, count: 42)  // message
    )
    let requestDataFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(requestDataFrame.data, .byteBuffer(framedRequest))

    // 4. Half-close the outbound end: this would be triggered by finishing the client's
    //    writer. The flush makes sure the end-of-stream frame is actually written.
    XCTAssertNoThrow(channel.close(mode: .output, promise: nil))
    channel.flush()
    let endOfStreamFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(endOfStreamFrame.data, .byteBuffer(ByteBuffer()))
    XCTAssertTrue(endOfStreamFrame.endStream)

    // 5. Writing after the client has closed must fail.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Client is closed, cannot send a message.")
    }

    // This is needed to clear the EmbeddedChannel's stored error, otherwise
    // it will be thrown when writing inbound.
    try? channel.throwIfErrorCaught()

    // 6. The server's response message is unframed and surfaced as a message part.
    let framedResponse = ByteBuffer(
      bytes: [
        0,  // not compressed
        0, 0, 0, 42,  // message length
      ] + [UInt8](repeating: 0, count: 42)  // message
    )
    let responseDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedResponse))
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(responseDataPayload)))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.message([UInt8](repeating: 0, count: 42))
    )

    // 7. The server ends the RPC with trailers carrying the status.
    let trailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: trailers)))
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
    )
  }
  /// Delivers a single 30-byte server message in five separate DATA frames (compression
  /// flag, length, and three 10-byte chunks) and checks that nothing is readable until
  /// the last chunk arrives, at which point the whole message is surfaced in one part.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // Client's initial metadata goes out as a HEADERS frame; nothing comes back yet.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // Server's initial metadata is surfaced as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // A request message is written out with the 5-byte length prefix in front.
    let requestMessage = RPCRequestPart.message([UInt8](repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(requestMessage))
    let framedRequest = ByteBuffer(
      bytes: [
        0,  // not compressed
        0, 0, 0, 42,  // message length
      ] + [UInt8](repeating: 1, count: 42)  // message
    )
    let requestDataFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(requestDataFrame.data, .byteBuffer(framedRequest))

    // The server's message arrives in pieces. After each of these fragments the
    // message is still incomplete, so nothing may be readable.
    let incompleteFragments: [ByteBuffer] = [
      ByteBuffer(bytes: [0]),  // not compressed
      ByteBuffer(bytes: [0, 0, 0, 30]),  // message length
      ByteBuffer(bytes: [UInt8](repeating: 0, count: 10)),  // first part of the message
      ByteBuffer(bytes: [UInt8](repeating: 1, count: 10)),  // second part of the message
    ]
    for fragment in incompleteFragments {
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(fragment))))
      )
      XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
    }

    // The third part completes the 30-byte message.
    let finalFragment = ByteBuffer(bytes: [UInt8](repeating: 2, count: 10))
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(finalFragment)))
      )
    )

    // Make sure we read the message properly
    let expectedMessage =
      [UInt8](repeating: 0, count: 10)
      + [UInt8](repeating: 1, count: 10)
      + [UInt8](repeating: 2, count: 10)
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.message(expectedMessage)
    )
  }
  /// Writes two request messages without flushing, checks that nothing reaches the
  /// outbound side until `flush()` is called, and then checks that both messages come
  /// out framed back to back in a single DATA frame.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .identity,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // Client's initial metadata goes out as a HEADERS frame; nothing comes back yet.
    let clientMetadata = RPCRequestPart.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(clientMetadata))
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // Server's initial metadata is surfaced as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // This is where this test actually begins. We want to write two messages
    // without flushing, and make sure that no messages are sent down the pipeline
    // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
    let fillBytes: [UInt8] = [1, 2]
    for fillByte in fillBytes {
      // Write (but do not flush) a 4-byte message and make sure nothing's written yet.
      XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: fillByte, count: 4))))
      XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    }

    // Now flush and check we *do* write the data.
    channel.flush()
    let flushedFrame = try channel.assertReadDataOutbound()

    // Make sure both messages have been framed together in the ByteBuffer.
    var expectedBuffer = ByteBuffer()
    for fillByte in fillBytes {
      expectedBuffer.writeInteger(UInt8(0))  // Compression disabled
      expectedBuffer.writeInteger(UInt32(4))  // Message length
      expectedBuffer.writeRepeatingByte(fillByte, count: 4)  // Message data
    }
    XCTAssertEqual(flushedFrame.data, .byteBuffer(expectedBuffer))

    // Exactly one frame should have been written.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  600. }
  extension EmbeddedChannel {
    /// Reads the next outbound element as an `HTTP2Frame.FramePayload` and returns its
    /// headers.
    ///
    /// - Throws: the error from `XCTUnwrap` when nothing is readable, any error thrown by
    ///   `readOutbound(as:)`, or `TestError.assertionFailure` when the payload is not a
    ///   `.headers` frame.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(self.readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .headers(let headers) = payload {
        return headers
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound element as an `HTTP2Frame.FramePayload` and returns its
    /// data payload.
    ///
    /// - Throws: the error from `XCTUnwrap` when nothing is readable, any error thrown by
    ///   `readOutbound(as:)`, or `TestError.assertionFailure` when the payload is not a
    ///   `.data` frame.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(self.readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .data(let data) = payload {
        return data
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Error thrown by this file's `EmbeddedChannel` helpers when the frame read from the
  /// channel is not of the expected kind.
  private enum TestError: Error {
    /// An expectation did not hold; the associated string describes what was expected.
    case assertionFailure(String)
  }