GRPCClientStreamHandlerTests.swift 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCClientStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let handler = GRPCClientStreamHandler(
  27. methodDescriptor: .init(service: "test", method: "test"),
  28. scheme: .http,
  29. outboundEncoding: .none,
  30. acceptedEncodings: [],
  31. maxPayloadSize: 1
  32. )
  33. let channel = EmbeddedChannel(handler: handler)
  34. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  35. .ping(.init(), ack: false),
  36. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  37. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  38. // .priority(
  39. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  40. // ),
  41. .settings(.ack),
  42. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  43. .windowUpdate(windowSizeIncrement: 4),
  44. .alternativeService(origin: nil, field: nil),
  45. .origin([]),
  46. ]
  47. for toBeIgnored in framesToBeIgnored {
  48. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  49. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  50. }
  51. }
  52. func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
  53. let handler = GRPCClientStreamHandler(
  54. methodDescriptor: .init(service: "test", method: "test"),
  55. scheme: .http,
  56. outboundEncoding: .none,
  57. acceptedEncodings: [],
  58. maxPayloadSize: 1,
  59. skipStateMachineAssertions: true
  60. )
  61. let channel = EmbeddedChannel(handler: handler)
  62. // Send client's initial metadata
  63. let request = RPCRequestPart.metadata([:])
  64. XCTAssertNoThrow(try channel.writeOutbound(request))
  65. // Receive server's initial metadata without :status
  66. let serverInitialMetadata: HPACKHeaders = [
  67. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
  68. ]
  69. XCTAssertNoThrow(
  70. try channel.writeInbound(
  71. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  72. )
  73. )
  74. XCTAssertEqual(
  75. try channel.readInbound(as: RPCResponsePart.self),
  76. .status(
  77. .init(code: .unknown, message: "HTTP Status Code is missing."),
  78. Metadata(headers: serverInitialMetadata)
  79. )
  80. )
  81. }
  82. func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
  83. let handler = GRPCClientStreamHandler(
  84. methodDescriptor: .init(service: "test", method: "test"),
  85. scheme: .http,
  86. outboundEncoding: .none,
  87. acceptedEncodings: [],
  88. maxPayloadSize: 1,
  89. skipStateMachineAssertions: true
  90. )
  91. let channel = EmbeddedChannel(handler: handler)
  92. // Send client's initial metadata
  93. let request = RPCRequestPart.metadata([:])
  94. XCTAssertNoThrow(try channel.writeOutbound(request))
  95. // Receive server's initial metadata with 1xx status
  96. let serverInitialMetadata: HPACKHeaders = [
  97. GRPCHTTP2Keys.status.rawValue: "104",
  98. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  99. ]
  100. XCTAssertNoThrow(
  101. try channel.writeInbound(
  102. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  103. )
  104. )
  105. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  106. }
  107. func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
  108. let handler = GRPCClientStreamHandler(
  109. methodDescriptor: .init(service: "test", method: "test"),
  110. scheme: .http,
  111. outboundEncoding: .none,
  112. acceptedEncodings: [],
  113. maxPayloadSize: 1,
  114. skipStateMachineAssertions: true
  115. )
  116. let channel = EmbeddedChannel(handler: handler)
  117. // Send client's initial metadata
  118. let request = RPCRequestPart.metadata([:])
  119. XCTAssertNoThrow(try channel.writeOutbound(request))
  120. // Receive server's initial metadata with non-200 and non-1xx :status
  121. let serverInitialMetadata: HPACKHeaders = [
  122. GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
  123. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  124. ]
  125. XCTAssertNoThrow(
  126. try channel.writeInbound(
  127. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  128. )
  129. )
  130. XCTAssertEqual(
  131. try channel.readInbound(as: RPCResponsePart.self),
  132. .status(
  133. .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
  134. Metadata(headers: serverInitialMetadata)
  135. )
  136. )
  137. }
  138. func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
  139. let handler = GRPCClientStreamHandler(
  140. methodDescriptor: .init(service: "test", method: "test"),
  141. scheme: .http,
  142. outboundEncoding: .none,
  143. acceptedEncodings: [],
  144. maxPayloadSize: 1,
  145. skipStateMachineAssertions: true
  146. )
  147. let channel = EmbeddedChannel(handler: handler)
  148. // Send client's initial metadata
  149. let request = RPCRequestPart.metadata([:])
  150. XCTAssertNoThrow(try channel.writeOutbound(request))
  151. // Receive server's initial metadata without content-type
  152. let serverInitialMetadata: HPACKHeaders = [
  153. GRPCHTTP2Keys.status.rawValue: "200"
  154. ]
  155. XCTAssertNoThrow(
  156. try channel.writeInbound(
  157. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  158. )
  159. )
  160. XCTAssertEqual(
  161. try channel.readInbound(as: RPCResponsePart.self),
  162. .status(
  163. .init(code: .internalError, message: "Missing content-type header"),
  164. Metadata(headers: serverInitialMetadata)
  165. )
  166. )
  167. }
  168. func testNotAcceptedEncodingResultsInFinishedRPC() throws {
  169. let handler = GRPCClientStreamHandler(
  170. methodDescriptor: .init(service: "test", method: "test"),
  171. scheme: .http,
  172. outboundEncoding: .deflate,
  173. acceptedEncodings: [.deflate],
  174. maxPayloadSize: 1
  175. )
  176. let channel = EmbeddedChannel(handler: handler)
  177. // Send client's initial metadata
  178. XCTAssertNoThrow(
  179. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  180. )
  181. // Make sure we have sent right metadata.
  182. let writtenMetadata = try channel.assertReadHeadersOutbound()
  183. XCTAssertEqual(
  184. writtenMetadata.headers,
  185. [
  186. GRPCHTTP2Keys.method.rawValue: "POST",
  187. GRPCHTTP2Keys.scheme.rawValue: "http",
  188. GRPCHTTP2Keys.path.rawValue: "/test/test",
  189. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  190. GRPCHTTP2Keys.te.rawValue: "trailers",
  191. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  192. GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
  193. ]
  194. )
  195. // Server sends initial metadata with unsupported encoding
  196. let serverInitialMetadata: HPACKHeaders = [
  197. GRPCHTTP2Keys.status.rawValue: "200",
  198. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  199. GRPCHTTP2Keys.encoding.rawValue: "gzip",
  200. ]
  201. XCTAssertNoThrow(
  202. try channel.writeInbound(
  203. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  204. )
  205. )
  206. XCTAssertEqual(
  207. try channel.readInbound(as: RPCResponsePart.self),
  208. .status(
  209. .init(
  210. code: .internalError,
  211. message:
  212. "The server picked a compression algorithm ('gzip') the client does not know about."
  213. ),
  214. Metadata(headers: serverInitialMetadata)
  215. )
  216. )
  217. }
  218. func testOverMaximumPayloadSize() throws {
  219. let handler = GRPCClientStreamHandler(
  220. methodDescriptor: .init(service: "test", method: "test"),
  221. scheme: .http,
  222. outboundEncoding: .none,
  223. acceptedEncodings: [],
  224. maxPayloadSize: 1,
  225. skipStateMachineAssertions: true
  226. )
  227. let channel = EmbeddedChannel(handler: handler)
  228. // Send client's initial metadata
  229. XCTAssertNoThrow(
  230. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  231. )
  232. // Make sure we have sent right metadata.
  233. let writtenMetadata = try channel.assertReadHeadersOutbound()
  234. XCTAssertEqual(
  235. writtenMetadata.headers,
  236. [
  237. GRPCHTTP2Keys.method.rawValue: "POST",
  238. GRPCHTTP2Keys.scheme.rawValue: "http",
  239. GRPCHTTP2Keys.path.rawValue: "/test/test",
  240. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  241. GRPCHTTP2Keys.te.rawValue: "trailers",
  242. ]
  243. )
  244. // Server sends initial metadata
  245. let serverInitialMetadata: HPACKHeaders = [
  246. GRPCHTTP2Keys.status.rawValue: "200",
  247. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  248. ]
  249. XCTAssertNoThrow(
  250. try channel.writeInbound(
  251. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  252. )
  253. )
  254. XCTAssertEqual(
  255. try channel.readInbound(as: RPCResponsePart.self),
  256. .metadata(Metadata(headers: serverInitialMetadata))
  257. )
  258. // Server sends message over payload limit
  259. var buffer = ByteBuffer()
  260. buffer.writeInteger(UInt8(0)) // not compressed
  261. buffer.writeInteger(UInt32(42)) // message length
  262. buffer.writeRepeatingByte(0, count: 42) // message
  263. let clientDataPayload = HTTP2Frame.FramePayload.Data(
  264. data: .byteBuffer(buffer),
  265. endStream: false
  266. )
  267. // Invalid payload should result in error status and stream being closed
  268. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  269. let part = try channel.readInbound(as: RPCResponsePart.self)
  270. XCTAssertEqual(
  271. part,
  272. .status(Status(code: .internalError, message: "Failed to decode message"), [:])
  273. )
  274. channel.embeddedEventLoop.run()
  275. try channel.closeFuture.wait()
  276. }
  277. func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
  278. let handler = GRPCClientStreamHandler(
  279. methodDescriptor: .init(service: "test", method: "test"),
  280. scheme: .http,
  281. outboundEncoding: .none,
  282. acceptedEncodings: [],
  283. maxPayloadSize: 100,
  284. skipStateMachineAssertions: true
  285. )
  286. let channel = EmbeddedChannel(handler: handler)
  287. // Send client's initial metadata
  288. XCTAssertNoThrow(
  289. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  290. )
  291. // Make sure we have sent right metadata.
  292. let writtenMetadata = try channel.assertReadHeadersOutbound()
  293. XCTAssertEqual(
  294. writtenMetadata.headers,
  295. [
  296. GRPCHTTP2Keys.method.rawValue: "POST",
  297. GRPCHTTP2Keys.scheme.rawValue: "http",
  298. GRPCHTTP2Keys.path.rawValue: "/test/test",
  299. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  300. GRPCHTTP2Keys.te.rawValue: "trailers",
  301. ]
  302. )
  303. // Server sends initial metadata
  304. let serverInitialMetadata: HPACKHeaders = [
  305. GRPCHTTP2Keys.status.rawValue: "200",
  306. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  307. ]
  308. XCTAssertNoThrow(
  309. try channel.writeInbound(
  310. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  311. )
  312. )
  313. XCTAssertEqual(
  314. try channel.readInbound(as: RPCResponsePart.self),
  315. .metadata(Metadata(headers: serverInitialMetadata))
  316. )
  317. // Server sends message with EOS set.
  318. var buffer = ByteBuffer()
  319. buffer.writeInteger(UInt8(0)) // not compressed
  320. buffer.writeInteger(UInt32(42)) // message length
  321. buffer.writeRepeatingByte(0, count: 42) // message
  322. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  323. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  324. // Make sure we got status + trailers with the right error.
  325. XCTAssertEqual(
  326. try channel.readInbound(as: RPCResponsePart.self),
  327. .status(
  328. Status(
  329. code: .internalError,
  330. message:
  331. "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
  332. ),
  333. [:]
  334. )
  335. )
  336. }
  337. func testServerEndsStream() throws {
  338. let handler = GRPCClientStreamHandler(
  339. methodDescriptor: .init(service: "test", method: "test"),
  340. scheme: .http,
  341. outboundEncoding: .none,
  342. acceptedEncodings: [],
  343. maxPayloadSize: 1,
  344. skipStateMachineAssertions: true
  345. )
  346. let channel = EmbeddedChannel(handler: handler)
  347. // Write client's initial metadata
  348. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  349. let clientInitialMetadata: HPACKHeaders = [
  350. GRPCHTTP2Keys.path.rawValue: "/test/test",
  351. GRPCHTTP2Keys.scheme.rawValue: "http",
  352. GRPCHTTP2Keys.method.rawValue: "POST",
  353. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  354. GRPCHTTP2Keys.te.rawValue: "trailers",
  355. ]
  356. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  357. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  358. // Receive server's initial metadata with end stream set
  359. let serverInitialMetadata: HPACKHeaders = [
  360. GRPCHTTP2Keys.status.rawValue: "200",
  361. GRPCHTTP2Keys.grpcStatus.rawValue: "0",
  362. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  363. ]
  364. XCTAssertNoThrow(
  365. try channel.writeInbound(
  366. HTTP2Frame.FramePayload.headers(
  367. .init(
  368. headers: serverInitialMetadata,
  369. endStream: true
  370. )
  371. )
  372. )
  373. )
  374. XCTAssertEqual(
  375. try channel.readInbound(as: RPCResponsePart.self),
  376. .status(
  377. .init(code: .ok, message: ""),
  378. [
  379. GRPCHTTP2Keys.status.rawValue: "200",
  380. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  381. ]
  382. )
  383. )
  384. // We should throw if the server sends another message, since it's closed the stream already.
  385. var buffer = ByteBuffer()
  386. buffer.writeInteger(UInt8(0)) // not compressed
  387. buffer.writeInteger(UInt32(42)) // message length
  388. buffer.writeRepeatingByte(0, count: 42) // message
  389. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  390. XCTAssertThrowsError(
  391. ofType: RPCError.self,
  392. try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
  393. ) { error in
  394. XCTAssertEqual(error.code, .internalError)
  395. XCTAssertEqual(error.message, "Invalid state")
  396. }
  397. }
  398. func testNormalFlow() throws {
  399. let handler = GRPCClientStreamHandler(
  400. methodDescriptor: .init(service: "test", method: "test"),
  401. scheme: .http,
  402. outboundEncoding: .none,
  403. acceptedEncodings: [],
  404. maxPayloadSize: 100,
  405. skipStateMachineAssertions: true
  406. )
  407. let channel = EmbeddedChannel(handler: handler)
  408. // Send client's initial metadata
  409. let request = RPCRequestPart.metadata([:])
  410. XCTAssertNoThrow(try channel.writeOutbound(request))
  411. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  412. let writtenHeaders = try channel.assertReadHeadersOutbound()
  413. XCTAssertEqual(
  414. writtenHeaders.headers,
  415. [
  416. GRPCHTTP2Keys.method.rawValue: "POST",
  417. GRPCHTTP2Keys.scheme.rawValue: "http",
  418. GRPCHTTP2Keys.path.rawValue: "/test/test",
  419. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  420. GRPCHTTP2Keys.te.rawValue: "trailers",
  421. ]
  422. )
  423. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  424. // Receive server's initial metadata
  425. let serverInitialMetadata: HPACKHeaders = [
  426. GRPCHTTP2Keys.status.rawValue: "200",
  427. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  428. "some-custom-header": "some-custom-value",
  429. ]
  430. XCTAssertNoThrow(
  431. try channel.writeInbound(
  432. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  433. )
  434. )
  435. XCTAssertEqual(
  436. try channel.readInbound(as: RPCResponsePart.self),
  437. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  438. )
  439. // Send a message
  440. XCTAssertNoThrow(
  441. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  442. )
  443. // Assert we wrote it successfully into the channel
  444. let writtenMessage = try channel.assertReadDataOutbound()
  445. var expectedBuffer = ByteBuffer()
  446. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  447. expectedBuffer.writeInteger(UInt32(42)) // message length
  448. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  449. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  450. // Half-close the outbound end: this would be triggered by finishing the client's writer.
  451. XCTAssertNoThrow(channel.close(mode: .output, promise: nil))
  452. // Flush to make sure the EOS is written.
  453. channel.flush()
  454. // Make sure the EOS frame was sent
  455. let emptyEOSFrame = try channel.assertReadDataOutbound()
  456. XCTAssertEqual(emptyEOSFrame.data, .byteBuffer(.init()))
  457. XCTAssertTrue(emptyEOSFrame.endStream)
  458. // Make sure we cannot write anymore because client's closed.
  459. XCTAssertThrowsError(
  460. ofType: RPCError.self,
  461. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  462. ) { error in
  463. XCTAssertEqual(error.code, .internalError)
  464. XCTAssertEqual(error.message, "Invalid state")
  465. }
  466. // This is needed to clear the EmbeddedChannel's stored error, otherwise
  467. // it will be thrown when writing inbound.
  468. try? channel.throwIfErrorCaught()
  469. // Server sends back response message
  470. var buffer = ByteBuffer()
  471. buffer.writeInteger(UInt8(0)) // not compressed
  472. buffer.writeInteger(UInt32(42)) // message length
  473. buffer.writeRepeatingByte(0, count: 42) // message
  474. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer))
  475. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  476. // Make sure we read the message properly
  477. XCTAssertEqual(
  478. try channel.readInbound(as: RPCResponsePart.self),
  479. RPCResponsePart.message([UInt8](repeating: 0, count: 42))
  480. )
  481. // Server sends status to end RPC
  482. XCTAssertNoThrow(
  483. try channel.writeInbound(
  484. HTTP2Frame.FramePayload.headers(
  485. .init(headers: [
  486. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  487. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  488. "custom-header": "custom-value",
  489. ])
  490. )
  491. )
  492. )
  493. XCTAssertEqual(
  494. try channel.readInbound(as: RPCResponsePart.self),
  495. .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
  496. )
  497. }
  498. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  499. let handler = GRPCClientStreamHandler(
  500. methodDescriptor: .init(service: "test", method: "test"),
  501. scheme: .http,
  502. outboundEncoding: .none,
  503. acceptedEncodings: [],
  504. maxPayloadSize: 100,
  505. skipStateMachineAssertions: true
  506. )
  507. let channel = EmbeddedChannel(handler: handler)
  508. // Send client's initial metadata
  509. let request = RPCRequestPart.metadata([:])
  510. XCTAssertNoThrow(try channel.writeOutbound(request))
  511. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  512. let writtenHeaders = try channel.assertReadHeadersOutbound()
  513. XCTAssertEqual(
  514. writtenHeaders.headers,
  515. [
  516. GRPCHTTP2Keys.method.rawValue: "POST",
  517. GRPCHTTP2Keys.scheme.rawValue: "http",
  518. GRPCHTTP2Keys.path.rawValue: "/test/test",
  519. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  520. GRPCHTTP2Keys.te.rawValue: "trailers",
  521. ]
  522. )
  523. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  524. // Receive server's initial metadata
  525. let serverInitialMetadata: HPACKHeaders = [
  526. GRPCHTTP2Keys.status.rawValue: "200",
  527. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  528. "some-custom-header": "some-custom-value",
  529. ]
  530. XCTAssertNoThrow(
  531. try channel.writeInbound(
  532. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  533. )
  534. )
  535. XCTAssertEqual(
  536. try channel.readInbound(as: RPCResponsePart.self),
  537. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  538. )
  539. // Send a message
  540. XCTAssertNoThrow(
  541. try channel.writeOutbound(RPCRequestPart.message(.init(repeating: 1, count: 42)))
  542. )
  543. // Assert we wrote it successfully into the channel
  544. let writtenMessage = try channel.assertReadDataOutbound()
  545. var expectedBuffer = ByteBuffer()
  546. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  547. expectedBuffer.writeInteger(UInt32(42)) // message length
  548. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  549. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  550. // Receive server's first message
  551. var buffer = ByteBuffer()
  552. buffer.writeInteger(UInt8(0)) // not compressed
  553. XCTAssertNoThrow(
  554. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  555. )
  556. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  557. buffer.clear()
  558. buffer.writeInteger(UInt32(30)) // message length
  559. XCTAssertNoThrow(
  560. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  561. )
  562. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  563. buffer.clear()
  564. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  565. XCTAssertNoThrow(
  566. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  567. )
  568. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  569. buffer.clear()
  570. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  571. XCTAssertNoThrow(
  572. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  573. )
  574. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  575. buffer.clear()
  576. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  577. XCTAssertNoThrow(
  578. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  579. )
  580. // Make sure we read the message properly
  581. XCTAssertEqual(
  582. try channel.readInbound(as: RPCResponsePart.self),
  583. RPCResponsePart.message(
  584. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  585. + [UInt8](repeating: 2, count: 10)
  586. )
  587. )
  588. }
  589. func testSendMultipleMessagesInSingleBuffer() throws {
  590. let handler = GRPCClientStreamHandler(
  591. methodDescriptor: .init(service: "test", method: "test"),
  592. scheme: .http,
  593. outboundEncoding: .none,
  594. acceptedEncodings: [],
  595. maxPayloadSize: 100,
  596. skipStateMachineAssertions: true
  597. )
  598. let channel = EmbeddedChannel(handler: handler)
  599. // Send client's initial metadata
  600. let request = RPCRequestPart.metadata([:])
  601. XCTAssertNoThrow(try channel.writeOutbound(request))
  602. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  603. let writtenHeaders = try channel.assertReadHeadersOutbound()
  604. XCTAssertEqual(
  605. writtenHeaders.headers,
  606. [
  607. GRPCHTTP2Keys.method.rawValue: "POST",
  608. GRPCHTTP2Keys.scheme.rawValue: "http",
  609. GRPCHTTP2Keys.path.rawValue: "/test/test",
  610. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  611. GRPCHTTP2Keys.te.rawValue: "trailers",
  612. ]
  613. )
  614. XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  615. // Receive server's initial metadata
  616. let serverInitialMetadata: HPACKHeaders = [
  617. GRPCHTTP2Keys.status.rawValue: "200",
  618. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  619. "some-custom-header": "some-custom-value",
  620. ]
  621. XCTAssertNoThrow(
  622. try channel.writeInbound(
  623. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  624. )
  625. )
  626. XCTAssertEqual(
  627. try channel.readInbound(as: RPCResponsePart.self),
  628. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  629. )
  630. // This is where this test actually begins. We want to write two messages
  631. // without flushing, and make sure that no messages are sent down the pipeline
  632. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  633. // Write back first message and make sure nothing's written in the channel.
  634. XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 1, count: 4))))
  635. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  636. // Write back second message and make sure nothing's written in the channel.
  637. XCTAssertNoThrow(channel.write(RPCRequestPart.message([UInt8](repeating: 2, count: 4))))
  638. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  639. // Now flush and check we *do* write the data.
  640. channel.flush()
  641. let writtenMessage = try channel.assertReadDataOutbound()
  642. // Make sure both messages have been framed together in the ByteBuffer.
  643. XCTAssertEqual(
  644. writtenMessage.data,
  645. .byteBuffer(
  646. .init(bytes: [
  647. // First message
  648. 0, // Compression disabled
  649. 0, 0, 0, 4, // Message length
  650. 1, 1, 1, 1, // First message data
  651. // Second message
  652. 0, // Compression disabled
  653. 0, 0, 0, 4, // Message length
  654. 2, 2, 2, 2, // Second message data
  655. ])
  656. )
  657. )
  658. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  659. }
  660. func testUnexpectedStreamClose_ErrorFired() throws {
  661. let handler = GRPCClientStreamHandler(
  662. methodDescriptor: .init(service: "test", method: "test"),
  663. scheme: .http,
  664. outboundEncoding: .none,
  665. acceptedEncodings: [],
  666. maxPayloadSize: 1,
  667. skipStateMachineAssertions: true
  668. )
  669. let channel = EmbeddedChannel(handler: handler)
  670. // Write client's initial metadata
  671. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  672. let clientInitialMetadata: HPACKHeaders = [
  673. GRPCHTTP2Keys.path.rawValue: "/test/test",
  674. GRPCHTTP2Keys.scheme.rawValue: "http",
  675. GRPCHTTP2Keys.method.rawValue: "POST",
  676. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  677. GRPCHTTP2Keys.te.rawValue: "trailers",
  678. ]
  679. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  680. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  681. // An error is fired down the pipeline
  682. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  683. channel.pipeline.fireErrorCaught(thrownError)
  684. // The client receives a status explaining the stream was closed because of the thrown error.
  685. XCTAssertEqual(
  686. try channel.readInbound(as: RPCResponsePart.self),
  687. .status(
  688. .init(
  689. code: .unavailable,
  690. message: "Stream unexpectedly closed with error."
  691. ),
  692. [:]
  693. )
  694. )
  695. // We should now be closed: check we can't write anymore.
  696. XCTAssertThrowsError(
  697. ofType: RPCError.self,
  698. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  699. ) { error in
  700. XCTAssertEqual(error.code, .internalError)
  701. XCTAssertEqual(error.message, "Invalid state")
  702. }
  703. }
  704. func testUnexpectedStreamClose_ChannelInactive() throws {
  705. let handler = GRPCClientStreamHandler(
  706. methodDescriptor: .init(service: "test", method: "test"),
  707. scheme: .http,
  708. outboundEncoding: .none,
  709. acceptedEncodings: [],
  710. maxPayloadSize: 1,
  711. skipStateMachineAssertions: true
  712. )
  713. let channel = EmbeddedChannel(handler: handler)
  714. // Write client's initial metadata
  715. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  716. let clientInitialMetadata: HPACKHeaders = [
  717. GRPCHTTP2Keys.path.rawValue: "/test/test",
  718. GRPCHTTP2Keys.scheme.rawValue: "http",
  719. GRPCHTTP2Keys.method.rawValue: "POST",
  720. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  721. GRPCHTTP2Keys.te.rawValue: "trailers",
  722. ]
  723. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  724. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  725. // Channel becomes inactive
  726. channel.pipeline.fireChannelInactive()
  727. // The client receives a status explaining the stream was closed.
  728. XCTAssertEqual(
  729. try channel.readInbound(as: RPCResponsePart.self),
  730. .status(
  731. .init(code: .unavailable, message: "Stream unexpectedly closed."),
  732. [:]
  733. )
  734. )
  735. // We should now be closed: check we can't write anymore.
  736. XCTAssertThrowsError(
  737. ofType: RPCError.self,
  738. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  739. ) { error in
  740. XCTAssertEqual(error.code, .internalError)
  741. XCTAssertEqual(error.message, "Invalid state")
  742. }
  743. }
  744. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  745. let handler = GRPCClientStreamHandler(
  746. methodDescriptor: .init(service: "test", method: "test"),
  747. scheme: .http,
  748. outboundEncoding: .none,
  749. acceptedEncodings: [],
  750. maxPayloadSize: 1,
  751. skipStateMachineAssertions: true
  752. )
  753. let channel = EmbeddedChannel(handler: handler)
  754. // Write client's initial metadata
  755. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))
  756. let clientInitialMetadata: HPACKHeaders = [
  757. GRPCHTTP2Keys.path.rawValue: "/test/test",
  758. GRPCHTTP2Keys.scheme.rawValue: "http",
  759. GRPCHTTP2Keys.method.rawValue: "POST",
  760. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  761. GRPCHTTP2Keys.te.rawValue: "trailers",
  762. ]
  763. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  764. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  765. // Receive RST_STREAM
  766. XCTAssertNoThrow(
  767. try channel.writeInbound(
  768. HTTP2Frame.FramePayload.rstStream(.internalError)
  769. )
  770. )
  771. // The client receives a status explaining RST_STREAM was sent.
  772. XCTAssertEqual(
  773. try channel.readInbound(as: RPCResponsePart.self),
  774. .status(
  775. .init(
  776. code: .unavailable,
  777. message: "Stream unexpectedly closed: a RST_STREAM frame was received."
  778. ),
  779. [:]
  780. )
  781. )
  782. // We should now be closed: check we can't write anymore.
  783. XCTAssertThrowsError(
  784. ofType: RPCError.self,
  785. try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
  786. ) { error in
  787. XCTAssertEqual(error.code, .internalError)
  788. XCTAssertEqual(error.message, "Invalid state")
  789. }
  790. }
  791. }
  792. extension EmbeddedChannel {
  793. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  794. guard
  795. case .headers(let writtenHeaders) = try XCTUnwrap(
  796. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  797. )
  798. else {
  799. throw TestError.assertionFailure("Expected to write headers")
  800. }
  801. return writtenHeaders
  802. }
  803. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  804. guard
  805. case .data(let writtenMessage) = try XCTUnwrap(
  806. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  807. )
  808. else {
  809. throw TestError.assertionFailure("Expected to write data")
  810. }
  811. return writtenMessage
  812. }
  813. }
  814. private enum TestError: Error {
  815. case assertionFailure(String)
  816. }