GRPCClientStreamHandlerTests.swift 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. @available(gRPCSwiftNIOTransport 2.0, *)
  25. final class GRPCClientStreamHandlerTests: XCTestCase {
  26. func testH2FramesAreIgnored() throws {
  27. let handler = GRPCClientStreamHandler(
  28. methodDescriptor: .testTest,
  29. scheme: .http,
  30. authority: nil,
  31. outboundEncoding: .none,
  32. acceptedEncodings: [],
  33. maxPayloadSize: 1
  34. )
  35. let channel = EmbeddedChannel(handler: handler)
  36. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  37. .ping(.init(), ack: false),
  38. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  39. .priority(
  40. HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  41. ),
  42. .settings(.ack),
  43. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  44. .windowUpdate(windowSizeIncrement: 4),
  45. .alternativeService(origin: nil, field: nil),
  46. .origin([]),
  47. ]
  48. for toBeIgnored in framesToBeIgnored {
  49. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  50. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  51. }
  52. }
  53. func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
  54. let handler = GRPCClientStreamHandler(
  55. methodDescriptor: .testTest,
  56. scheme: .http,
  57. authority: nil,
  58. outboundEncoding: .none,
  59. acceptedEncodings: [],
  60. maxPayloadSize: 1,
  61. skipStateMachineAssertions: true
  62. )
  63. let channel = EmbeddedChannel(handler: handler)
  64. // Send client's initial metadata
  65. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  66. XCTAssertNoThrow(try channel.writeOutbound(request))
  67. // Receive server's initial metadata without :status
  68. let serverInitialMetadata: HPACKHeaders = [
  69. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
  70. ]
  71. XCTAssertNoThrow(
  72. try channel.writeInbound(
  73. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  74. )
  75. )
  76. XCTAssertEqual(
  77. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  78. .status(
  79. .init(code: .unknown, message: "HTTP Status Code is missing."),
  80. Metadata(headers: serverInitialMetadata)
  81. )
  82. )
  83. }
  84. func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
  85. let handler = GRPCClientStreamHandler(
  86. methodDescriptor: .testTest,
  87. scheme: .http,
  88. authority: nil,
  89. outboundEncoding: .none,
  90. acceptedEncodings: [],
  91. maxPayloadSize: 1,
  92. skipStateMachineAssertions: true
  93. )
  94. let channel = EmbeddedChannel(handler: handler)
  95. // Send client's initial metadata
  96. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  97. XCTAssertNoThrow(try channel.writeOutbound(request))
  98. // Receive server's initial metadata with 1xx status
  99. let invalidServerInitialMetadata: HPACKHeaders = [
  100. GRPCHTTP2Keys.status.rawValue: "104",
  101. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  102. ]
  103. XCTAssertNoThrow(
  104. try channel.writeInbound(
  105. HTTP2Frame.FramePayload.headers(.init(headers: invalidServerInitialMetadata))
  106. )
  107. )
  108. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  109. // We are still expecting the correct headers after getting a 1xx response, so make sure we
  110. // don't fail if we get the metadata twice.
  111. let validServerInitialMetadata: HPACKHeaders = [
  112. GRPCHTTP2Keys.status.rawValue: "200",
  113. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  114. "some-custom-header": "some-custom-value",
  115. ]
  116. XCTAssertNoThrow(
  117. try channel.writeInbound(
  118. HTTP2Frame.FramePayload.headers(.init(headers: validServerInitialMetadata))
  119. )
  120. )
  121. XCTAssertEqual(
  122. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  123. RPCResponsePart.metadata(Metadata(headers: validServerInitialMetadata))
  124. )
  125. }
  126. func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
  127. let handler = GRPCClientStreamHandler(
  128. methodDescriptor: .testTest,
  129. scheme: .http,
  130. authority: nil,
  131. outboundEncoding: .none,
  132. acceptedEncodings: [],
  133. maxPayloadSize: 1,
  134. skipStateMachineAssertions: true
  135. )
  136. let channel = EmbeddedChannel(handler: handler)
  137. // Send client's initial metadata
  138. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  139. XCTAssertNoThrow(try channel.writeOutbound(request))
  140. // Receive server's initial metadata with non-200 and non-1xx :status
  141. let serverInitialMetadata: HPACKHeaders = [
  142. GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
  143. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  144. ]
  145. XCTAssertNoThrow(
  146. try channel.writeInbound(
  147. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  148. )
  149. )
  150. XCTAssertEqual(
  151. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  152. .status(
  153. .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
  154. Metadata(headers: serverInitialMetadata)
  155. )
  156. )
  157. // We should not throw if the server sends another message:
  158. // we should drop it, since the server is now closed.
  159. var buffer = ByteBuffer()
  160. buffer.writeInteger(UInt8(0)) // not compressed
  161. buffer.writeInteger(UInt32(42)) // message length
  162. buffer.writeRepeatingByte(0, count: 42) // message
  163. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  164. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  165. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  166. }
  167. func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
  168. let handler = GRPCClientStreamHandler(
  169. methodDescriptor: .testTest,
  170. scheme: .http,
  171. authority: nil,
  172. outboundEncoding: .none,
  173. acceptedEncodings: [],
  174. maxPayloadSize: 1,
  175. skipStateMachineAssertions: true
  176. )
  177. let channel = EmbeddedChannel(handler: handler)
  178. // Send client's initial metadata
  179. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  180. XCTAssertNoThrow(try channel.writeOutbound(request))
  181. // Receive server's initial metadata without content-type
  182. let serverInitialMetadata: HPACKHeaders = [
  183. GRPCHTTP2Keys.status.rawValue: "200"
  184. ]
  185. XCTAssertNoThrow(
  186. try channel.writeInbound(
  187. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  188. )
  189. )
  190. XCTAssertEqual(
  191. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  192. .status(
  193. .init(code: .internalError, message: "Missing content-type header"),
  194. Metadata(headers: serverInitialMetadata)
  195. )
  196. )
  197. }
  198. func testNotAcceptedEncodingResultsInFinishedRPC() throws {
  199. let handler = GRPCClientStreamHandler(
  200. methodDescriptor: .testTest,
  201. scheme: .http,
  202. authority: nil,
  203. outboundEncoding: .deflate,
  204. acceptedEncodings: [.deflate],
  205. maxPayloadSize: 1
  206. )
  207. let channel = EmbeddedChannel(handler: handler)
  208. // Send client's initial metadata
  209. XCTAssertNoThrow(
  210. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  211. )
  212. // Make sure we have sent right metadata.
  213. let writtenMetadata = try channel.assertReadHeadersOutbound()
  214. XCTAssertEqual(
  215. writtenMetadata.headers,
  216. [
  217. GRPCHTTP2Keys.method.rawValue: "POST",
  218. GRPCHTTP2Keys.scheme.rawValue: "http",
  219. GRPCHTTP2Keys.path.rawValue: "/test/test",
  220. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  221. GRPCHTTP2Keys.te.rawValue: "trailers",
  222. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  223. GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
  224. ]
  225. )
  226. // Server sends initial metadata with unsupported encoding
  227. let serverInitialMetadata: HPACKHeaders = [
  228. GRPCHTTP2Keys.status.rawValue: "200",
  229. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  230. GRPCHTTP2Keys.encoding.rawValue: "gzip",
  231. ]
  232. XCTAssertNoThrow(
  233. try channel.writeInbound(
  234. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  235. )
  236. )
  237. XCTAssertEqual(
  238. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  239. .status(
  240. .init(
  241. code: .internalError,
  242. message:
  243. "The server picked a compression algorithm ('gzip') the client does not know about."
  244. ),
  245. Metadata(headers: serverInitialMetadata)
  246. )
  247. )
  248. }
  249. func testOverMaximumPayloadSize() throws {
  250. let handler = GRPCClientStreamHandler(
  251. methodDescriptor: .testTest,
  252. scheme: .http,
  253. authority: nil,
  254. outboundEncoding: .none,
  255. acceptedEncodings: [],
  256. maxPayloadSize: 1,
  257. skipStateMachineAssertions: true
  258. )
  259. let channel = EmbeddedChannel(handler: handler)
  260. // Send client's initial metadata
  261. XCTAssertNoThrow(
  262. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  263. )
  264. // Make sure we have sent right metadata.
  265. let writtenMetadata = try channel.assertReadHeadersOutbound()
  266. XCTAssertEqual(
  267. writtenMetadata.headers,
  268. [
  269. GRPCHTTP2Keys.method.rawValue: "POST",
  270. GRPCHTTP2Keys.scheme.rawValue: "http",
  271. GRPCHTTP2Keys.path.rawValue: "/test/test",
  272. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  273. GRPCHTTP2Keys.te.rawValue: "trailers",
  274. ]
  275. )
  276. // Server sends initial metadata
  277. let serverInitialMetadata: HPACKHeaders = [
  278. GRPCHTTP2Keys.status.rawValue: "200",
  279. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  280. ]
  281. XCTAssertNoThrow(
  282. try channel.writeInbound(
  283. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  284. )
  285. )
  286. XCTAssertEqual(
  287. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  288. .metadata(Metadata(headers: serverInitialMetadata))
  289. )
  290. // Server sends message over payload limit
  291. var buffer = ByteBuffer()
  292. buffer.writeInteger(UInt8(0)) // not compressed
  293. buffer.writeInteger(UInt32(42)) // message length
  294. buffer.writeRepeatingByte(0, count: 42) // message
  295. let clientDataPayload = HTTP2Frame.FramePayload.Data(
  296. data: .byteBuffer(buffer),
  297. endStream: false
  298. )
  299. // Invalid payload should result in error status and stream being closed
  300. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  301. let part = try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self)
  302. XCTAssertEqual(
  303. part,
  304. .status(Status(code: .internalError, message: "Failed to decode message"), [:])
  305. )
  306. channel.embeddedEventLoop.run()
  307. try channel.closeFuture.wait()
  308. }
  309. func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
  310. let handler = GRPCClientStreamHandler(
  311. methodDescriptor: .testTest,
  312. scheme: .http,
  313. authority: nil,
  314. outboundEncoding: .none,
  315. acceptedEncodings: [],
  316. maxPayloadSize: 100,
  317. skipStateMachineAssertions: true
  318. )
  319. let channel = EmbeddedChannel(handler: handler)
  320. // Send client's initial metadata
  321. XCTAssertNoThrow(
  322. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  323. )
  324. // Make sure we have sent right metadata.
  325. let writtenMetadata = try channel.assertReadHeadersOutbound()
  326. XCTAssertEqual(
  327. writtenMetadata.headers,
  328. [
  329. GRPCHTTP2Keys.method.rawValue: "POST",
  330. GRPCHTTP2Keys.scheme.rawValue: "http",
  331. GRPCHTTP2Keys.path.rawValue: "/test/test",
  332. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  333. GRPCHTTP2Keys.te.rawValue: "trailers",
  334. ]
  335. )
  336. // Server sends initial metadata
  337. let serverInitialMetadata: HPACKHeaders = [
  338. GRPCHTTP2Keys.status.rawValue: "200",
  339. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  340. ]
  341. XCTAssertNoThrow(
  342. try channel.writeInbound(
  343. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  344. )
  345. )
  346. XCTAssertEqual(
  347. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  348. .metadata(Metadata(headers: serverInitialMetadata))
  349. )
  350. // Server sends message with EOS set.
  351. var buffer = ByteBuffer()
  352. buffer.writeInteger(UInt8(0)) // not compressed
  353. buffer.writeInteger(UInt32(42)) // message length
  354. buffer.writeRepeatingByte(0, count: 42) // message
  355. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  356. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  357. // Make sure we got status + trailers with the right error.
  358. XCTAssertEqual(
  359. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  360. .status(
  361. Status(
  362. code: .internalError,
  363. message:
  364. "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
  365. ),
  366. [:]
  367. )
  368. )
  369. }
  370. func testServerEndsStream() throws {
  371. let handler = GRPCClientStreamHandler(
  372. methodDescriptor: .testTest,
  373. scheme: .http,
  374. authority: nil,
  375. outboundEncoding: .none,
  376. acceptedEncodings: [],
  377. maxPayloadSize: 1,
  378. skipStateMachineAssertions: true
  379. )
  380. let channel = EmbeddedChannel(handler: handler)
  381. // Write client's initial metadata
  382. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  383. let clientInitialMetadata: HPACKHeaders = [
  384. GRPCHTTP2Keys.path.rawValue: "/test/test",
  385. GRPCHTTP2Keys.scheme.rawValue: "http",
  386. GRPCHTTP2Keys.method.rawValue: "POST",
  387. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  388. GRPCHTTP2Keys.te.rawValue: "trailers",
  389. ]
  390. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  391. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  392. // Receive server's initial metadata with end stream set
  393. let serverInitialMetadata: HPACKHeaders = [
  394. GRPCHTTP2Keys.status.rawValue: "200",
  395. GRPCHTTP2Keys.grpcStatus.rawValue: "0",
  396. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  397. ]
  398. XCTAssertNoThrow(
  399. try channel.writeInbound(
  400. HTTP2Frame.FramePayload.headers(
  401. .init(
  402. headers: serverInitialMetadata,
  403. endStream: true
  404. )
  405. )
  406. )
  407. )
  408. XCTAssertEqual(
  409. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  410. .status(
  411. .init(code: .ok, message: ""),
  412. [
  413. GRPCHTTP2Keys.status.rawValue: "200",
  414. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  415. ]
  416. )
  417. )
  418. // We should not throw if the server sends another message:
  419. // we should drop it, since the server is now closed.
  420. var buffer = ByteBuffer()
  421. buffer.writeInteger(UInt8(0)) // not compressed
  422. buffer.writeInteger(UInt32(42)) // message length
  423. buffer.writeRepeatingByte(0, count: 42) // message
  424. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  425. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  426. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  427. // We should also not throw if the server sends trailers again.
  428. XCTAssertNoThrow(
  429. try channel.writeInbound(
  430. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata, endStream: true))
  431. )
  432. )
  433. }
  434. func testNormalFlow() throws {
  435. let handler = GRPCClientStreamHandler(
  436. methodDescriptor: .testTest,
  437. scheme: .http,
  438. authority: nil,
  439. outboundEncoding: .none,
  440. acceptedEncodings: [],
  441. maxPayloadSize: 100,
  442. skipStateMachineAssertions: true
  443. )
  444. let channel = EmbeddedChannel(handler: handler)
  445. // Send client's initial metadata
  446. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  447. XCTAssertNoThrow(try channel.writeOutbound(request))
  448. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  449. let writtenHeaders = try channel.assertReadHeadersOutbound()
  450. XCTAssertEqual(
  451. writtenHeaders.headers,
  452. [
  453. GRPCHTTP2Keys.method.rawValue: "POST",
  454. GRPCHTTP2Keys.scheme.rawValue: "http",
  455. GRPCHTTP2Keys.path.rawValue: "/test/test",
  456. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  457. GRPCHTTP2Keys.te.rawValue: "trailers",
  458. ]
  459. )
  460. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  461. // Receive server's initial metadata
  462. let serverInitialMetadata: HPACKHeaders = [
  463. GRPCHTTP2Keys.status.rawValue: "200",
  464. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  465. "some-custom-header": "some-custom-value",
  466. ]
  467. XCTAssertNoThrow(
  468. try channel.writeInbound(
  469. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  470. )
  471. )
  472. XCTAssertEqual(
  473. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  474. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  475. )
  476. // Send a message
  477. XCTAssertNoThrow(
  478. try channel.writeOutbound(
  479. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  480. )
  481. )
  482. // Assert we wrote it successfully into the channel
  483. let writtenMessage = try channel.assertReadDataOutbound()
  484. var expectedBuffer = ByteBuffer()
  485. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  486. expectedBuffer.writeInteger(UInt32(42)) // message length
  487. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  488. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  489. // Half-close the outbound end: this would be triggered by finishing the client's writer.
  490. XCTAssertNoThrow(channel.close(mode: .output, promise: nil))
  491. // Make sure the EOS frame was sent
  492. let emptyEOSFrame = try channel.assertReadDataOutbound()
  493. XCTAssertEqual(emptyEOSFrame.data, .byteBuffer(.init()))
  494. XCTAssertTrue(emptyEOSFrame.endStream)
  495. // Make sure that, if we flush again, we're not writing anything else down
  496. // the stream. We should have closed at this point.
  497. channel.flush()
  498. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  499. // Make sure we cannot write anymore because client's closed.
  500. XCTAssertThrowsError(
  501. ofType: RPCError.self,
  502. try channel.writeOutbound(
  503. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  504. )
  505. ) { error in
  506. XCTAssertEqual(error.code, .internalError)
  507. XCTAssertEqual(error.message, "Invalid state")
  508. }
  509. // This is needed to clear the EmbeddedChannel's stored error, otherwise
  510. // it will be thrown when writing inbound.
  511. try? channel.throwIfErrorCaught()
  512. // Server sends back response message
  513. var buffer = ByteBuffer()
  514. buffer.writeInteger(UInt8(0)) // not compressed
  515. buffer.writeInteger(UInt32(42)) // message length
  516. buffer.writeRepeatingByte(0, count: 42) // message
  517. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer))
  518. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  519. // Make sure we read the message properly
  520. XCTAssertEqual(
  521. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  522. RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
  523. )
  524. // Server sends status to end RPC
  525. XCTAssertNoThrow(
  526. try channel.writeInbound(
  527. HTTP2Frame.FramePayload.headers(
  528. .init(headers: [
  529. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  530. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  531. "custom-header": "custom-value",
  532. ])
  533. )
  534. )
  535. )
  536. XCTAssertEqual(
  537. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  538. .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
  539. )
  540. }
  541. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  542. let handler = GRPCClientStreamHandler(
  543. methodDescriptor: .testTest,
  544. scheme: .http,
  545. authority: nil,
  546. outboundEncoding: .none,
  547. acceptedEncodings: [],
  548. maxPayloadSize: 100,
  549. skipStateMachineAssertions: true
  550. )
  551. let channel = EmbeddedChannel(handler: handler)
  552. // Send client's initial metadata
  553. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  554. XCTAssertNoThrow(try channel.writeOutbound(request))
  555. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  556. let writtenHeaders = try channel.assertReadHeadersOutbound()
  557. XCTAssertEqual(
  558. writtenHeaders.headers,
  559. [
  560. GRPCHTTP2Keys.method.rawValue: "POST",
  561. GRPCHTTP2Keys.scheme.rawValue: "http",
  562. GRPCHTTP2Keys.path.rawValue: "/test/test",
  563. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  564. GRPCHTTP2Keys.te.rawValue: "trailers",
  565. ]
  566. )
  567. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  568. // Receive server's initial metadata
  569. let serverInitialMetadata: HPACKHeaders = [
  570. GRPCHTTP2Keys.status.rawValue: "200",
  571. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  572. "some-custom-header": "some-custom-value",
  573. ]
  574. XCTAssertNoThrow(
  575. try channel.writeInbound(
  576. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  577. )
  578. )
  579. XCTAssertEqual(
  580. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  581. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  582. )
  583. // Send a message
  584. XCTAssertNoThrow(
  585. try channel.writeOutbound(
  586. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  587. )
  588. )
  589. // Assert we wrote it successfully into the channel
  590. let writtenMessage = try channel.assertReadDataOutbound()
  591. var expectedBuffer = ByteBuffer()
  592. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  593. expectedBuffer.writeInteger(UInt32(42)) // message length
  594. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  595. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  596. // Receive server's first message
  597. var buffer = ByteBuffer()
  598. buffer.writeInteger(UInt8(0)) // not compressed
  599. XCTAssertNoThrow(
  600. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  601. )
  602. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  603. buffer.clear()
  604. buffer.writeInteger(UInt32(30)) // message length
  605. XCTAssertNoThrow(
  606. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  607. )
  608. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  609. buffer.clear()
  610. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  611. XCTAssertNoThrow(
  612. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  613. )
  614. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  615. buffer.clear()
  616. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  617. XCTAssertNoThrow(
  618. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  619. )
  620. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  621. buffer.clear()
  622. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  623. XCTAssertNoThrow(
  624. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  625. )
  626. var expected = ByteBuffer()
  627. expected.writeRepeatingByte(0, count: 10)
  628. expected.writeRepeatingByte(1, count: 10)
  629. expected.writeRepeatingByte(2, count: 10)
  630. // Make sure we read the message properly
  631. XCTAssertEqual(
  632. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  633. RPCResponsePart.message(GRPCNIOTransportBytes(expected))
  634. )
  635. }
  636. func testSendMultipleMessagesInSingleBuffer() throws {
  637. let handler = GRPCClientStreamHandler(
  638. methodDescriptor: .testTest,
  639. scheme: .http,
  640. authority: nil,
  641. outboundEncoding: .none,
  642. acceptedEncodings: [],
  643. maxPayloadSize: 100,
  644. skipStateMachineAssertions: true
  645. )
  646. let channel = EmbeddedChannel(handler: handler)
  647. // Send client's initial metadata
  648. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  649. XCTAssertNoThrow(try channel.writeOutbound(request))
  650. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  651. let writtenHeaders = try channel.assertReadHeadersOutbound()
  652. XCTAssertEqual(
  653. writtenHeaders.headers,
  654. [
  655. GRPCHTTP2Keys.method.rawValue: "POST",
  656. GRPCHTTP2Keys.scheme.rawValue: "http",
  657. GRPCHTTP2Keys.path.rawValue: "/test/test",
  658. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  659. GRPCHTTP2Keys.te.rawValue: "trailers",
  660. ]
  661. )
  662. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  663. // Receive server's initial metadata
  664. let serverInitialMetadata: HPACKHeaders = [
  665. GRPCHTTP2Keys.status.rawValue: "200",
  666. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  667. "some-custom-header": "some-custom-value",
  668. ]
  669. XCTAssertNoThrow(
  670. try channel.writeInbound(
  671. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  672. )
  673. )
  674. XCTAssertEqual(
  675. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  676. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  677. )
  678. // This is where this test actually begins. We want to write two messages
  679. // without flushing, and make sure that no messages are sent down the pipeline
  680. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  681. // Write back first message and make sure nothing's written in the channel.
  682. XCTAssertNoThrow(
  683. channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
  684. )
  685. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  686. // Write back second message and make sure nothing's written in the channel.
  687. XCTAssertNoThrow(
  688. channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
  689. )
  690. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  691. // Now flush and check we *do* write the data.
  692. channel.flush()
  693. let writtenMessage = try channel.assertReadDataOutbound()
  694. // Make sure both messages have been framed together in the ByteBuffer.
  695. XCTAssertEqual(
  696. writtenMessage.data,
  697. .byteBuffer(
  698. .init(bytes: [
  699. // First message
  700. 0, // Compression disabled
  701. 0, 0, 0, 4, // Message length
  702. 1, 1, 1, 1, // First message data
  703. // Second message
  704. 0, // Compression disabled
  705. 0, 0, 0, 4, // Message length
  706. 2, 2, 2, 2, // Second message data
  707. ])
  708. )
  709. )
  710. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  711. }
  712. func testUnexpectedStreamClose_ErrorFired() throws {
  713. let handler = GRPCClientStreamHandler(
  714. methodDescriptor: .testTest,
  715. scheme: .http,
  716. authority: nil,
  717. outboundEncoding: .none,
  718. acceptedEncodings: [],
  719. maxPayloadSize: 1,
  720. skipStateMachineAssertions: true
  721. )
  722. let channel = EmbeddedChannel(handler: handler)
  723. // Write client's initial metadata
  724. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  725. let clientInitialMetadata: HPACKHeaders = [
  726. GRPCHTTP2Keys.path.rawValue: "/test/test",
  727. GRPCHTTP2Keys.scheme.rawValue: "http",
  728. GRPCHTTP2Keys.method.rawValue: "POST",
  729. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  730. GRPCHTTP2Keys.te.rawValue: "trailers",
  731. ]
  732. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  733. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  734. // An error is fired down the pipeline
  735. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  736. channel.pipeline.fireErrorCaught(thrownError)
  737. // The client receives a status explaining the stream was closed because of the thrown error.
  738. XCTAssertEqual(
  739. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  740. .status(
  741. .init(
  742. code: .unavailable,
  743. message: "Stream unexpectedly closed with error."
  744. ),
  745. [:]
  746. )
  747. )
  748. // We should now be closed: check we can't write anymore.
  749. XCTAssertThrowsError(
  750. ofType: RPCError.self,
  751. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  752. ) { error in
  753. XCTAssertEqual(error.code, .internalError)
  754. XCTAssertEqual(error.message, "Invalid state")
  755. }
  756. }
  757. func testUnexpectedStreamClose_ChannelInactive() throws {
  758. let handler = GRPCClientStreamHandler(
  759. methodDescriptor: .testTest,
  760. scheme: .http,
  761. authority: nil,
  762. outboundEncoding: .none,
  763. acceptedEncodings: [],
  764. maxPayloadSize: 1,
  765. skipStateMachineAssertions: true
  766. )
  767. let channel = EmbeddedChannel(handler: handler)
  768. // Write client's initial metadata
  769. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  770. let clientInitialMetadata: HPACKHeaders = [
  771. GRPCHTTP2Keys.path.rawValue: "/test/test",
  772. GRPCHTTP2Keys.scheme.rawValue: "http",
  773. GRPCHTTP2Keys.method.rawValue: "POST",
  774. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  775. GRPCHTTP2Keys.te.rawValue: "trailers",
  776. ]
  777. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  778. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  779. // Channel becomes inactive
  780. channel.pipeline.fireChannelInactive()
  781. // The client receives a status explaining the stream was closed.
  782. XCTAssertEqual(
  783. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  784. .status(
  785. .init(code: .unavailable, message: "Stream unexpectedly closed."),
  786. [:]
  787. )
  788. )
  789. // We should now be closed: check we can't write anymore.
  790. XCTAssertThrowsError(
  791. ofType: RPCError.self,
  792. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  793. ) { error in
  794. XCTAssertEqual(error.code, .internalError)
  795. XCTAssertEqual(error.message, "Invalid state")
  796. }
  797. }
  798. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  799. let handler = GRPCClientStreamHandler(
  800. methodDescriptor: .testTest,
  801. scheme: .http,
  802. authority: nil,
  803. outboundEncoding: .none,
  804. acceptedEncodings: [],
  805. maxPayloadSize: 1,
  806. skipStateMachineAssertions: true
  807. )
  808. let channel = EmbeddedChannel(handler: handler)
  809. // Write client's initial metadata
  810. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  811. let clientInitialMetadata: HPACKHeaders = [
  812. GRPCHTTP2Keys.path.rawValue: "/test/test",
  813. GRPCHTTP2Keys.scheme.rawValue: "http",
  814. GRPCHTTP2Keys.method.rawValue: "POST",
  815. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  816. GRPCHTTP2Keys.te.rawValue: "trailers",
  817. ]
  818. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  819. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  820. // Receive RST_STREAM
  821. XCTAssertNoThrow(
  822. try channel.writeInbound(
  823. HTTP2Frame.FramePayload.rstStream(.internalError)
  824. )
  825. )
  826. // The client receives a status explaining RST_STREAM was sent.
  827. XCTAssertEqual(
  828. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  829. .status(
  830. .init(
  831. code: .unavailable,
  832. message: "Stream unexpectedly closed: received RST_STREAM frame (0x2: internal error)."
  833. ),
  834. [:]
  835. )
  836. )
  837. // We should now be closed: check we can't write anymore.
  838. XCTAssertThrowsError(
  839. ofType: RPCError.self,
  840. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  841. ) { error in
  842. XCTAssertEqual(error.code, .internalError)
  843. XCTAssertEqual(error.message, "Invalid state")
  844. }
  845. }
  846. }
  847. extension EmbeddedChannel {
  848. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  849. guard
  850. case .headers(let writtenHeaders) = try XCTUnwrap(
  851. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  852. )
  853. else {
  854. throw TestError.assertionFailure("Expected to write headers")
  855. }
  856. return writtenHeaders
  857. }
  858. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  859. guard
  860. case .data(let writtenMessage) = try XCTUnwrap(
  861. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  862. )
  863. else {
  864. throw TestError.assertionFailure("Expected to write data")
  865. }
  866. return writtenMessage
  867. }
  868. }
  869. private enum TestError: Error {
  870. case assertionFailure(String)
  871. }
  872. @available(gRPCSwiftNIOTransport 2.0, *)
  873. extension MethodDescriptor {
  874. static let testTest = Self(fullyQualifiedService: "test", method: "test")
  875. }