GRPCClientStreamHandlerTests.swift 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCClientStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let handler = GRPCClientStreamHandler(
  27. methodDescriptor: .testTest,
  28. scheme: .http,
  29. authority: nil,
  30. outboundEncoding: .none,
  31. acceptedEncodings: [],
  32. maxPayloadSize: 1
  33. )
  34. let channel = EmbeddedChannel(handler: handler)
  35. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  36. .ping(.init(), ack: false),
  37. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  38. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  39. // .priority(
  40. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  41. // ),
  42. .settings(.ack),
  43. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  44. .windowUpdate(windowSizeIncrement: 4),
  45. .alternativeService(origin: nil, field: nil),
  46. .origin([]),
  47. ]
  48. for toBeIgnored in framesToBeIgnored {
  49. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  50. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  51. }
  52. }
  53. func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
  54. let handler = GRPCClientStreamHandler(
  55. methodDescriptor: .testTest,
  56. scheme: .http,
  57. authority: nil,
  58. outboundEncoding: .none,
  59. acceptedEncodings: [],
  60. maxPayloadSize: 1,
  61. skipStateMachineAssertions: true
  62. )
  63. let channel = EmbeddedChannel(handler: handler)
  64. // Send client's initial metadata
  65. let request = RPCRequestPart.metadata([:])
  66. XCTAssertNoThrow(try channel.writeOutbound(request))
  67. // Receive server's initial metadata without :status
  68. let serverInitialMetadata: HPACKHeaders = [
  69. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
  70. ]
  71. XCTAssertNoThrow(
  72. try channel.writeInbound(
  73. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  74. )
  75. )
  76. XCTAssertEqual(
  77. try channel.readInbound(as: RPCResponsePart.self),
  78. .status(
  79. .init(code: .unknown, message: "HTTP Status Code is missing."),
  80. Metadata(headers: serverInitialMetadata)
  81. )
  82. )
  83. }
  84. func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
  85. let handler = GRPCClientStreamHandler(
  86. methodDescriptor: .testTest,
  87. scheme: .http,
  88. authority: nil,
  89. outboundEncoding: .none,
  90. acceptedEncodings: [],
  91. maxPayloadSize: 1,
  92. skipStateMachineAssertions: true
  93. )
  94. let channel = EmbeddedChannel(handler: handler)
  95. // Send client's initial metadata
  96. let request = RPCRequestPart.metadata([:])
  97. XCTAssertNoThrow(try channel.writeOutbound(request))
  98. // Receive server's initial metadata with 1xx status
  99. let serverInitialMetadata: HPACKHeaders = [
  100. GRPCHTTP2Keys.status.rawValue: "104",
  101. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  102. ]
  103. XCTAssertNoThrow(
  104. try channel.writeInbound(
  105. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  106. )
  107. )
  108. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  109. }
  110. func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
  111. let handler = GRPCClientStreamHandler(
  112. methodDescriptor: .testTest,
  113. scheme: .http,
  114. authority: nil,
  115. outboundEncoding: .none,
  116. acceptedEncodings: [],
  117. maxPayloadSize: 1,
  118. skipStateMachineAssertions: true
  119. )
  120. let channel = EmbeddedChannel(handler: handler)
  121. // Send client's initial metadata
  122. let request = RPCRequestPart.metadata([:])
  123. XCTAssertNoThrow(try channel.writeOutbound(request))
  124. // Receive server's initial metadata with non-200 and non-1xx :status
  125. let serverInitialMetadata: HPACKHeaders = [
  126. GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
  127. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  128. ]
  129. XCTAssertNoThrow(
  130. try channel.writeInbound(
  131. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  132. )
  133. )
  134. XCTAssertEqual(
  135. try channel.readInbound(as: RPCResponsePart.self),
  136. .status(
  137. .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
  138. Metadata(headers: serverInitialMetadata)
  139. )
  140. )
  141. }
  142. func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
  143. let handler = GRPCClientStreamHandler(
  144. methodDescriptor: .testTest,
  145. scheme: .http,
  146. authority: nil,
  147. outboundEncoding: .none,
  148. acceptedEncodings: [],
  149. maxPayloadSize: 1,
  150. skipStateMachineAssertions: true
  151. )
  152. let channel = EmbeddedChannel(handler: handler)
  153. // Send client's initial metadata
  154. let request = RPCRequestPart.metadata([:])
  155. XCTAssertNoThrow(try channel.writeOutbound(request))
  156. // Receive server's initial metadata without content-type
  157. let serverInitialMetadata: HPACKHeaders = [
  158. GRPCHTTP2Keys.status.rawValue: "200"
  159. ]
  160. XCTAssertNoThrow(
  161. try channel.writeInbound(
  162. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  163. )
  164. )
  165. XCTAssertEqual(
  166. try channel.readInbound(as: RPCResponsePart.self),
  167. .status(
  168. .init(code: .internalError, message: "Missing content-type header"),
  169. Metadata(headers: serverInitialMetadata)
  170. )
  171. )
  172. }
  173. func testNotAcceptedEncodingResultsInFinishedRPC() throws {
  174. let handler = GRPCClientStreamHandler(
  175. methodDescriptor: .testTest,
  176. scheme: .http,
  177. authority: nil,
  178. outboundEncoding: .deflate,
  179. acceptedEncodings: [.deflate],
  180. maxPayloadSize: 1
  181. )
  182. let channel = EmbeddedChannel(handler: handler)
  183. // Send client's initial metadata
  184. XCTAssertNoThrow(
  185. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  186. )
  187. // Make sure we have sent right metadata.
  188. let writtenMetadata = try channel.assertReadHeadersOutbound()
  189. XCTAssertEqual(
  190. writtenMetadata.headers,
  191. [
  192. GRPCHTTP2Keys.method.rawValue: "POST",
  193. GRPCHTTP2Keys.scheme.rawValue: "http",
  194. GRPCHTTP2Keys.path.rawValue: "/test/test",
  195. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  196. GRPCHTTP2Keys.te.rawValue: "trailers",
  197. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  198. GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
  199. ]
  200. )
  201. // Server sends initial metadata with unsupported encoding
  202. let serverInitialMetadata: HPACKHeaders = [
  203. GRPCHTTP2Keys.status.rawValue: "200",
  204. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  205. GRPCHTTP2Keys.encoding.rawValue: "gzip",
  206. ]
  207. XCTAssertNoThrow(
  208. try channel.writeInbound(
  209. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  210. )
  211. )
  212. XCTAssertEqual(
  213. try channel.readInbound(as: RPCResponsePart.self),
  214. .status(
  215. .init(
  216. code: .internalError,
  217. message:
  218. "The server picked a compression algorithm ('gzip') the client does not know about."
  219. ),
  220. Metadata(headers: serverInitialMetadata)
  221. )
  222. )
  223. }
  224. func testOverMaximumPayloadSize() throws {
  225. let handler = GRPCClientStreamHandler(
  226. methodDescriptor: .testTest,
  227. scheme: .http,
  228. authority: nil,
  229. outboundEncoding: .none,
  230. acceptedEncodings: [],
  231. maxPayloadSize: 1,
  232. skipStateMachineAssertions: true
  233. )
  234. let channel = EmbeddedChannel(handler: handler)
  235. // Send client's initial metadata
  236. XCTAssertNoThrow(
  237. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  238. )
  239. // Make sure we have sent right metadata.
  240. let writtenMetadata = try channel.assertReadHeadersOutbound()
  241. XCTAssertEqual(
  242. writtenMetadata.headers,
  243. [
  244. GRPCHTTP2Keys.method.rawValue: "POST",
  245. GRPCHTTP2Keys.scheme.rawValue: "http",
  246. GRPCHTTP2Keys.path.rawValue: "/test/test",
  247. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  248. GRPCHTTP2Keys.te.rawValue: "trailers",
  249. ]
  250. )
  251. // Server sends initial metadata
  252. let serverInitialMetadata: HPACKHeaders = [
  253. GRPCHTTP2Keys.status.rawValue: "200",
  254. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  255. ]
  256. XCTAssertNoThrow(
  257. try channel.writeInbound(
  258. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  259. )
  260. )
  261. XCTAssertEqual(
  262. try channel.readInbound(as: RPCResponsePart.self),
  263. .metadata(Metadata(headers: serverInitialMetadata))
  264. )
  265. // Server sends message over payload limit
  266. var buffer = ByteBuffer()
  267. buffer.writeInteger(UInt8(0)) // not compressed
  268. buffer.writeInteger(UInt32(42)) // message length
  269. buffer.writeRepeatingByte(0, count: 42) // message
  270. let clientDataPayload = HTTP2Frame.FramePayload.Data(
  271. data: .byteBuffer(buffer),
  272. endStream: false
  273. )
  274. // Invalid payload should result in error status and stream being closed
  275. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  276. let part = try channel.readInbound(as: RPCResponsePart.self)
  277. XCTAssertEqual(
  278. part,
  279. .status(Status(code: .internalError, message: "Failed to decode message"), [:])
  280. )
  281. channel.embeddedEventLoop.run()
  282. try channel.closeFuture.wait()
  283. }
  284. func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
  285. let handler = GRPCClientStreamHandler(
  286. methodDescriptor: .testTest,
  287. scheme: .http,
  288. authority: nil,
  289. outboundEncoding: .none,
  290. acceptedEncodings: [],
  291. maxPayloadSize: 100,
  292. skipStateMachineAssertions: true
  293. )
  294. let channel = EmbeddedChannel(handler: handler)
  295. // Send client's initial metadata
  296. XCTAssertNoThrow(
  297. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  298. )
  299. // Make sure we have sent right metadata.
  300. let writtenMetadata = try channel.assertReadHeadersOutbound()
  301. XCTAssertEqual(
  302. writtenMetadata.headers,
  303. [
  304. GRPCHTTP2Keys.method.rawValue: "POST",
  305. GRPCHTTP2Keys.scheme.rawValue: "http",
  306. GRPCHTTP2Keys.path.rawValue: "/test/test",
  307. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  308. GRPCHTTP2Keys.te.rawValue: "trailers",
  309. ]
  310. )
  311. // Server sends initial metadata
  312. let serverInitialMetadata: HPACKHeaders = [
  313. GRPCHTTP2Keys.status.rawValue: "200",
  314. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  315. ]
  316. XCTAssertNoThrow(
  317. try channel.writeInbound(
  318. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  319. )
  320. )
  321. XCTAssertEqual(
  322. try channel.readInbound(as: RPCResponsePart.self),
  323. .metadata(Metadata(headers: serverInitialMetadata))
  324. )
  325. // Server sends message with EOS set.
  326. var buffer = ByteBuffer()
  327. buffer.writeInteger(UInt8(0)) // not compressed
  328. buffer.writeInteger(UInt32(42)) // message length
  329. buffer.writeRepeatingByte(0, count: 42) // message
  330. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  331. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  332. // Make sure we got status + trailers with the right error.
  333. XCTAssertEqual(
  334. try channel.readInbound(as: RPCResponsePart.self),
  335. .status(
  336. Status(
  337. code: .internalError,
  338. message:
  339. "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
  340. ),
  341. [:]
  342. )
  343. )
  344. }
  345. func testServerEndsStream() throws {
  346. let handler = GRPCClientStreamHandler(
  347. methodDescriptor: .testTest,
  348. scheme: .http,
  349. authority: nil,
  350. outboundEncoding: .none,
  351. acceptedEncodings: [],
  352. maxPayloadSize: 1,
  353. skipStateMachineAssertions: true
  354. )
  355. let channel = EmbeddedChannel(handler: handler)
  356. // Write client's initial metadata
  357. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  358. let clientInitialMetadata: HPACKHeaders = [
  359. GRPCHTTP2Keys.path.rawValue: "/test/test",
  360. GRPCHTTP2Keys.scheme.rawValue: "http",
  361. GRPCHTTP2Keys.method.rawValue: "POST",
  362. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  363. GRPCHTTP2Keys.te.rawValue: "trailers",
  364. ]
  365. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  366. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  367. // Receive server's initial metadata with end stream set
  368. let serverInitialMetadata: HPACKHeaders = [
  369. GRPCHTTP2Keys.status.rawValue: "200",
  370. GRPCHTTP2Keys.grpcStatus.rawValue: "0",
  371. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  372. ]
  373. XCTAssertNoThrow(
  374. try channel.writeInbound(
  375. HTTP2Frame.FramePayload.headers(
  376. .init(
  377. headers: serverInitialMetadata,
  378. endStream: true
  379. )
  380. )
  381. )
  382. )
  383. XCTAssertEqual(
  384. try channel.readInbound(as: RPCResponsePart.self),
  385. .status(
  386. .init(code: .ok, message: ""),
  387. [
  388. GRPCHTTP2Keys.status.rawValue: "200",
  389. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  390. ]
  391. )
  392. )
  393. // We should throw if the server sends another message, since it's closed the stream already.
  394. var buffer = ByteBuffer()
  395. buffer.writeInteger(UInt8(0)) // not compressed
  396. buffer.writeInteger(UInt32(42)) // message length
  397. buffer.writeRepeatingByte(0, count: 42) // message
  398. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  399. XCTAssertThrowsError(
  400. ofType: RPCError.self,
  401. try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
  402. ) { error in
  403. XCTAssertEqual(error.code, .internalError)
  404. XCTAssertEqual(error.message, "Invalid state")
  405. }
  406. }
  407. func testNormalFlow() throws {
  408. let handler = GRPCClientStreamHandler(
  409. methodDescriptor: .testTest,
  410. scheme: .http,
  411. authority: nil,
  412. outboundEncoding: .none,
  413. acceptedEncodings: [],
  414. maxPayloadSize: 100,
  415. skipStateMachineAssertions: true
  416. )
  417. let channel = EmbeddedChannel(handler: handler)
  418. // Send client's initial metadata
  419. let request = RPCRequestPart.metadata([:])
  420. XCTAssertNoThrow(try channel.writeOutbound(request))
  421. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  422. let writtenHeaders = try channel.assertReadHeadersOutbound()
  423. XCTAssertEqual(
  424. writtenHeaders.headers,
  425. [
  426. GRPCHTTP2Keys.method.rawValue: "POST",
  427. GRPCHTTP2Keys.scheme.rawValue: "http",
  428. GRPCHTTP2Keys.path.rawValue: "/test/test",
  429. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  430. GRPCHTTP2Keys.te.rawValue: "trailers",
  431. ]
  432. )
  433. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  434. // Receive server's initial metadata
  435. let serverInitialMetadata: HPACKHeaders = [
  436. GRPCHTTP2Keys.status.rawValue: "200",
  437. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  438. "some-custom-header": "some-custom-value",
  439. ]
  440. XCTAssertNoThrow(
  441. try channel.writeInbound(
  442. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  443. )
  444. )
  445. XCTAssertEqual(
  446. try channel.readInbound(as: RPCResponsePart.self),
  447. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  448. )
  449. // Send a message
  450. XCTAssertNoThrow(
  451. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  452. )
  453. // Assert we wrote it successfully into the channel
  454. let writtenMessage = try channel.assertReadDataOutbound()
  455. var expectedBuffer = ByteBuffer()
  456. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  457. expectedBuffer.writeInteger(UInt32(42)) // message length
  458. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  459. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  460. // Half-close the outbound end: this would be triggered by finishing the client's writer.
  461. XCTAssertNoThrow(channel.close(mode: .output, promise: nil))
  462. // Make sure the EOS frame was sent
  463. let emptyEOSFrame = try channel.assertReadDataOutbound()
  464. XCTAssertEqual(emptyEOSFrame.data, .byteBuffer(.init()))
  465. XCTAssertTrue(emptyEOSFrame.endStream)
  466. // Make sure that, if we flush again, we're not writing anything else down
  467. // the stream. We should have closed at this point.
  468. channel.flush()
  469. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  470. // Make sure we cannot write anymore because client's closed.
  471. XCTAssertThrowsError(
  472. ofType: RPCError.self,
  473. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  474. ) { error in
  475. XCTAssertEqual(error.code, .internalError)
  476. XCTAssertEqual(error.message, "Invalid state")
  477. }
  478. // This is needed to clear the EmbeddedChannel's stored error, otherwise
  479. // it will be thrown when writing inbound.
  480. try? channel.throwIfErrorCaught()
  481. // Server sends back response message
  482. var buffer = ByteBuffer()
  483. buffer.writeInteger(UInt8(0)) // not compressed
  484. buffer.writeInteger(UInt32(42)) // message length
  485. buffer.writeRepeatingByte(0, count: 42) // message
  486. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer))
  487. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  488. // Make sure we read the message properly
  489. XCTAssertEqual(
  490. try channel.readInbound(as: RPCResponsePart.self),
  491. RPCResponsePart.message([UInt8](repeating: 0, count: 42))
  492. )
  493. // Server sends status to end RPC
  494. XCTAssertNoThrow(
  495. try channel.writeInbound(
  496. HTTP2Frame.FramePayload.headers(
  497. .init(headers: [
  498. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  499. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  500. "custom-header": "custom-value",
  501. ])
  502. )
  503. )
  504. )
  505. XCTAssertEqual(
  506. try channel.readInbound(as: RPCResponsePart.self),
  507. .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
  508. )
  509. }
  510. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  511. let handler = GRPCClientStreamHandler(
  512. methodDescriptor: .testTest,
  513. scheme: .http,
  514. authority: nil,
  515. outboundEncoding: .none,
  516. acceptedEncodings: [],
  517. maxPayloadSize: 100,
  518. skipStateMachineAssertions: true
  519. )
  520. let channel = EmbeddedChannel(handler: handler)
  521. // Send client's initial metadata
  522. let request = RPCRequestPart.metadata([:])
  523. XCTAssertNoThrow(try channel.writeOutbound(request))
  524. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  525. let writtenHeaders = try channel.assertReadHeadersOutbound()
  526. XCTAssertEqual(
  527. writtenHeaders.headers,
  528. [
  529. GRPCHTTP2Keys.method.rawValue: "POST",
  530. GRPCHTTP2Keys.scheme.rawValue: "http",
  531. GRPCHTTP2Keys.path.rawValue: "/test/test",
  532. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  533. GRPCHTTP2Keys.te.rawValue: "trailers",
  534. ]
  535. )
  536. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  537. // Receive server's initial metadata
  538. let serverInitialMetadata: HPACKHeaders = [
  539. GRPCHTTP2Keys.status.rawValue: "200",
  540. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  541. "some-custom-header": "some-custom-value",
  542. ]
  543. XCTAssertNoThrow(
  544. try channel.writeInbound(
  545. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  546. )
  547. )
  548. XCTAssertEqual(
  549. try channel.readInbound(as: RPCResponsePart.self),
  550. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  551. )
  552. // Send a message
  553. XCTAssertNoThrow(
  554. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  555. )
  556. // Assert we wrote it successfully into the channel
  557. let writtenMessage = try channel.assertReadDataOutbound()
  558. var expectedBuffer = ByteBuffer()
  559. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  560. expectedBuffer.writeInteger(UInt32(42)) // message length
  561. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  562. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  563. // Receive server's first message
  564. var buffer = ByteBuffer()
  565. buffer.writeInteger(UInt8(0)) // not compressed
  566. XCTAssertNoThrow(
  567. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  568. )
  569. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  570. buffer.clear()
  571. buffer.writeInteger(UInt32(30)) // message length
  572. XCTAssertNoThrow(
  573. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  574. )
  575. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  576. buffer.clear()
  577. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  578. XCTAssertNoThrow(
  579. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  580. )
  581. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  582. buffer.clear()
  583. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  584. XCTAssertNoThrow(
  585. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  586. )
  587. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  588. buffer.clear()
  589. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  590. XCTAssertNoThrow(
  591. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  592. )
  593. // Make sure we read the message properly
  594. XCTAssertEqual(
  595. try channel.readInbound(as: RPCResponsePart.self),
  596. RPCResponsePart.message(
  597. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  598. + [UInt8](repeating: 2, count: 10)
  599. )
  600. )
  601. }
  602. func testSendMultipleMessagesInSingleBuffer() throws {
  603. let handler = GRPCClientStreamHandler(
  604. methodDescriptor: .testTest,
  605. scheme: .http,
  606. authority: nil,
  607. outboundEncoding: .none,
  608. acceptedEncodings: [],
  609. maxPayloadSize: 100,
  610. skipStateMachineAssertions: true
  611. )
  612. let channel = EmbeddedChannel(handler: handler)
  613. // Send client's initial metadata
  614. let request = RPCRequestPart.metadata([:])
  615. XCTAssertNoThrow(try channel.writeOutbound(request))
  616. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  617. let writtenHeaders = try channel.assertReadHeadersOutbound()
  618. XCTAssertEqual(
  619. writtenHeaders.headers,
  620. [
  621. GRPCHTTP2Keys.method.rawValue: "POST",
  622. GRPCHTTP2Keys.scheme.rawValue: "http",
  623. GRPCHTTP2Keys.path.rawValue: "/test/test",
  624. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  625. GRPCHTTP2Keys.te.rawValue: "trailers",
  626. ]
  627. )
  628. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  629. // Receive server's initial metadata
  630. let serverInitialMetadata: HPACKHeaders = [
  631. GRPCHTTP2Keys.status.rawValue: "200",
  632. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  633. "some-custom-header": "some-custom-value",
  634. ]
  635. XCTAssertNoThrow(
  636. try channel.writeInbound(
  637. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  638. )
  639. )
  640. XCTAssertEqual(
  641. try channel.readInbound(as: RPCResponsePart.self),
  642. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  643. )
  644. // This is where this test actually begins. We want to write two messages
  645. // without flushing, and make sure that no messages are sent down the pipeline
  646. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  647. // Write back first message and make sure nothing's written in the channel.
  648. XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 1, count: 4))))
  649. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  650. // Write back second message and make sure nothing's written in the channel.
  651. XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 2, count: 4))))
  652. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  653. // Now flush and check we *do* write the data.
  654. channel.flush()
  655. let writtenMessage = try channel.assertReadDataOutbound()
  656. // Make sure both messages have been framed together in the ByteBuffer.
  657. XCTAssertEqual(
  658. writtenMessage.data,
  659. .byteBuffer(
  660. .init(bytes: [
  661. // First message
  662. 0, // Compression disabled
  663. 0, 0, 0, 4, // Message length
  664. 1, 1, 1, 1, // First message data
  665. // Second message
  666. 0, // Compression disabled
  667. 0, 0, 0, 4, // Message length
  668. 2, 2, 2, 2, // Second message data
  669. ])
  670. )
  671. )
  672. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  673. }
  674. func testUnexpectedStreamClose_ErrorFired() throws {
  675. let handler = GRPCClientStreamHandler(
  676. methodDescriptor: .testTest,
  677. scheme: .http,
  678. authority: nil,
  679. outboundEncoding: .none,
  680. acceptedEncodings: [],
  681. maxPayloadSize: 1,
  682. skipStateMachineAssertions: true
  683. )
  684. let channel = EmbeddedChannel(handler: handler)
  685. // Write client's initial metadata
  686. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  687. let clientInitialMetadata: HPACKHeaders = [
  688. GRPCHTTP2Keys.path.rawValue: "/test/test",
  689. GRPCHTTP2Keys.scheme.rawValue: "http",
  690. GRPCHTTP2Keys.method.rawValue: "POST",
  691. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  692. GRPCHTTP2Keys.te.rawValue: "trailers",
  693. ]
  694. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  695. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  696. // An error is fired down the pipeline
  697. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  698. channel.pipeline.fireErrorCaught(thrownError)
  699. // The client receives a status explaining the stream was closed because of the thrown error.
  700. XCTAssertEqual(
  701. try channel.readInbound(as: RPCResponsePart.self),
  702. .status(
  703. .init(
  704. code: .unavailable,
  705. message: "Stream unexpectedly closed with error."
  706. ),
  707. [:]
  708. )
  709. )
  710. // We should now be closed: check we can't write anymore.
  711. XCTAssertThrowsError(
  712. ofType: RPCError.self,
  713. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  714. ) { error in
  715. XCTAssertEqual(error.code, .internalError)
  716. XCTAssertEqual(error.message, "Invalid state")
  717. }
  718. }
  719. func testUnexpectedStreamClose_ChannelInactive() throws {
  720. let handler = GRPCClientStreamHandler(
  721. methodDescriptor: .testTest,
  722. scheme: .http,
  723. authority: nil,
  724. outboundEncoding: .none,
  725. acceptedEncodings: [],
  726. maxPayloadSize: 1,
  727. skipStateMachineAssertions: true
  728. )
  729. let channel = EmbeddedChannel(handler: handler)
  730. // Write client's initial metadata
  731. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  732. let clientInitialMetadata: HPACKHeaders = [
  733. GRPCHTTP2Keys.path.rawValue: "/test/test",
  734. GRPCHTTP2Keys.scheme.rawValue: "http",
  735. GRPCHTTP2Keys.method.rawValue: "POST",
  736. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  737. GRPCHTTP2Keys.te.rawValue: "trailers",
  738. ]
  739. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  740. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  741. // Channel becomes inactive
  742. channel.pipeline.fireChannelInactive()
  743. // The client receives a status explaining the stream was closed.
  744. XCTAssertEqual(
  745. try channel.readInbound(as: RPCResponsePart.self),
  746. .status(
  747. .init(code: .unavailable, message: "Stream unexpectedly closed."),
  748. [:]
  749. )
  750. )
  751. // We should now be closed: check we can't write anymore.
  752. XCTAssertThrowsError(
  753. ofType: RPCError.self,
  754. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  755. ) { error in
  756. XCTAssertEqual(error.code, .internalError)
  757. XCTAssertEqual(error.message, "Invalid state")
  758. }
  759. }
  760. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  761. let handler = GRPCClientStreamHandler(
  762. methodDescriptor: .testTest,
  763. scheme: .http,
  764. authority: nil,
  765. outboundEncoding: .none,
  766. acceptedEncodings: [],
  767. maxPayloadSize: 1,
  768. skipStateMachineAssertions: true
  769. )
  770. let channel = EmbeddedChannel(handler: handler)
  771. // Write client's initial metadata
  772. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  773. let clientInitialMetadata: HPACKHeaders = [
  774. GRPCHTTP2Keys.path.rawValue: "/test/test",
  775. GRPCHTTP2Keys.scheme.rawValue: "http",
  776. GRPCHTTP2Keys.method.rawValue: "POST",
  777. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  778. GRPCHTTP2Keys.te.rawValue: "trailers",
  779. ]
  780. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  781. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  782. // Receive RST_STREAM
  783. XCTAssertNoThrow(
  784. try channel.writeInbound(
  785. HTTP2Frame.FramePayload.rstStream(.internalError)
  786. )
  787. )
  788. // The client receives a status explaining RST_STREAM was sent.
  789. XCTAssertEqual(
  790. try channel.readInbound(as: RPCResponsePart.self),
  791. .status(
  792. .init(
  793. code: .unavailable,
  794. message: "Stream unexpectedly closed: a RST_STREAM frame was received."
  795. ),
  796. [:]
  797. )
  798. )
  799. // We should now be closed: check we can't write anymore.
  800. XCTAssertThrowsError(
  801. ofType: RPCError.self,
  802. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  803. ) { error in
  804. XCTAssertEqual(error.code, .internalError)
  805. XCTAssertEqual(error.message, "Invalid state")
  806. }
  807. }
  808. }
  809. extension EmbeddedChannel {
  810. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  811. guard
  812. case .headers(let writtenHeaders) = try XCTUnwrap(
  813. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  814. )
  815. else {
  816. throw TestError.assertionFailure("Expected to write headers")
  817. }
  818. return writtenHeaders
  819. }
  820. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  821. guard
  822. case .data(let writtenMessage) = try XCTUnwrap(
  823. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  824. )
  825. else {
  826. throw TestError.assertionFailure("Expected to write data")
  827. }
  828. return writtenMessage
  829. }
  830. }
  831. private enum TestError: Error {
  832. case assertionFailure(String)
  833. }
  834. extension MethodDescriptor {
  835. static let testTest = Self(fullyQualifiedService: "test", method: "test")
  836. }