GRPCClientStreamHandlerTests.swift 27 KB


  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCHTTP2Core
  24. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  25. final class GRPCClientStreamHandlerTests: XCTestCase {
  /// Checks that HTTP/2 frame payloads which are neither headers nor data are dropped.
  ///
  /// Each payload in the list (ping, GOAWAY, RST_STREAM, settings ack, push promise,
  /// window update, alternative service and origin) is written inbound; the write must
  /// not throw and nothing must become readable on the inbound side afterwards.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1
      )
    )

    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
      // initialiser is internal, so one of these frames cannot be created here.
      .rstStream(.cancel),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    ignoredPayloads.forEach { payload in
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Checks that server initial headers without `:status` end the RPC.
  ///
  /// After the client's initial metadata is written, the server's headers (content-type
  /// only) are written inbound. The part read back must be a `.status` with code
  /// `.unknown` that carries the received headers as metadata.
  func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with headers that lack :status.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      Status(code: .unknown, message: "HTTP Status Code is missing."),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Checks that server initial headers with a 1xx `:status` produce no response part.
  ///
  /// The server's headers carry status "104" plus the gRPC content-type; after writing
  /// them inbound, nothing must be readable as an `RPCResponsePart`.
  func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with an informational (1xx) status.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "104",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  }
  /// Checks that a non-200, non-1xx `:status` in the server's initial headers ends the RPC.
  ///
  /// The server's headers carry the "too many requests" HTTP status code; the part read
  /// back must be a `.status` with code `.unavailable` that carries the received headers
  /// as metadata.
  func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with a status that is neither 200 nor 1xx.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      Status(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Checks that server initial headers without a content-type end the RPC.
  ///
  /// The server's headers carry only `:status` "200"; the part read back must be a
  /// `.status` with code `.internalError` that carries the received headers as metadata.
  func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with a 200 status but no content-type.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200"
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      Status(code: .internalError, message: "Missing content-type header"),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Checks that a server-selected encoding the client did not offer ends the RPC.
  ///
  /// The handler is configured to send and accept only deflate. The request headers it
  /// writes are verified first; the server then replies with `gzip` as its encoding and
  /// the part read back must be a `.status` with code `.internalError`.
  func testNotAcceptedEncodingResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .deflate,
        acceptedEncodings: [.deflate],
        maximumPayloadSize: 1
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // The request headers must advertise deflate for both encoding and accept-encoding.
    let sentHeaders = try channel.assertReadHeadersOutbound()
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server picks an encoding that was not offered.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.encoding.rawValue: "gzip",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart = RPCResponsePart.status(
      Status(
        code: .internalError,
        message:
          "The server picked a compression algorithm ('gzip') the client does not know about."
      ),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Checks that an inbound message larger than `maximumPayloadSize` is rejected.
  ///
  /// The handler is configured with a maximum payload size of 1. After the usual header
  /// exchange, the server delivers a length-prefixed message of 42 bytes. Writing that
  /// data frame inbound must throw an `RPCError` with code `.resourceExhausted`, and no
  /// response part may be readable afterwards.
  func testOverMaximumPayloadSize() throws {
    let handler = GRPCClientStreamHandler(
      methodDescriptor: .init(service: "test", method: "test"),
      scheme: .http,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: handler)

    // Send client's initial metadata
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    )

    // Make sure we have sent right metadata.
    let writtenMetadata = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      writtenMetadata.headers,
      [
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.path.rawValue: "test/test",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
      ]
    )

    // Server sends initial metadata
    let serverInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
      )
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .metadata(Metadata(headers: serverInitialMetadata))
    )

    // Server sends message over payload limit
    var buffer = ByteBuffer()
    buffer.writeInteger(UInt8(0))  // not compressed
    buffer.writeInteger(UInt32(42))  // message length
    buffer.writeRepeatingByte(0, count: 42)  // message
    // This frame travels server -> client, so name it after its sender.
    let serverDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(buffer),
      endStream: false
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
    ) { error in
      XCTAssertEqual(error.code, .resourceExhausted)
      XCTAssertEqual(
        error.message,
        "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
      )
    }

    // Make sure we didn't read the received message. The client's inbound side carries
    // `RPCResponsePart`s, so read as that type: reading as a different type would only
    // yield nil while the buffer happens to be empty.
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  }
  /// Checks that a server data frame with end-stream set is reported as an error status.
  ///
  /// After the header exchange, the server delivers a complete 42-byte message in a data
  /// frame flagged `endStream: true`. The write itself must not throw; instead the part
  /// read back must be a `.status` with code `.internalError` and empty metadata.
  func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // Verify the request headers that were written.
    let sentHeaders = try channel.assertReadHeadersOutbound()
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server sends valid initial metadata, which is surfaced as-is.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The server sends a message with end-stream set on the data frame.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(0, as: UInt8.self)  // not compressed
    framedMessage.writeInteger(42, as: UInt32.self)  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(dataFrame))

    // The handler must surface a status (with empty trailers) describing the violation.
    let expectedPart = RPCResponsePart.status(
      Status(
        code: .internalError,
        message:
          "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
      ),
      [:]
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Checks the flow where the server closes the stream with its very first headers.
  ///
  /// The server's headers carry `:status` 200, a gRPC status of "0" and the content-type,
  /// with end-stream set. The part read back must be an `.ok` status whose metadata holds
  /// the `:status` and content-type entries. A data frame written afterwards must throw
  /// an `RPCError` with code `.internalError`.
  func testServerEndsStream() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // Verify the request headers that were written.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server replies with headers that also end the stream.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.grpcStatus.rawValue: "0",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    let closingHeadersFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: responseHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(closingHeadersFrame))

    let expectedTrailers: Metadata = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.status(Status(code: .ok, message: ""), expectedTrailers)
    )

    // Any further message from the server must be rejected: it already closed the stream.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(0, as: UInt8.self)  // not compressed
    framedMessage.writeInteger(42, as: UInt32.self)  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let lateDataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(lateDataFrame)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Cannot have received anything from a closed server.")
    }
  }
  /// Walks through a complete RPC from the client's point of view.
  ///
  /// Steps, in order: the client writes initial metadata and the request headers are
  /// verified; the server's initial metadata is received; the client writes one message
  /// and its length-prefixed framing is verified; the outbound side is half-closed and an
  /// empty end-stream data frame is expected; a further client write must throw; the
  /// server then sends one message and finally trailers carrying a `dataLoss` status.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The request headers must have been written, and nothing must have come back yet.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // The server's initial metadata is surfaced unchanged, custom header included.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The client sends a message...
    let requestMessage = RPCRequestPart.message(.init(repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(requestMessage))

    // ...which must appear on the wire with its length-prefixed framing.
    var expectedFraming = ByteBuffer()
    expectedFraming.writeInteger(0, as: UInt8.self)  // not compressed
    expectedFraming.writeInteger(42, as: UInt32.self)  // message length
    expectedFraming.writeRepeatingByte(1, count: 42)  // message
    let sentData = try channel.assertReadDataOutbound()
    XCTAssertEqual(sentData.data, .byteBuffer(expectedFraming))

    // Half-close the outbound end: this would be triggered by finishing the client's writer.
    XCTAssertNoThrow(channel.close(mode: .output, promise: nil))

    // Flush so that the end-of-stream frame is actually written.
    channel.flush()

    // The end-of-stream frame must be an empty data frame with end-stream set.
    let endOfStreamFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(endOfStreamFrame.data, .byteBuffer(ByteBuffer()))
    XCTAssertTrue(endOfStreamFrame.endStream)

    // Further writes must fail now that the client side is closed.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Client is closed, cannot send a message.")
    }

    // Clear the EmbeddedChannel's stored error; otherwise it would be rethrown by the
    // next inbound write.
    try? channel.throwIfErrorCaught()

    // The server sends back a response message.
    var responseFraming = ByteBuffer()
    responseFraming.writeInteger(0, as: UInt8.self)  // not compressed
    responseFraming.writeInteger(42, as: UInt32.self)  // message length
    responseFraming.writeRepeatingByte(0, count: 42)  // message
    let responseDataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(responseFraming))
    )
    XCTAssertNoThrow(try channel.writeInbound(responseDataFrame))

    // The message must be surfaced without its framing.
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.message([UInt8](repeating: 0, count: 42))
    )

    // The server ends the RPC with a status in its trailers.
    let trailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    let trailersFrame = HTTP2Frame.FramePayload.headers(.init(headers: trailers))
    XCTAssertNoThrow(try channel.writeInbound(trailersFrame))

    let expectedTrailingMetadata: Metadata = ["custom-header": "custom-value"]
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.status(
        Status(code: .dataLoss, message: "Test data loss"),
        expectedTrailingMetadata
      )
    )
  }
  /// Checks that a single inbound message split over several data frames is reassembled.
  ///
  /// After the header exchange and one outbound message, the server's 30-byte message
  /// arrives in five data frames: the compression flag, the length prefix, and three
  /// 10-byte payload parts. Nothing may be readable until the last part has arrived, at
  /// which point the complete message must be surfaced.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The request headers must have been written, and nothing must have come back yet.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // The server's initial metadata is surfaced unchanged.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The client sends a message, which must appear on the wire with its framing.
    let requestMessage = RPCRequestPart.message(.init(repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(requestMessage))

    var expectedFraming = ByteBuffer()
    expectedFraming.writeInteger(0, as: UInt8.self)  // not compressed
    expectedFraming.writeInteger(42, as: UInt32.self)  // message length
    expectedFraming.writeRepeatingByte(1, count: 42)  // message
    let sentData = try channel.assertReadDataOutbound()
    XCTAssertEqual(sentData.data, .byteBuffer(expectedFraming))

    // The server's message arrives piecemeal. None of these fragments completes it, so
    // nothing may be readable after any of them.
    let incompleteFragments: [ByteBuffer] = [
      ByteBuffer(bytes: [0]),  // not compressed
      ByteBuffer(bytes: [0, 0, 0, 30]),  // message length (30 as a big-endian UInt32)
      ByteBuffer(bytes: [UInt8](repeating: 0, count: 10)),  // first part of the message
      ByteBuffer(bytes: [UInt8](repeating: 1, count: 10)),  // second part of the message
    ]
    for fragment in incompleteFragments {
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(fragment))))
      )
      XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
    }

    // The third and last part completes the message.
    let lastFragment = ByteBuffer(bytes: [UInt8](repeating: 2, count: 10))
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(lastFragment))))
    )

    // The reassembled message must be the three parts back to back.
    let expectedMessage =
      [UInt8](repeating: 0, count: 10)
      + [UInt8](repeating: 1, count: 10)
      + [UInt8](repeating: 2, count: 10)
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.message(expectedMessage)
    )
  }
  /// Checks that messages written without flushing are coalesced into one data frame.
  ///
  /// After the header exchange, two 4-byte messages are written with `write` (no flush).
  /// Nothing may be readable outbound until `flush()` is called; afterwards exactly one
  /// data frame is expected, containing both length-prefixed messages back to back.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The request headers must have been written, and nothing must have come back yet.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // The server's initial metadata is surfaced unchanged.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The actual subject of this test starts here: write two messages without flushing
    // and make sure nothing goes down the pipeline until the flush. Once flushed, both
    // messages should be sent in the same ByteBuffer.
    let firstMessage = RPCRequestPart.message([UInt8](repeating: 1, count: 4))
    let secondMessage = RPCRequestPart.message([UInt8](repeating: 2, count: 4))

    // First write: nothing must reach the channel yet.
    XCTAssertNoThrow(channel.write(firstMessage))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Second write: still nothing.
    XCTAssertNoThrow(channel.write(secondMessage))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Flushing must release the data.
    channel.flush()
    let sentData = try channel.assertReadDataOutbound()

    // Both messages must have been framed together in one buffer.
    let expectedBytes = ByteBuffer(bytes: [
      // First message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      1, 1, 1, 1,  // First message data
      // Second message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      2, 2, 2, 2,  // Second message data
    ])
    XCTAssertEqual(sentData.data, .byteBuffer(expectedBytes))

    // And that single frame is all that was written.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  663. }
  extension EmbeddedChannel {
    /// Reads the next outbound frame payload and returns its headers.
    ///
    /// - Returns: The headers of the next outbound `HTTP2Frame.FramePayload`.
    /// - Throws: The `XCTUnwrap` failure if there is nothing to read outbound, or
    ///   `TestError.assertionFailure` if the payload is not a `.headers` frame.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      switch payload {
      case .headers(let headers):
        return headers
      default:
        throw TestError.assertionFailure("Expected to write headers")
      }
    }

    /// Reads the next outbound frame payload and returns its data.
    ///
    /// - Returns: The data of the next outbound `HTTP2Frame.FramePayload`.
    /// - Throws: The `XCTUnwrap` failure if there is nothing to read outbound, or
    ///   `TestError.assertionFailure` if the payload is not a `.data` frame.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      switch payload {
      case .data(let data):
        return data
      default:
        throw TestError.assertionFailure("Expected to write data")
      }
    }
  }
  /// Error type thrown by this file's `EmbeddedChannel` test helpers.
  private enum TestError: Error {
    /// An expectation about what was read from the channel did not hold; the associated
    /// string describes what was expected.
    case assertionFailure(String)
  }