GRPCClientStreamHandlerTests.swift 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCClientStreamHandlerTests: XCTestCase {
  /// Writes a series of HTTP/2 frame payloads that are neither headers nor data into the
  /// channel and checks that every write succeeds without anything becoming readable inbound.
  func testH2FramesAreIgnored() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    let priorityData = HTTP2Frame.StreamPriorityData(
      exclusive: false,
      dependency: .rootStream,
      weight: 4
    )
    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      .priority(priorityData),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    // Each payload must be accepted silently and must not surface on the inbound side.
    for payload in ignoredPayloads {
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// After the client has written its initial metadata, response headers that lack an HTTP
  /// status must be surfaced as an `.unknown` status part carrying the received headers.
  func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with headers that have a content-type but no :status.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(code: .unknown, message: "HTTP Status Code is missing."),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Response headers carrying a 1xx HTTP status ("104") must not produce any response part
  /// on the inbound side.
  func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with a 1xx status.
    let informationalHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "104",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let informationalFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: informationalHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(informationalFrame))

    // Nothing should have been forwarded to the reader.
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  }
  /// Response headers carrying an HTTP status that is neither 200 nor 1xx (here "too many
  /// requests") must be surfaced as an `.unavailable` status part with the received headers.
  func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with a non-200, non-1xx :status.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Response headers with a 200 status but no content-type must be surfaced as an
  /// `.internalError` status part carrying the received headers.
  func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with a status but omits the content-type.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200"
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(code: .internalError, message: "Missing content-type header"),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// With deflate as the only accepted encoding, a server response advertising "gzip" must be
  /// surfaced as an `.internalError` status part naming the unknown algorithm.
  func testNotAcceptedEncodingResultsInFinishedRPC() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .deflate,
      acceptedEncodings: [.deflate],
      maxPayloadSize: 1
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The request headers written out must advertise the configured encodings.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // The server replies using an encoding the client did not offer.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.encoding.rawValue: "gzip",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(
        code: .internalError,
        message:
          "The server picked a compression algorithm ('gzip') the client does not know about."
      ),
      Metadata(headers: responseHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// With `maxPayloadSize` set to 1, a 42-byte message from the server must be surfaced as an
  /// `.internalError` status part, after which the channel's close future completes.
  func testOverMaximumPayloadSize() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // Check the request headers that were written out.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // The server sends valid initial metadata, which is forwarded as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      .metadata(Metadata(headers: responseHeaders))
    )

    // The server sends a length-prefixed message larger than the payload limit.
    var oversizedMessage = ByteBuffer()
    oversizedMessage.writeInteger(UInt8(0))  // not compressed
    oversizedMessage.writeInteger(UInt32(42))  // message length
    oversizedMessage.writeRepeatingByte(0, count: 42)  // message
    let serverDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(oversizedMessage),
      endStream: false
    )

    // The oversized payload should yield an error status and the stream being closed.
    try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
    let received = try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self)
    XCTAssertEqual(
      received,
      .status(Status(code: .internalError, message: "Failed to decode message"), [:])
    )

    channel.embeddedEventLoop.run()
    try channel.closeFuture.wait()
  }
  /// A data frame from the server with the end-of-stream flag set must be surfaced as an
  /// `.internalError` status part with empty metadata.
  func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // Check the request headers that were written out.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // The server sends valid initial metadata, which is forwarded as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      .metadata(Metadata(headers: responseHeaders))
    )

    // The server sends a message in a data frame that also ends the stream.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let serverDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(framedMessage),
      endStream: true
    )
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))

    // The reader must receive a status part describing the protocol violation.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(
        code: .internalError,
        message:
          "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
      ),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Response headers that carry a gRPC status and end the stream must be surfaced as an `.ok`
  /// status part; a data frame written afterwards must throw an "Invalid state" `RPCError`.
  func testServerEndsStream() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // Check the request headers that were written out.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // The server's first headers frame also ends the stream.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.grpcStatus.rawValue: "0",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    let closingHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(closingHeadersFrame))

    // The status part's metadata is expected to hold the headers other than the gRPC status.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(code: .ok, message: ""),
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      ]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )

    // Any further message from the server must be rejected: the stream is already closed.
    var lateMessage = ByteBuffer()
    lateMessage.writeInteger(UInt8(0))  // not compressed
    lateMessage.writeInteger(UInt32(42))  // message length
    lateMessage.writeRepeatingByte(0, count: 42)  // message
    let lateDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(lateMessage),
      endStream: true
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(HTTP2Frame.FramePayload.data(lateDataPayload))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Walks through a complete RPC: request headers, server metadata, one request message,
  /// client half-close, one response message, and the final status sent by the server.
  func testNormalFlow() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The request headers must have been written, and nothing must be readable inbound yet.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))

    // The server's initial metadata is forwarded as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The client sends one message.
    let requestMessage = RPCRequestPart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(repeating: 1, count: 42)
    )
    XCTAssertNoThrow(try channel.writeOutbound(requestMessage))

    // The message must reach the channel with its length-prefixed framing.
    let framedRequest = try channel.assertReadDataOutbound()
    var expectedFramedRequest = ByteBuffer()
    expectedFramedRequest.writeInteger(UInt8(0))  // not compressed
    expectedFramedRequest.writeInteger(UInt32(42))  // message length
    expectedFramedRequest.writeRepeatingByte(1, count: 42)  // message
    XCTAssertEqual(framedRequest.data, .byteBuffer(expectedFramedRequest))

    // Half-close the outbound side: this would be triggered by finishing the client's writer.
    XCTAssertNoThrow(channel.close(mode: .output, promise: nil))

    // An empty data frame flagged as end-of-stream must have been written.
    let endOfStreamFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(endOfStreamFrame.data, .byteBuffer(.init()))
    XCTAssertTrue(endOfStreamFrame.endStream)

    // Flushing again must not write anything else: the outbound side is closed.
    channel.flush()
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Further writes from the client must now be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(
        RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
      )
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }

    // Clear the error stored by the EmbeddedChannel; otherwise it would be rethrown by the
    // next inbound write.
    try? channel.throwIfErrorCaught()

    // The server sends a response message.
    var framedResponse = ByteBuffer()
    framedResponse.writeInteger(UInt8(0))  // not compressed
    framedResponse.writeInteger(UInt32(42))  // message length
    framedResponse.writeRepeatingByte(0, count: 42)  // message
    let responseDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedResponse))
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(responseDataPayload)))

    // The message must be delivered with its framing removed.
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
    )

    // The server ends the RPC with a status in a final headers frame.
    let trailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    let trailersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: trailers)
    )
    XCTAssertNoThrow(try channel.writeInbound(trailersFrame))

    let expectedStatusPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(code: .dataLoss, message: "Test data loss"),
      ["custom-header": "custom-value"]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedStatusPart
    )
  }
  /// Delivers one 30-byte response message across five separate data frames and checks that
  /// no message part is readable until the last fragment has been written.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The request headers must have been written, and nothing must be readable inbound yet.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))

    // The server's initial metadata is forwarded as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The client sends one message, which must be written out with its framing.
    XCTAssertNoThrow(
      try channel.writeOutbound(
        RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
      )
    )
    let framedRequest = try channel.assertReadDataOutbound()
    var expectedFramedRequest = ByteBuffer()
    expectedFramedRequest.writeInteger(UInt8(0))  // not compressed
    expectedFramedRequest.writeInteger(UInt32(42))  // message length
    expectedFramedRequest.writeRepeatingByte(1, count: 42)  // message
    XCTAssertEqual(framedRequest.data, .byteBuffer(expectedFramedRequest))

    // The server's message arrives in pieces: compression flag, length prefix, then the
    // payload in three chunks of ten bytes each.
    let firstChunk = [UInt8](repeating: 0, count: 10)
    let secondChunk = [UInt8](repeating: 1, count: 10)
    let thirdChunk = [UInt8](repeating: 2, count: 10)

    let incompleteFragments: [ByteBuffer] = [
      ByteBuffer(bytes: [0]),  // not compressed
      ByteBuffer(bytes: [0, 0, 0, 30]),  // message length (big-endian UInt32)
      ByteBuffer(bytes: firstChunk),  // first part of the message
      ByteBuffer(bytes: secondChunk),  // second part of the message
    ]

    // Until the whole message has arrived nothing may be delivered to the reader.
    for fragment in incompleteFragments {
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(fragment))))
      )
      XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
    }

    // The third chunk completes the message.
    let finalFragment = ByteBuffer(bytes: thirdChunk)
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(finalFragment)))
      )
    )

    // The reassembled payload must be delivered as a single message part.
    let expectedPayload = ByteBuffer(bytes: firstChunk + secondChunk + thirdChunk)
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.message(GRPCNIOTransportBytes(expectedPayload))
    )
  }
  /// Writes two request messages without flushing and checks that nothing is written out
  /// until the flush, at which point both framed messages arrive in a single data frame.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 100,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The request headers must have been written, and nothing must be readable inbound yet.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))

    // The server's initial metadata is forwarded as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(
      HTTP2Frame.FramePayload.Headers(headers: responseHeaders)
    )
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The actual subject of this test starts here: two unflushed writes must stay buffered,
    // and a single flush must emit both messages in the same ByteBuffer.

    // First message: written but not flushed, so nothing may be readable outbound.
    XCTAssertNoThrow(
      channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Second message: same expectation.
    XCTAssertNoThrow(
      channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Flushing must release the data.
    channel.flush()
    let flushedFrame = try channel.assertReadDataOutbound()

    // Both messages must be framed back to back in one buffer.
    let expectedBytes: [UInt8] = [
      // First message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      1, 1, 1, 1,  // First message data
      // Second message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      2, 2, 2, 2,  // Second message data
    ]
    XCTAssertEqual(flushedFrame.data, .byteBuffer(.init(bytes: expectedBytes)))

    // Nothing else may follow.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Fires an error through the pipeline after the request headers were sent and checks that
  /// an `.unavailable` status part is delivered and that further writes are rejected.
  func testUnexpectedStreamClose_ErrorFired() throws {
    let streamHandler = GRPCClientStreamHandler(
      methodDescriptor: .testTest,
      scheme: .http,
      authority: nil,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maxPayloadSize: 1,
      skipStateMachineAssertions: true
    )
    let channel = EmbeddedChannel(handler: streamHandler)

    // The client opens the RPC with empty initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // Check the request headers that were written out.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeaders.headers, expectedRequestHeaders)

    // Simulate a failure by firing an error through the pipeline.
    let pipelineError = ChannelError.connectTimeout(.milliseconds(100))
    channel.pipeline.fireErrorCaught(pipelineError)

    // The reader must be told that the stream closed because of an error.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(
        code: .unavailable,
        message: "Stream unexpectedly closed with error."
      ),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )

    // The stream is now closed, so writing again must fail.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that the channel becoming inactive after the request headers were written is
  /// delivered to the client as an `unavailable` status, and that subsequent writes are
  /// rejected with an "Invalid state" `RPCError`.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Headers we expect the handler to emit for an empty set of client metadata.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]

    // Send the client's initial metadata and verify what reaches the wire.
    let initialMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(initialMetadata))

    let outboundHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(outboundHeaders.headers, expectedRequestHeaders)

    // Simulate the channel going inactive underneath the stream.
    channel.pipeline.fireChannelInactive()

    // The client should be handed a status describing the closure.
    let expectedStatus = RPCResponsePart<GRPCNIOTransportBytes>.status(
      .init(code: .unavailable, message: "Stream unexpectedly closed."),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedStatus
    )

    // The stream is closed at this point, so any further write must be refused.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(initialMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that receiving a RST_STREAM frame after the request headers were written is
  /// delivered to the client as an `unavailable` status, and that subsequent writes are
  /// rejected with an "Invalid state" `RPCError`.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Headers we expect the handler to emit for an empty set of client metadata.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]

    // Send the client's initial metadata and verify what reaches the wire.
    let initialMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(initialMetadata))

    let outboundHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(outboundHeaders.headers, expectedRequestHeaders)

    // Deliver a RST_STREAM frame to the handler.
    let resetFrame = HTTP2Frame.FramePayload.rstStream(.internalError)
    XCTAssertNoThrow(try channel.writeInbound(resetFrame))

    // The client should be handed a status stating that RST_STREAM was received.
    let expectedStatus = RPCResponsePart<GRPCNIOTransportBytes>.status(
      .init(
        code: .unavailable,
        message: "Stream unexpectedly closed: a RST_STREAM frame was received."
      ),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedStatus
    )

    // The stream is closed at this point, so any further write must be refused.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(initialMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  818. }
  extension EmbeddedChannel {
    /// Reads the next outbound HTTP/2 frame payload and returns it as a headers payload.
    ///
    /// - Returns: The headers payload that was written to the channel.
    /// - Throws: The error from `XCTUnwrap` when nothing was written, or
    ///   `TestError.assertionFailure` when the payload is not a `.headers` frame.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      if case .headers(let headersPayload) = payload {
        return headersPayload
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound HTTP/2 frame payload and returns it as a data payload.
    ///
    /// - Returns: The data payload that was written to the channel.
    /// - Throws: The error from `XCTUnwrap` when nothing was written, or
    ///   `TestError.assertionFailure` when the payload is not a `.data` frame.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      if case .data(let dataPayload) = payload {
        return dataPayload
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Errors thrown by this file's test helpers when an expectation about the channel's
  /// output is not met.
  private enum TestError: Error {
    /// An expectation failed; the associated string describes what was expected.
    case assertionFailure(String)
  }
  extension MethodDescriptor {
    /// Method descriptor shared by the tests: service "test", method "test".
    static let testTest: MethodDescriptor = .init(fullyQualifiedService: "test", method: "test")
  }