GRPCClientStreamHandlerTests.swift 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCClientStreamHandlerTests: XCTestCase {
  /// Writes a series of HTTP/2 frame payloads into the channel and checks that
  /// none of them produces anything readable on the inbound side.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1
      )
    )

    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      // TODO: uncomment when it's possible to build a `StreamPriorityData`.
      // .priority(
      //   HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
      // ),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    // Each payload must be accepted without error and leave nothing to read.
    ignoredPayloads.forEach { payload in
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Checks that server initial metadata lacking a `:status` header is read
  /// back as an `.unknown` status carrying the received headers.
  func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with headers that carry a content-type but no :status.
    let headersWithoutStatus: HPACKHeaders = [
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: headersWithoutStatus))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(code: .unknown, message: "HTTP Status Code is missing."),
      Metadata(headers: headersWithoutStatus)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Checks that server headers with a 1xx `:status` do not produce any
  /// response part on the inbound side.
  func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with an informational (1xx) status code.
    let informationalHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "104",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: informationalHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing should have been forwarded.
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  }
  /// Checks that server headers with a `:status` that is neither 200 nor 1xx
  /// (here: 429 Too Many Requests) are read back as an `.unavailable` status.
  func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with a non-200, non-1xx :status.
    let rejectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: rejectedHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
      Metadata(headers: rejectedHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Checks that server headers with a 200 `:status` but no content-type are
  /// read back as an `.internalError` status carrying the received headers.
  func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The server replies with a :status only; the content-type is absent.
    let headersWithoutContentType: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200"
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: headersWithoutContentType)
    )
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(code: .internalError, message: "Missing content-type header"),
      Metadata(headers: headersWithoutContentType)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Configures the client with `deflate` as its only encoding, has the server
  /// answer with `gzip`, and checks that an `.internalError` status is read back.
  func testNotAcceptedEncodingResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .deflate,
        acceptedEncodings: [.deflate],
        maxPayloadSize: 1
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The request headers written to the channel must advertise the encodings.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server answers using an encoding the client did not offer.
    let gzipResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.encoding.rawValue: "gzip",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: gzipResponseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(
        code: .internalError,
        message:
          "The server picked a compression algorithm ('gzip') the client does not know about."
      ),
      Metadata(headers: gzipResponseHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// With `maxPayloadSize` set to 1, delivers a 42-byte message from the server
  /// and checks that an `.internalError` status is read back and that the
  /// channel's close future completes.
  func testOverMaximumPayloadSize() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // Check the request headers that were written to the channel.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server sends valid initial metadata, which is forwarded as-is.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      .metadata(Metadata(headers: responseHeaders))
    )

    // The server sends a message that is larger than the payload limit.
    var oversizedMessage = ByteBuffer()
    oversizedMessage.writeInteger(UInt8(0))  // not compressed
    oversizedMessage.writeInteger(UInt32(42))  // message length
    oversizedMessage.writeRepeatingByte(0, count: 42)  // message
    let oversizedFrame = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(oversizedMessage),
      endStream: false
    )

    // The invalid payload should yield an error status and close the stream.
    try channel.writeInbound(HTTP2Frame.FramePayload.data(oversizedFrame))
    let received = try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self)
    XCTAssertEqual(
      received,
      .status(Status(code: .internalError, message: "Failed to decode message"), [:])
    )

    channel.embeddedEventLoop.run()
    try channel.closeFuture.wait()
  }
  /// Delivers a server data frame that has end-of-stream set and checks that
  /// an `.internalError` status with empty metadata is read back.
  func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // Check the request headers that were written to the channel.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server sends valid initial metadata, which is forwarded as-is.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      .metadata(Metadata(headers: responseHeaders))
    )

    // The server sends a message in a data frame that also ends the stream.
    var message = ByteBuffer()
    message.writeInteger(UInt8(0))  // not compressed
    message.writeInteger(UInt32(42))  // message length
    message.writeRepeatingByte(0, count: 42)  // message
    let endingDataFrame = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(message),
      endStream: true
    )
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(endingDataFrame)))

    // A status with the expected error and empty metadata should be readable.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      Status(
        code: .internalError,
        message:
          "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
      ),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )
  }
  /// Has the server end the stream with its first headers frame (containing a
  /// grpc-status), checks the resulting `.ok` status, and then checks that a
  /// further inbound data frame is rejected with an "Invalid state" error.
  func testServerEndsStream() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server's first headers frame carries a grpc-status and ends the stream.
    let trailersOnlyHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.grpcStatus.rawValue: "0",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    let endingHeadersFrame = HTTP2Frame.FramePayload.headers(
      .init(
        headers: trailersOnlyHeaders,
        endStream: true
      )
    )
    XCTAssertNoThrow(try channel.writeInbound(endingHeadersFrame))

    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(code: .ok, message: ""),
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      ]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )

    // The server already closed the stream, so another message must throw.
    var lateMessage = ByteBuffer()
    lateMessage.writeInteger(UInt8(0))  // not compressed
    lateMessage.writeInteger(UInt32(42))  // message length
    lateMessage.writeRepeatingByte(0, count: 42)  // message
    let lateDataFrame = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(lateMessage),
      endStream: true
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(HTTP2Frame.FramePayload.data(lateDataFrame))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Walks through a complete exchange: request headers, server initial
  /// metadata, one request message, client half-close, one response message
  /// and finally the server's trailers.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The matching headers frame must have been written, with nothing read back.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))

    // The server's initial metadata is forwarded, custom header included.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The client sends one message.
    XCTAssertNoThrow(
      try channel.writeOutbound(
        RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
      )
    )

    // The message must appear on the channel with its length-prefixed framing.
    let sentMessage = try channel.assertReadDataOutbound()
    var expectedFramedRequest = ByteBuffer()
    expectedFramedRequest.writeInteger(UInt8(0))  // not compressed
    expectedFramedRequest.writeInteger(UInt32(42))  // message length
    expectedFramedRequest.writeRepeatingByte(1, count: 42)  // message
    XCTAssertEqual(sentMessage.data, .byteBuffer(expectedFramedRequest))

    // Half-close the outbound end: this would be triggered by finishing the client's writer.
    XCTAssertNoThrow(channel.close(mode: .output, promise: nil))

    // An empty data frame with end-of-stream set must have been written.
    let endOfStreamFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(endOfStreamFrame.data, .byteBuffer(.init()))
    XCTAssertTrue(endOfStreamFrame.endStream)

    // Flushing again must not write anything else: the outbound side is closed.
    channel.flush()
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Further writes must fail because the client has closed.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(
        RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
      )
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }

    // This is needed to clear the EmbeddedChannel's stored error, otherwise
    // it will be thrown when writing inbound.
    try? channel.throwIfErrorCaught()

    // The server sends back a response message.
    var framedResponse = ByteBuffer()
    framedResponse.writeInteger(UInt8(0))  // not compressed
    framedResponse.writeInteger(UInt32(42))  // message length
    framedResponse.writeRepeatingByte(0, count: 42)  // message
    let responseDataFrame = HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedResponse))
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(responseDataFrame)))

    // The message must be readable with its framing stripped.
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
    )

    // The server ends the RPC with a status and a custom trailer.
    let trailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: trailers)))
    )

    let expectedStatusPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(code: .dataLoss, message: "Test data loss"),
      ["custom-header": "custom-value"]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedStatusPart
    )
  }
  /// Delivers a single 30-byte server message in five separate data frames and
  /// checks that nothing is readable until the last fragment has arrived.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The matching headers frame must have been written, with nothing read back.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))

    // The server's initial metadata is forwarded, custom header included.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // The client sends one message.
    XCTAssertNoThrow(
      try channel.writeOutbound(
        RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
      )
    )

    // The message must appear on the channel with its length-prefixed framing.
    let sentMessage = try channel.assertReadDataOutbound()
    var expectedFramedRequest = ByteBuffer()
    expectedFramedRequest.writeInteger(UInt8(0))  // not compressed
    expectedFramedRequest.writeInteger(UInt32(42))  // message length
    expectedFramedRequest.writeRepeatingByte(1, count: 42)  // message
    XCTAssertEqual(sentMessage.data, .byteBuffer(expectedFramedRequest))

    // Build the fragments of the server's first message.
    var compressionFlag = ByteBuffer()
    compressionFlag.writeInteger(UInt8(0))  // not compressed
    var lengthPrefix = ByteBuffer()
    lengthPrefix.writeInteger(UInt32(30))  // message length
    var firstThird = ByteBuffer()
    firstThird.writeRepeatingByte(0, count: 10)  // first part of the message
    var secondThird = ByteBuffer()
    secondThird.writeRepeatingByte(1, count: 10)  // second part of the message
    var lastThird = ByteBuffer()
    lastThird.writeRepeatingByte(2, count: 10)  // third part of the message

    // Every fragment but the last must be accepted without yielding a message.
    for fragment in [compressionFlag, lengthPrefix, firstThird, secondThird] {
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(fragment))))
      )
      XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
    }

    // The last fragment completes the message.
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(lastThird))))
    )

    var expectedPayload = ByteBuffer()
    expectedPayload.writeRepeatingByte(0, count: 10)
    expectedPayload.writeRepeatingByte(1, count: 10)
    expectedPayload.writeRepeatingByte(2, count: 10)

    // The reassembled message must now be readable.
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.message(GRPCNIOTransportBytes(expectedPayload))
    )
  }
  /// Writes two request messages without flushing, checks that nothing reaches
  /// the channel until the flush, and that both then arrive framed back-to-back
  /// in a single data frame.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    // The matching headers frame must have been written, with nothing read back.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))

    // The server's initial metadata is forwarded, custom header included.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // This is where the test proper begins: two messages are written without
    // flushing, and nothing may reach the pipeline until the flush happens.
    // After the flush both messages should be sent in the same ByteBuffer.

    // First unflushed write: the channel must stay empty.
    XCTAssertNoThrow(
      channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Second unflushed write: the channel must still be empty.
    XCTAssertNoThrow(
      channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Flushing must now push the data out.
    channel.flush()
    let sentData = try channel.assertReadDataOutbound()

    // Both messages must have been framed together in one buffer.
    let expectedBytes: [UInt8] = [
      // First message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      1, 1, 1, 1,  // First message data
      // Second message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      2, 2, 2, 2,  // Second message data
    ]
    XCTAssertEqual(sentData.data, .byteBuffer(.init(bytes: expectedBytes)))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Fires an error through the pipeline after the request headers were sent,
  /// checks that an `.unavailable` status is read back, and that subsequent
  /// writes are rejected with an "Invalid state" error.
  func testUnexpectedStreamClose_ErrorFired() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by writing its (empty) initial metadata.
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    )

    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // Fire an error through the pipeline.
    let firedError = ChannelError.connectTimeout(.milliseconds(100))
    channel.pipeline.fireErrorCaught(firedError)

    // The client should read a status saying the stream closed with an error.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(
        code: .unavailable,
        message: "Stream unexpectedly closed with error."
      ),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )

    // The handler should now be closed: further writes must fail.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that the channel becoming inactive after the request metadata has been sent is
  /// reported to the client as an `unavailable` status, and that a subsequent write fails.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Send the request metadata and verify the headers that end up on the wire.
    let requestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(requestMetadata))

    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedHeaders)

    // Simulate the channel going inactive underneath the stream.
    channel.pipeline.fireChannelInactive()

    // The handler is expected to surface the closure as a status part on the inbound side.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(code: .unavailable, message: "Stream unexpectedly closed."),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )

    // The stream should be closed at this point, so another write has to be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that receiving a RST_STREAM payload after the request metadata has been sent is
  /// reported to the client as an `unavailable` status, and that a subsequent write fails.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .testTest,
        scheme: .http,
        authority: nil,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maxPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Send the request metadata and verify the headers that end up on the wire.
    let requestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
    XCTAssertNoThrow(try channel.writeOutbound(requestMetadata))

    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedHeaders)

    // Deliver a RST_STREAM payload on the inbound side.
    let resetPayload = HTTP2Frame.FramePayload.rstStream(.internalError)
    XCTAssertNoThrow(try channel.writeInbound(resetPayload))

    // The handler is expected to surface the reset as a status part on the inbound side.
    let expectedPart: RPCResponsePart<GRPCNIOTransportBytes> = .status(
      .init(
        code: .unavailable,
        message: "Stream unexpectedly closed: a RST_STREAM frame was received."
      ),
      [:]
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
      expectedPart
    )

    // The stream should be closed at this point, so another write has to be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  819. }
  // Test-only helpers for reading a specific kind of outbound HTTP/2 frame payload.
  extension EmbeddedChannel {
    /// Reads the next outbound `HTTP2Frame.FramePayload` and returns its headers.
    ///
    /// - Returns: The associated value of the `.headers` payload that was read.
    /// - Throws: Whatever `readOutbound(as:)` or `XCTUnwrap` throw (the latter when the read
    ///   yields `nil`), or `TestError.assertionFailure` when the payload is not `.headers`.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(try readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .headers(let headers) = payload {
        return headers
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound `HTTP2Frame.FramePayload` and returns its data.
    ///
    /// - Returns: The associated value of the `.data` payload that was read.
    /// - Throws: Whatever `readOutbound(as:)` or `XCTUnwrap` throw (the latter when the read
    ///   yields `nil`), or `TestError.assertionFailure` when the payload is not `.data`.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(try readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .data(let data) = payload {
        return data
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Error thrown by this file's test helpers when an expectation about the channel is not met.
  private enum TestError: Swift.Error {
    /// An assertion made by a helper failed; the payload describes what was expected.
    case assertionFailure(String)
  }
  extension MethodDescriptor {
    /// Descriptor for the `test` method on the `test` service, shared by the tests in this file.
    static let testTest = MethodDescriptor(
      fullyQualifiedService: "test",
      method: "test"
    )
  }