GRPCClientStreamHandlerTests.swift 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCHTTP2Core
  24. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  25. final class GRPCClientStreamHandlerTests: XCTestCase {
  26. func testH2FramesAreIgnored() throws {
  27. let handler = GRPCClientStreamHandler(
  28. methodDescriptor: .init(service: "test", method: "test"),
  29. scheme: .http,
  30. outboundEncoding: .none,
  31. acceptedEncodings: [],
  32. maximumPayloadSize: 1
  33. )
  34. let channel = EmbeddedChannel(handler: handler)
  35. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  36. .ping(.init(), ack: false),
  37. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  38. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  39. // .priority(
  40. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  41. // ),
  42. .settings(.ack),
  43. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  44. .windowUpdate(windowSizeIncrement: 4),
  45. .alternativeService(origin: nil, field: nil),
  46. .origin([]),
  47. ]
  48. for toBeIgnored in framesToBeIgnored {
  49. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  50. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  51. }
  52. }
  53. func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
  54. let handler = GRPCClientStreamHandler(
  55. methodDescriptor: .init(service: "test", method: "test"),
  56. scheme: .http,
  57. outboundEncoding: .none,
  58. acceptedEncodings: [],
  59. maximumPayloadSize: 1,
  60. skipStateMachineAssertions: true
  61. )
  62. let channel = EmbeddedChannel(handler: handler)
  63. // Send client's initial metadata
  64. let request = RPCRequestPart.metadata([:])
  65. XCTAssertNoThrow(try channel.writeOutbound(request))
  66. // Receive server's initial metadata without :status
  67. let serverInitialMetadata: HPACKHeaders = [
  68. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
  69. ]
  70. XCTAssertNoThrow(
  71. try channel.writeInbound(
  72. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  73. )
  74. )
  75. XCTAssertEqual(
  76. try channel.readInbound(as: RPCResponsePart.self),
  77. .status(
  78. .init(code: .unknown, message: "HTTP Status Code is missing."),
  79. Metadata(headers: serverInitialMetadata)
  80. )
  81. )
  82. }
  83. func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
  84. let handler = GRPCClientStreamHandler(
  85. methodDescriptor: .init(service: "test", method: "test"),
  86. scheme: .http,
  87. outboundEncoding: .none,
  88. acceptedEncodings: [],
  89. maximumPayloadSize: 1,
  90. skipStateMachineAssertions: true
  91. )
  92. let channel = EmbeddedChannel(handler: handler)
  93. // Send client's initial metadata
  94. let request = RPCRequestPart.metadata([:])
  95. XCTAssertNoThrow(try channel.writeOutbound(request))
  96. // Receive server's initial metadata with 1xx status
  97. let serverInitialMetadata: HPACKHeaders = [
  98. GRPCHTTP2Keys.status.rawValue: "104",
  99. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  100. ]
  101. XCTAssertNoThrow(
  102. try channel.writeInbound(
  103. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  104. )
  105. )
  106. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  107. }
  108. func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
  109. let handler = GRPCClientStreamHandler(
  110. methodDescriptor: .init(service: "test", method: "test"),
  111. scheme: .http,
  112. outboundEncoding: .none,
  113. acceptedEncodings: [],
  114. maximumPayloadSize: 1,
  115. skipStateMachineAssertions: true
  116. )
  117. let channel = EmbeddedChannel(handler: handler)
  118. // Send client's initial metadata
  119. let request = RPCRequestPart.metadata([:])
  120. XCTAssertNoThrow(try channel.writeOutbound(request))
  121. // Receive server's initial metadata with non-200 and non-1xx :status
  122. let serverInitialMetadata: HPACKHeaders = [
  123. GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
  124. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  125. ]
  126. XCTAssertNoThrow(
  127. try channel.writeInbound(
  128. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  129. )
  130. )
  131. XCTAssertEqual(
  132. try channel.readInbound(as: RPCResponsePart.self),
  133. .status(
  134. .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
  135. Metadata(headers: serverInitialMetadata)
  136. )
  137. )
  138. }
  139. func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
  140. let handler = GRPCClientStreamHandler(
  141. methodDescriptor: .init(service: "test", method: "test"),
  142. scheme: .http,
  143. outboundEncoding: .none,
  144. acceptedEncodings: [],
  145. maximumPayloadSize: 1,
  146. skipStateMachineAssertions: true
  147. )
  148. let channel = EmbeddedChannel(handler: handler)
  149. // Send client's initial metadata
  150. let request = RPCRequestPart.metadata([:])
  151. XCTAssertNoThrow(try channel.writeOutbound(request))
  152. // Receive server's initial metadata without content-type
  153. let serverInitialMetadata: HPACKHeaders = [
  154. GRPCHTTP2Keys.status.rawValue: "200"
  155. ]
  156. XCTAssertNoThrow(
  157. try channel.writeInbound(
  158. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  159. )
  160. )
  161. XCTAssertEqual(
  162. try channel.readInbound(as: RPCResponsePart.self),
  163. .status(
  164. .init(code: .internalError, message: "Missing content-type header"),
  165. Metadata(headers: serverInitialMetadata)
  166. )
  167. )
  168. }
  169. func testNotAcceptedEncodingResultsInFinishedRPC() throws {
  170. let handler = GRPCClientStreamHandler(
  171. methodDescriptor: .init(service: "test", method: "test"),
  172. scheme: .http,
  173. outboundEncoding: .deflate,
  174. acceptedEncodings: [.deflate],
  175. maximumPayloadSize: 1
  176. )
  177. let channel = EmbeddedChannel(handler: handler)
  178. // Send client's initial metadata
  179. XCTAssertNoThrow(
  180. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  181. )
  182. // Make sure we have sent right metadata.
  183. let writtenMetadata = try channel.assertReadHeadersOutbound()
  184. XCTAssertEqual(
  185. writtenMetadata.headers,
  186. [
  187. GRPCHTTP2Keys.method.rawValue: "POST",
  188. GRPCHTTP2Keys.scheme.rawValue: "http",
  189. GRPCHTTP2Keys.path.rawValue: "/test/test",
  190. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  191. GRPCHTTP2Keys.te.rawValue: "trailers",
  192. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  193. GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
  194. ]
  195. )
  196. // Server sends initial metadata with unsupported encoding
  197. let serverInitialMetadata: HPACKHeaders = [
  198. GRPCHTTP2Keys.status.rawValue: "200",
  199. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  200. GRPCHTTP2Keys.encoding.rawValue: "gzip",
  201. ]
  202. XCTAssertNoThrow(
  203. try channel.writeInbound(
  204. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  205. )
  206. )
  207. XCTAssertEqual(
  208. try channel.readInbound(as: RPCResponsePart.self),
  209. .status(
  210. .init(
  211. code: .internalError,
  212. message:
  213. "The server picked a compression algorithm ('gzip') the client does not know about."
  214. ),
  215. Metadata(headers: serverInitialMetadata)
  216. )
  217. )
  218. }
  219. func testOverMaximumPayloadSize() throws {
  220. let handler = GRPCClientStreamHandler(
  221. methodDescriptor: .init(service: "test", method: "test"),
  222. scheme: .http,
  223. outboundEncoding: .none,
  224. acceptedEncodings: [],
  225. maximumPayloadSize: 1,
  226. skipStateMachineAssertions: true
  227. )
  228. let channel = EmbeddedChannel(handler: handler)
  229. // Send client's initial metadata
  230. XCTAssertNoThrow(
  231. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  232. )
  233. // Make sure we have sent right metadata.
  234. let writtenMetadata = try channel.assertReadHeadersOutbound()
  235. XCTAssertEqual(
  236. writtenMetadata.headers,
  237. [
  238. GRPCHTTP2Keys.method.rawValue: "POST",
  239. GRPCHTTP2Keys.scheme.rawValue: "http",
  240. GRPCHTTP2Keys.path.rawValue: "/test/test",
  241. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  242. GRPCHTTP2Keys.te.rawValue: "trailers",
  243. ]
  244. )
  245. // Server sends initial metadata
  246. let serverInitialMetadata: HPACKHeaders = [
  247. GRPCHTTP2Keys.status.rawValue: "200",
  248. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  249. ]
  250. XCTAssertNoThrow(
  251. try channel.writeInbound(
  252. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  253. )
  254. )
  255. XCTAssertEqual(
  256. try channel.readInbound(as: RPCResponsePart.self),
  257. .metadata(Metadata(headers: serverInitialMetadata))
  258. )
  259. // Server sends message over payload limit
  260. var buffer = ByteBuffer()
  261. buffer.writeInteger(UInt8(0)) // not compressed
  262. buffer.writeInteger(UInt32(42)) // message length
  263. buffer.writeRepeatingByte(0, count: 42) // message
  264. let clientDataPayload = HTTP2Frame.FramePayload.Data(
  265. data: .byteBuffer(buffer),
  266. endStream: false
  267. )
  268. // Invalid payload should result in error status and stream being closed
  269. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  270. let part = try channel.readInbound(as: RPCResponsePart.self)
  271. XCTAssertEqual(
  272. part,
  273. .status(Status(code: .internalError, message: "Failed to decode message"), [:])
  274. )
  275. channel.embeddedEventLoop.run()
  276. try channel.closeFuture.wait()
  277. }
  278. func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
  279. let handler = GRPCClientStreamHandler(
  280. methodDescriptor: .init(service: "test", method: "test"),
  281. scheme: .http,
  282. outboundEncoding: .none,
  283. acceptedEncodings: [],
  284. maximumPayloadSize: 100,
  285. skipStateMachineAssertions: true
  286. )
  287. let channel = EmbeddedChannel(handler: handler)
  288. // Send client's initial metadata
  289. XCTAssertNoThrow(
  290. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  291. )
  292. // Make sure we have sent right metadata.
  293. let writtenMetadata = try channel.assertReadHeadersOutbound()
  294. XCTAssertEqual(
  295. writtenMetadata.headers,
  296. [
  297. GRPCHTTP2Keys.method.rawValue: "POST",
  298. GRPCHTTP2Keys.scheme.rawValue: "http",
  299. GRPCHTTP2Keys.path.rawValue: "/test/test",
  300. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  301. GRPCHTTP2Keys.te.rawValue: "trailers",
  302. ]
  303. )
  304. // Server sends initial metadata
  305. let serverInitialMetadata: HPACKHeaders = [
  306. GRPCHTTP2Keys.status.rawValue: "200",
  307. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  308. ]
  309. XCTAssertNoThrow(
  310. try channel.writeInbound(
  311. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  312. )
  313. )
  314. XCTAssertEqual(
  315. try channel.readInbound(as: RPCResponsePart.self),
  316. .metadata(Metadata(headers: serverInitialMetadata))
  317. )
  318. // Server sends message with EOS set.
  319. var buffer = ByteBuffer()
  320. buffer.writeInteger(UInt8(0)) // not compressed
  321. buffer.writeInteger(UInt32(42)) // message length
  322. buffer.writeRepeatingByte(0, count: 42) // message
  323. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  324. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  325. // Make sure we got status + trailers with the right error.
  326. XCTAssertEqual(
  327. try channel.readInbound(as: RPCResponsePart.self),
  328. .status(
  329. Status(
  330. code: .internalError,
  331. message:
  332. "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
  333. ),
  334. [:]
  335. )
  336. )
  337. }
  338. func testServerEndsStream() throws {
  339. let handler = GRPCClientStreamHandler(
  340. methodDescriptor: .init(service: "test", method: "test"),
  341. scheme: .http,
  342. outboundEncoding: .none,
  343. acceptedEncodings: [],
  344. maximumPayloadSize: 1,
  345. skipStateMachineAssertions: true
  346. )
  347. let channel = EmbeddedChannel(handler: handler)
  348. // Write client's initial metadata
  349. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  350. let clientInitialMetadata: HPACKHeaders = [
  351. GRPCHTTP2Keys.path.rawValue: "/test/test",
  352. GRPCHTTP2Keys.scheme.rawValue: "http",
  353. GRPCHTTP2Keys.method.rawValue: "POST",
  354. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  355. GRPCHTTP2Keys.te.rawValue: "trailers",
  356. ]
  357. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  358. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  359. // Receive server's initial metadata with end stream set
  360. let serverInitialMetadata: HPACKHeaders = [
  361. GRPCHTTP2Keys.status.rawValue: "200",
  362. GRPCHTTP2Keys.grpcStatus.rawValue: "0",
  363. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  364. ]
  365. XCTAssertNoThrow(
  366. try channel.writeInbound(
  367. HTTP2Frame.FramePayload.headers(
  368. .init(
  369. headers: serverInitialMetadata,
  370. endStream: true
  371. )
  372. )
  373. )
  374. )
  375. XCTAssertEqual(
  376. try channel.readInbound(as: RPCResponsePart.self),
  377. .status(
  378. .init(code: .ok, message: ""),
  379. [
  380. GRPCHTTP2Keys.status.rawValue: "200",
  381. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  382. ]
  383. )
  384. )
  385. // We should throw if the server sends another message, since it's closed the stream already.
  386. var buffer = ByteBuffer()
  387. buffer.writeInteger(UInt8(0)) // not compressed
  388. buffer.writeInteger(UInt32(42)) // message length
  389. buffer.writeRepeatingByte(0, count: 42) // message
  390. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  391. XCTAssertThrowsError(
  392. ofType: RPCError.self,
  393. try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
  394. ) { error in
  395. XCTAssertEqual(error.code, .internalError)
  396. XCTAssertEqual(error.message, "Invalid state")
  397. }
  398. }
  399. func testNormalFlow() throws {
  400. let handler = GRPCClientStreamHandler(
  401. methodDescriptor: .init(service: "test", method: "test"),
  402. scheme: .http,
  403. outboundEncoding: .none,
  404. acceptedEncodings: [],
  405. maximumPayloadSize: 100,
  406. skipStateMachineAssertions: true
  407. )
  408. let channel = EmbeddedChannel(handler: handler)
  409. // Send client's initial metadata
  410. let request = RPCRequestPart.metadata([:])
  411. XCTAssertNoThrow(try channel.writeOutbound(request))
  412. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  413. let writtenHeaders = try channel.assertReadHeadersOutbound()
  414. XCTAssertEqual(
  415. writtenHeaders.headers,
  416. [
  417. GRPCHTTP2Keys.method.rawValue: "POST",
  418. GRPCHTTP2Keys.scheme.rawValue: "http",
  419. GRPCHTTP2Keys.path.rawValue: "/test/test",
  420. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  421. GRPCHTTP2Keys.te.rawValue: "trailers",
  422. ]
  423. )
  424. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  425. // Receive server's initial metadata
  426. let serverInitialMetadata: HPACKHeaders = [
  427. GRPCHTTP2Keys.status.rawValue: "200",
  428. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  429. "some-custom-header": "some-custom-value",
  430. ]
  431. XCTAssertNoThrow(
  432. try channel.writeInbound(
  433. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  434. )
  435. )
  436. XCTAssertEqual(
  437. try channel.readInbound(as: RPCResponsePart.self),
  438. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  439. )
  440. // Send a message
  441. XCTAssertNoThrow(
  442. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  443. )
  444. // Assert we wrote it successfully into the channel
  445. let writtenMessage = try channel.assertReadDataOutbound()
  446. var expectedBuffer = ByteBuffer()
  447. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  448. expectedBuffer.writeInteger(UInt32(42)) // message length
  449. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  450. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  451. // Half-close the outbound end: this would be triggered by finishing the client's writer.
  452. XCTAssertNoThrow(channel.close(mode: .output, promise: nil))
  453. // Flush to make sure the EOS is written.
  454. channel.flush()
  455. // Make sure the EOS frame was sent
  456. let emptyEOSFrame = try channel.assertReadDataOutbound()
  457. XCTAssertEqual(emptyEOSFrame.data, .byteBuffer(.init()))
  458. XCTAssertTrue(emptyEOSFrame.endStream)
  459. // Make sure we cannot write anymore because client's closed.
  460. XCTAssertThrowsError(
  461. ofType: RPCError.self,
  462. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  463. ) { error in
  464. XCTAssertEqual(error.code, .internalError)
  465. XCTAssertEqual(error.message, "Invalid state")
  466. }
  467. // This is needed to clear the EmbeddedChannel's stored error, otherwise
  468. // it will be thrown when writing inbound.
  469. try? channel.throwIfErrorCaught()
  470. // Server sends back response message
  471. var buffer = ByteBuffer()
  472. buffer.writeInteger(UInt8(0)) // not compressed
  473. buffer.writeInteger(UInt32(42)) // message length
  474. buffer.writeRepeatingByte(0, count: 42) // message
  475. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer))
  476. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  477. // Make sure we read the message properly
  478. XCTAssertEqual(
  479. try channel.readInbound(as: RPCResponsePart.self),
  480. RPCResponsePart.message([UInt8](repeating: 0, count: 42))
  481. )
  482. // Server sends status to end RPC
  483. XCTAssertNoThrow(
  484. try channel.writeInbound(
  485. HTTP2Frame.FramePayload.headers(
  486. .init(headers: [
  487. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  488. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  489. "custom-header": "custom-value",
  490. ])
  491. )
  492. )
  493. )
  494. XCTAssertEqual(
  495. try channel.readInbound(as: RPCResponsePart.self),
  496. .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
  497. )
  498. }
  499. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  500. let handler = GRPCClientStreamHandler(
  501. methodDescriptor: .init(service: "test", method: "test"),
  502. scheme: .http,
  503. outboundEncoding: .none,
  504. acceptedEncodings: [],
  505. maximumPayloadSize: 100,
  506. skipStateMachineAssertions: true
  507. )
  508. let channel = EmbeddedChannel(handler: handler)
  509. // Send client's initial metadata
  510. let request = RPCRequestPart.metadata([:])
  511. XCTAssertNoThrow(try channel.writeOutbound(request))
  512. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  513. let writtenHeaders = try channel.assertReadHeadersOutbound()
  514. XCTAssertEqual(
  515. writtenHeaders.headers,
  516. [
  517. GRPCHTTP2Keys.method.rawValue: "POST",
  518. GRPCHTTP2Keys.scheme.rawValue: "http",
  519. GRPCHTTP2Keys.path.rawValue: "/test/test",
  520. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  521. GRPCHTTP2Keys.te.rawValue: "trailers",
  522. ]
  523. )
  524. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  525. // Receive server's initial metadata
  526. let serverInitialMetadata: HPACKHeaders = [
  527. GRPCHTTP2Keys.status.rawValue: "200",
  528. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  529. "some-custom-header": "some-custom-value",
  530. ]
  531. XCTAssertNoThrow(
  532. try channel.writeInbound(
  533. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  534. )
  535. )
  536. XCTAssertEqual(
  537. try channel.readInbound(as: RPCResponsePart.self),
  538. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  539. )
  540. // Send a message
  541. XCTAssertNoThrow(
  542. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  543. )
  544. // Assert we wrote it successfully into the channel
  545. let writtenMessage = try channel.assertReadDataOutbound()
  546. var expectedBuffer = ByteBuffer()
  547. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  548. expectedBuffer.writeInteger(UInt32(42)) // message length
  549. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  550. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  551. // Receive server's first message
  552. var buffer = ByteBuffer()
  553. buffer.writeInteger(UInt8(0)) // not compressed
  554. XCTAssertNoThrow(
  555. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  556. )
  557. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  558. buffer.clear()
  559. buffer.writeInteger(UInt32(30)) // message length
  560. XCTAssertNoThrow(
  561. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  562. )
  563. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  564. buffer.clear()
  565. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  566. XCTAssertNoThrow(
  567. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  568. )
  569. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  570. buffer.clear()
  571. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  572. XCTAssertNoThrow(
  573. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  574. )
  575. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  576. buffer.clear()
  577. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  578. XCTAssertNoThrow(
  579. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  580. )
  581. // Make sure we read the message properly
  582. XCTAssertEqual(
  583. try channel.readInbound(as: RPCResponsePart.self),
  584. RPCResponsePart.message(
  585. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  586. + [UInt8](repeating: 2, count: 10)
  587. )
  588. )
  589. }
  590. func testSendMultipleMessagesInSingleBuffer() throws {
  591. let handler = GRPCClientStreamHandler(
  592. methodDescriptor: .init(service: "test", method: "test"),
  593. scheme: .http,
  594. outboundEncoding: .none,
  595. acceptedEncodings: [],
  596. maximumPayloadSize: 100,
  597. skipStateMachineAssertions: true
  598. )
  599. let channel = EmbeddedChannel(handler: handler)
  600. // Send client's initial metadata
  601. let request = RPCRequestPart.metadata([:])
  602. XCTAssertNoThrow(try channel.writeOutbound(request))
  603. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  604. let writtenHeaders = try channel.assertReadHeadersOutbound()
  605. XCTAssertEqual(
  606. writtenHeaders.headers,
  607. [
  608. GRPCHTTP2Keys.method.rawValue: "POST",
  609. GRPCHTTP2Keys.scheme.rawValue: "http",
  610. GRPCHTTP2Keys.path.rawValue: "/test/test",
  611. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  612. GRPCHTTP2Keys.te.rawValue: "trailers",
  613. ]
  614. )
  615. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  616. // Receive server's initial metadata
  617. let serverInitialMetadata: HPACKHeaders = [
  618. GRPCHTTP2Keys.status.rawValue: "200",
  619. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  620. "some-custom-header": "some-custom-value",
  621. ]
  622. XCTAssertNoThrow(
  623. try channel.writeInbound(
  624. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  625. )
  626. )
  627. XCTAssertEqual(
  628. try channel.readInbound(as: RPCResponsePart.self),
  629. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  630. )
  631. // This is where this test actually begins. We want to write two messages
  632. // without flushing, and make sure that no messages are sent down the pipeline
  633. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  634. // Write back first message and make sure nothing's written in the channel.
  635. XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 1, count: 4))))
  636. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  637. // Write back second message and make sure nothing's written in the channel.
  638. XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 2, count: 4))))
  639. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  640. // Now flush and check we *do* write the data.
  641. channel.flush()
  642. let writtenMessage = try channel.assertReadDataOutbound()
  643. // Make sure both messages have been framed together in the ByteBuffer.
  644. XCTAssertEqual(
  645. writtenMessage.data,
  646. .byteBuffer(
  647. .init(bytes: [
  648. // First message
  649. 0, // Compression disabled
  650. 0, 0, 0, 4, // Message length
  651. 1, 1, 1, 1, // First message data
  652. // Second message
  653. 0, // Compression disabled
  654. 0, 0, 0, 4, // Message length
  655. 2, 2, 2, 2, // Second message data
  656. ])
  657. )
  658. )
  659. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  660. }
  661. func testUnexpectedStreamClose_ErrorFired() throws {
  662. let handler = GRPCClientStreamHandler(
  663. methodDescriptor: .init(service: "test", method: "test"),
  664. scheme: .http,
  665. outboundEncoding: .none,
  666. acceptedEncodings: [],
  667. maximumPayloadSize: 1,
  668. skipStateMachineAssertions: true
  669. )
  670. let channel = EmbeddedChannel(handler: handler)
  671. // Write client's initial metadata
  672. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  673. let clientInitialMetadata: HPACKHeaders = [
  674. GRPCHTTP2Keys.path.rawValue: "/test/test",
  675. GRPCHTTP2Keys.scheme.rawValue: "http",
  676. GRPCHTTP2Keys.method.rawValue: "POST",
  677. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  678. GRPCHTTP2Keys.te.rawValue: "trailers",
  679. ]
  680. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  681. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  682. // An error is fired down the pipeline
  683. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  684. channel.pipeline.fireErrorCaught(thrownError)
  685. // The client receives a status explaining the stream was closed because of the thrown error.
  686. XCTAssertEqual(
  687. try channel.readInbound(as: RPCResponsePart.self),
  688. .status(
  689. .init(
  690. code: .unavailable,
  691. message: "Stream unexpectedly closed with error."
  692. ),
  693. [:]
  694. )
  695. )
  696. // We should now be closed: check we can't write anymore.
  697. XCTAssertThrowsError(
  698. ofType: RPCError.self,
  699. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  700. ) { error in
  701. XCTAssertEqual(error.code, .internalError)
  702. XCTAssertEqual(error.message, "Invalid state")
  703. }
  704. }
  705. func testUnexpectedStreamClose_ChannelInactive() throws {
  706. let handler = GRPCClientStreamHandler(
  707. methodDescriptor: .init(service: "test", method: "test"),
  708. scheme: .http,
  709. outboundEncoding: .none,
  710. acceptedEncodings: [],
  711. maximumPayloadSize: 1,
  712. skipStateMachineAssertions: true
  713. )
  714. let channel = EmbeddedChannel(handler: handler)
  715. // Write client's initial metadata
  716. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  717. let clientInitialMetadata: HPACKHeaders = [
  718. GRPCHTTP2Keys.path.rawValue: "/test/test",
  719. GRPCHTTP2Keys.scheme.rawValue: "http",
  720. GRPCHTTP2Keys.method.rawValue: "POST",
  721. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  722. GRPCHTTP2Keys.te.rawValue: "trailers",
  723. ]
  724. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  725. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  726. // Channel becomes inactive
  727. channel.pipeline.fireChannelInactive()
  728. // The client receives a status explaining the stream was closed.
  729. XCTAssertEqual(
  730. try channel.readInbound(as: RPCResponsePart.self),
  731. .status(
  732. .init(code: .unavailable, message: "Stream unexpectedly closed."),
  733. [:]
  734. )
  735. )
  736. // We should now be closed: check we can't write anymore.
  737. XCTAssertThrowsError(
  738. ofType: RPCError.self,
  739. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  740. ) { error in
  741. XCTAssertEqual(error.code, .internalError)
  742. XCTAssertEqual(error.message, "Invalid state")
  743. }
  744. }
  745. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  746. let handler = GRPCClientStreamHandler(
  747. methodDescriptor: .init(service: "test", method: "test"),
  748. scheme: .http,
  749. outboundEncoding: .none,
  750. acceptedEncodings: [],
  751. maximumPayloadSize: 1,
  752. skipStateMachineAssertions: true
  753. )
  754. let channel = EmbeddedChannel(handler: handler)
  755. // Write client's initial metadata
  756. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  757. let clientInitialMetadata: HPACKHeaders = [
  758. GRPCHTTP2Keys.path.rawValue: "/test/test",
  759. GRPCHTTP2Keys.scheme.rawValue: "http",
  760. GRPCHTTP2Keys.method.rawValue: "POST",
  761. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  762. GRPCHTTP2Keys.te.rawValue: "trailers",
  763. ]
  764. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  765. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  766. // Receive RST_STREAM
  767. XCTAssertNoThrow(
  768. try channel.writeInbound(
  769. HTTP2Frame.FramePayload.rstStream(.internalError)
  770. )
  771. )
  772. // The client receives a status explaining RST_STREAM was sent.
  773. XCTAssertEqual(
  774. try channel.readInbound(as: RPCResponsePart.self),
  775. .status(
  776. .init(
  777. code: .unavailable,
  778. message: "Stream unexpectedly closed: a RST_STREAM frame was received."
  779. ),
  780. [:]
  781. )
  782. )
  783. // We should now be closed: check we can't write anymore.
  784. XCTAssertThrowsError(
  785. ofType: RPCError.self,
  786. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  787. ) { error in
  788. XCTAssertEqual(error.code, .internalError)
  789. XCTAssertEqual(error.message, "Invalid state")
  790. }
  791. }
  792. }
  793. extension EmbeddedChannel {
  794. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  795. guard
  796. case .headers(let writtenHeaders) = try XCTUnwrap(
  797. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  798. )
  799. else {
  800. throw TestError.assertionFailure("Expected to write headers")
  801. }
  802. return writtenHeaders
  803. }
  804. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  805. guard
  806. case .data(let writtenMessage) = try XCTUnwrap(
  807. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  808. )
  809. else {
  810. throw TestError.assertionFailure("Expected to write data")
  811. }
  812. return writtenMessage
  813. }
  814. }
  815. private enum TestError: Error {
  816. case assertionFailure(String)
  817. }