GRPCClientStreamHandlerTests.swift 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCClientStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let handler = GRPCClientStreamHandler(
  27. methodDescriptor: .testTest,
  28. scheme: .http,
  29. authority: nil,
  30. outboundEncoding: .none,
  31. acceptedEncodings: [],
  32. maxPayloadSize: 1
  33. )
  34. let channel = EmbeddedChannel(handler: handler)
  35. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  36. .ping(.init(), ack: false),
  37. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  38. .priority(
  39. HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  40. ),
  41. .settings(.ack),
  42. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  43. .windowUpdate(windowSizeIncrement: 4),
  44. .alternativeService(origin: nil, field: nil),
  45. .origin([]),
  46. ]
  47. for toBeIgnored in framesToBeIgnored {
  48. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  49. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  50. }
  51. }
  52. func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
  53. let handler = GRPCClientStreamHandler(
  54. methodDescriptor: .testTest,
  55. scheme: .http,
  56. authority: nil,
  57. outboundEncoding: .none,
  58. acceptedEncodings: [],
  59. maxPayloadSize: 1,
  60. skipStateMachineAssertions: true
  61. )
  62. let channel = EmbeddedChannel(handler: handler)
  63. // Send client's initial metadata
  64. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  65. XCTAssertNoThrow(try channel.writeOutbound(request))
  66. // Receive server's initial metadata without :status
  67. let serverInitialMetadata: HPACKHeaders = [
  68. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
  69. ]
  70. XCTAssertNoThrow(
  71. try channel.writeInbound(
  72. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  73. )
  74. )
  75. XCTAssertEqual(
  76. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  77. .status(
  78. .init(code: .unknown, message: "HTTP Status Code is missing."),
  79. Metadata(headers: serverInitialMetadata)
  80. )
  81. )
  82. }
  83. func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
  84. let handler = GRPCClientStreamHandler(
  85. methodDescriptor: .testTest,
  86. scheme: .http,
  87. authority: nil,
  88. outboundEncoding: .none,
  89. acceptedEncodings: [],
  90. maxPayloadSize: 1,
  91. skipStateMachineAssertions: true
  92. )
  93. let channel = EmbeddedChannel(handler: handler)
  94. // Send client's initial metadata
  95. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  96. XCTAssertNoThrow(try channel.writeOutbound(request))
  97. // Receive server's initial metadata with 1xx status
  98. let invalidServerInitialMetadata: HPACKHeaders = [
  99. GRPCHTTP2Keys.status.rawValue: "104",
  100. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  101. ]
  102. XCTAssertNoThrow(
  103. try channel.writeInbound(
  104. HTTP2Frame.FramePayload.headers(.init(headers: invalidServerInitialMetadata))
  105. )
  106. )
  107. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  108. // We are still expecting the correct headers after getting a 1xx response, so make sure we
  109. // don't fail if we get the metadata twice.
  110. let validServerInitialMetadata: HPACKHeaders = [
  111. GRPCHTTP2Keys.status.rawValue: "200",
  112. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  113. "some-custom-header": "some-custom-value",
  114. ]
  115. XCTAssertNoThrow(
  116. try channel.writeInbound(
  117. HTTP2Frame.FramePayload.headers(.init(headers: validServerInitialMetadata))
  118. )
  119. )
  120. XCTAssertEqual(
  121. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  122. RPCResponsePart.metadata(Metadata(headers: validServerInitialMetadata))
  123. )
  124. }
  125. func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
  126. let handler = GRPCClientStreamHandler(
  127. methodDescriptor: .testTest,
  128. scheme: .http,
  129. authority: nil,
  130. outboundEncoding: .none,
  131. acceptedEncodings: [],
  132. maxPayloadSize: 1,
  133. skipStateMachineAssertions: true
  134. )
  135. let channel = EmbeddedChannel(handler: handler)
  136. // Send client's initial metadata
  137. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  138. XCTAssertNoThrow(try channel.writeOutbound(request))
  139. // Receive server's initial metadata with non-200 and non-1xx :status
  140. let serverInitialMetadata: HPACKHeaders = [
  141. GRPCHTTP2Keys.status.rawValue: String(HTTPResponseStatus.tooManyRequests.code),
  142. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  143. ]
  144. XCTAssertNoThrow(
  145. try channel.writeInbound(
  146. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  147. )
  148. )
  149. XCTAssertEqual(
  150. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  151. .status(
  152. .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
  153. Metadata(headers: serverInitialMetadata)
  154. )
  155. )
  156. // We should not throw if the server sends another message:
  157. // we should drop it, since the server is now closed.
  158. var buffer = ByteBuffer()
  159. buffer.writeInteger(UInt8(0)) // not compressed
  160. buffer.writeInteger(UInt32(42)) // message length
  161. buffer.writeRepeatingByte(0, count: 42) // message
  162. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  163. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  164. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  165. }
  166. func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
  167. let handler = GRPCClientStreamHandler(
  168. methodDescriptor: .testTest,
  169. scheme: .http,
  170. authority: nil,
  171. outboundEncoding: .none,
  172. acceptedEncodings: [],
  173. maxPayloadSize: 1,
  174. skipStateMachineAssertions: true
  175. )
  176. let channel = EmbeddedChannel(handler: handler)
  177. // Send client's initial metadata
  178. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  179. XCTAssertNoThrow(try channel.writeOutbound(request))
  180. // Receive server's initial metadata without content-type
  181. let serverInitialMetadata: HPACKHeaders = [
  182. GRPCHTTP2Keys.status.rawValue: "200"
  183. ]
  184. XCTAssertNoThrow(
  185. try channel.writeInbound(
  186. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  187. )
  188. )
  189. XCTAssertEqual(
  190. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  191. .status(
  192. .init(code: .internalError, message: "Missing content-type header"),
  193. Metadata(headers: serverInitialMetadata)
  194. )
  195. )
  196. }
  197. func testNotAcceptedEncodingResultsInFinishedRPC() throws {
  198. let handler = GRPCClientStreamHandler(
  199. methodDescriptor: .testTest,
  200. scheme: .http,
  201. authority: nil,
  202. outboundEncoding: .deflate,
  203. acceptedEncodings: [.deflate],
  204. maxPayloadSize: 1
  205. )
  206. let channel = EmbeddedChannel(handler: handler)
  207. // Send client's initial metadata
  208. XCTAssertNoThrow(
  209. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  210. )
  211. // Make sure we have sent right metadata.
  212. let writtenMetadata = try channel.assertReadHeadersOutbound()
  213. XCTAssertEqual(
  214. writtenMetadata.headers,
  215. [
  216. GRPCHTTP2Keys.method.rawValue: "POST",
  217. GRPCHTTP2Keys.scheme.rawValue: "http",
  218. GRPCHTTP2Keys.path.rawValue: "/test/test",
  219. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  220. GRPCHTTP2Keys.te.rawValue: "trailers",
  221. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  222. GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
  223. ]
  224. )
  225. // Server sends initial metadata with unsupported encoding
  226. let serverInitialMetadata: HPACKHeaders = [
  227. GRPCHTTP2Keys.status.rawValue: "200",
  228. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  229. GRPCHTTP2Keys.encoding.rawValue: "gzip",
  230. ]
  231. XCTAssertNoThrow(
  232. try channel.writeInbound(
  233. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  234. )
  235. )
  236. XCTAssertEqual(
  237. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  238. .status(
  239. .init(
  240. code: .internalError,
  241. message:
  242. "The server picked a compression algorithm ('gzip') the client does not know about."
  243. ),
  244. Metadata(headers: serverInitialMetadata)
  245. )
  246. )
  247. }
  248. func testOverMaximumPayloadSize() throws {
  249. let handler = GRPCClientStreamHandler(
  250. methodDescriptor: .testTest,
  251. scheme: .http,
  252. authority: nil,
  253. outboundEncoding: .none,
  254. acceptedEncodings: [],
  255. maxPayloadSize: 1,
  256. skipStateMachineAssertions: true
  257. )
  258. let channel = EmbeddedChannel(handler: handler)
  259. // Send client's initial metadata
  260. XCTAssertNoThrow(
  261. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  262. )
  263. // Make sure we have sent right metadata.
  264. let writtenMetadata = try channel.assertReadHeadersOutbound()
  265. XCTAssertEqual(
  266. writtenMetadata.headers,
  267. [
  268. GRPCHTTP2Keys.method.rawValue: "POST",
  269. GRPCHTTP2Keys.scheme.rawValue: "http",
  270. GRPCHTTP2Keys.path.rawValue: "/test/test",
  271. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  272. GRPCHTTP2Keys.te.rawValue: "trailers",
  273. ]
  274. )
  275. // Server sends initial metadata
  276. let serverInitialMetadata: HPACKHeaders = [
  277. GRPCHTTP2Keys.status.rawValue: "200",
  278. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  279. ]
  280. XCTAssertNoThrow(
  281. try channel.writeInbound(
  282. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  283. )
  284. )
  285. XCTAssertEqual(
  286. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  287. .metadata(Metadata(headers: serverInitialMetadata))
  288. )
  289. // Server sends message over payload limit
  290. var buffer = ByteBuffer()
  291. buffer.writeInteger(UInt8(0)) // not compressed
  292. buffer.writeInteger(UInt32(42)) // message length
  293. buffer.writeRepeatingByte(0, count: 42) // message
  294. let clientDataPayload = HTTP2Frame.FramePayload.Data(
  295. data: .byteBuffer(buffer),
  296. endStream: false
  297. )
  298. // Invalid payload should result in error status and stream being closed
  299. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  300. let part = try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self)
  301. XCTAssertEqual(
  302. part,
  303. .status(Status(code: .internalError, message: "Failed to decode message"), [:])
  304. )
  305. channel.embeddedEventLoop.run()
  306. try channel.closeFuture.wait()
  307. }
  308. func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
  309. let handler = GRPCClientStreamHandler(
  310. methodDescriptor: .testTest,
  311. scheme: .http,
  312. authority: nil,
  313. outboundEncoding: .none,
  314. acceptedEncodings: [],
  315. maxPayloadSize: 100,
  316. skipStateMachineAssertions: true
  317. )
  318. let channel = EmbeddedChannel(handler: handler)
  319. // Send client's initial metadata
  320. XCTAssertNoThrow(
  321. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  322. )
  323. // Make sure we have sent right metadata.
  324. let writtenMetadata = try channel.assertReadHeadersOutbound()
  325. XCTAssertEqual(
  326. writtenMetadata.headers,
  327. [
  328. GRPCHTTP2Keys.method.rawValue: "POST",
  329. GRPCHTTP2Keys.scheme.rawValue: "http",
  330. GRPCHTTP2Keys.path.rawValue: "/test/test",
  331. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  332. GRPCHTTP2Keys.te.rawValue: "trailers",
  333. ]
  334. )
  335. // Server sends initial metadata
  336. let serverInitialMetadata: HPACKHeaders = [
  337. GRPCHTTP2Keys.status.rawValue: "200",
  338. GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
  339. ]
  340. XCTAssertNoThrow(
  341. try channel.writeInbound(
  342. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  343. )
  344. )
  345. XCTAssertEqual(
  346. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  347. .metadata(Metadata(headers: serverInitialMetadata))
  348. )
  349. // Server sends message with EOS set.
  350. var buffer = ByteBuffer()
  351. buffer.writeInteger(UInt8(0)) // not compressed
  352. buffer.writeInteger(UInt32(42)) // message length
  353. buffer.writeRepeatingByte(0, count: 42) // message
  354. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  355. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  356. // Make sure we got status + trailers with the right error.
  357. XCTAssertEqual(
  358. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  359. .status(
  360. Status(
  361. code: .internalError,
  362. message:
  363. "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
  364. ),
  365. [:]
  366. )
  367. )
  368. }
  369. func testServerEndsStream() throws {
  370. let handler = GRPCClientStreamHandler(
  371. methodDescriptor: .testTest,
  372. scheme: .http,
  373. authority: nil,
  374. outboundEncoding: .none,
  375. acceptedEncodings: [],
  376. maxPayloadSize: 1,
  377. skipStateMachineAssertions: true
  378. )
  379. let channel = EmbeddedChannel(handler: handler)
  380. // Write client's initial metadata
  381. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  382. let clientInitialMetadata: HPACKHeaders = [
  383. GRPCHTTP2Keys.path.rawValue: "/test/test",
  384. GRPCHTTP2Keys.scheme.rawValue: "http",
  385. GRPCHTTP2Keys.method.rawValue: "POST",
  386. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  387. GRPCHTTP2Keys.te.rawValue: "trailers",
  388. ]
  389. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  390. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  391. // Receive server's initial metadata with end stream set
  392. let serverInitialMetadata: HPACKHeaders = [
  393. GRPCHTTP2Keys.status.rawValue: "200",
  394. GRPCHTTP2Keys.grpcStatus.rawValue: "0",
  395. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  396. ]
  397. XCTAssertNoThrow(
  398. try channel.writeInbound(
  399. HTTP2Frame.FramePayload.headers(
  400. .init(
  401. headers: serverInitialMetadata,
  402. endStream: true
  403. )
  404. )
  405. )
  406. )
  407. XCTAssertEqual(
  408. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  409. .status(
  410. .init(code: .ok, message: ""),
  411. [
  412. GRPCHTTP2Keys.status.rawValue: "200",
  413. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  414. ]
  415. )
  416. )
  417. // We should not throw if the server sends another message:
  418. // we should drop it, since the server is now closed.
  419. var buffer = ByteBuffer()
  420. buffer.writeInteger(UInt8(0)) // not compressed
  421. buffer.writeInteger(UInt32(42)) // message length
  422. buffer.writeRepeatingByte(0, count: 42) // message
  423. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  424. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  425. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  426. // We should also not throw if the server sends trailers again.
  427. XCTAssertNoThrow(
  428. try channel.writeInbound(
  429. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata, endStream: true))
  430. )
  431. )
  432. }
  433. func testNormalFlow() throws {
  434. let handler = GRPCClientStreamHandler(
  435. methodDescriptor: .testTest,
  436. scheme: .http,
  437. authority: nil,
  438. outboundEncoding: .none,
  439. acceptedEncodings: [],
  440. maxPayloadSize: 100,
  441. skipStateMachineAssertions: true
  442. )
  443. let channel = EmbeddedChannel(handler: handler)
  444. // Send client's initial metadata
  445. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  446. XCTAssertNoThrow(try channel.writeOutbound(request))
  447. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  448. let writtenHeaders = try channel.assertReadHeadersOutbound()
  449. XCTAssertEqual(
  450. writtenHeaders.headers,
  451. [
  452. GRPCHTTP2Keys.method.rawValue: "POST",
  453. GRPCHTTP2Keys.scheme.rawValue: "http",
  454. GRPCHTTP2Keys.path.rawValue: "/test/test",
  455. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  456. GRPCHTTP2Keys.te.rawValue: "trailers",
  457. ]
  458. )
  459. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  460. // Receive server's initial metadata
  461. let serverInitialMetadata: HPACKHeaders = [
  462. GRPCHTTP2Keys.status.rawValue: "200",
  463. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  464. "some-custom-header": "some-custom-value",
  465. ]
  466. XCTAssertNoThrow(
  467. try channel.writeInbound(
  468. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  469. )
  470. )
  471. XCTAssertEqual(
  472. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  473. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  474. )
  475. // Send a message
  476. XCTAssertNoThrow(
  477. try channel.writeOutbound(
  478. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  479. )
  480. )
  481. // Assert we wrote it successfully into the channel
  482. let writtenMessage = try channel.assertReadDataOutbound()
  483. var expectedBuffer = ByteBuffer()
  484. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  485. expectedBuffer.writeInteger(UInt32(42)) // message length
  486. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  487. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  488. // Half-close the outbound end: this would be triggered by finishing the client's writer.
  489. XCTAssertNoThrow(channel.close(mode: .output, promise: nil))
  490. // Make sure the EOS frame was sent
  491. let emptyEOSFrame = try channel.assertReadDataOutbound()
  492. XCTAssertEqual(emptyEOSFrame.data, .byteBuffer(.init()))
  493. XCTAssertTrue(emptyEOSFrame.endStream)
  494. // Make sure that, if we flush again, we're not writing anything else down
  495. // the stream. We should have closed at this point.
  496. channel.flush()
  497. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  498. // Make sure we cannot write anymore because client's closed.
  499. XCTAssertThrowsError(
  500. ofType: RPCError.self,
  501. try channel.writeOutbound(
  502. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  503. )
  504. ) { error in
  505. XCTAssertEqual(error.code, .internalError)
  506. XCTAssertEqual(error.message, "Invalid state")
  507. }
  508. // This is needed to clear the EmbeddedChannel's stored error, otherwise
  509. // it will be thrown when writing inbound.
  510. try? channel.throwIfErrorCaught()
  511. // Server sends back response message
  512. var buffer = ByteBuffer()
  513. buffer.writeInteger(UInt8(0)) // not compressed
  514. buffer.writeInteger(UInt32(42)) // message length
  515. buffer.writeRepeatingByte(0, count: 42) // message
  516. let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer))
  517. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))
  518. // Make sure we read the message properly
  519. XCTAssertEqual(
  520. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  521. RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
  522. )
  523. // Server sends status to end RPC
  524. XCTAssertNoThrow(
  525. try channel.writeInbound(
  526. HTTP2Frame.FramePayload.headers(
  527. .init(headers: [
  528. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  529. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  530. "custom-header": "custom-value",
  531. ])
  532. )
  533. )
  534. )
  535. XCTAssertEqual(
  536. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  537. .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
  538. )
  539. }
  540. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  541. let handler = GRPCClientStreamHandler(
  542. methodDescriptor: .testTest,
  543. scheme: .http,
  544. authority: nil,
  545. outboundEncoding: .none,
  546. acceptedEncodings: [],
  547. maxPayloadSize: 100,
  548. skipStateMachineAssertions: true
  549. )
  550. let channel = EmbeddedChannel(handler: handler)
  551. // Send client's initial metadata
  552. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  553. XCTAssertNoThrow(try channel.writeOutbound(request))
  554. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  555. let writtenHeaders = try channel.assertReadHeadersOutbound()
  556. XCTAssertEqual(
  557. writtenHeaders.headers,
  558. [
  559. GRPCHTTP2Keys.method.rawValue: "POST",
  560. GRPCHTTP2Keys.scheme.rawValue: "http",
  561. GRPCHTTP2Keys.path.rawValue: "/test/test",
  562. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  563. GRPCHTTP2Keys.te.rawValue: "trailers",
  564. ]
  565. )
  566. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  567. // Receive server's initial metadata
  568. let serverInitialMetadata: HPACKHeaders = [
  569. GRPCHTTP2Keys.status.rawValue: "200",
  570. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  571. "some-custom-header": "some-custom-value",
  572. ]
  573. XCTAssertNoThrow(
  574. try channel.writeInbound(
  575. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  576. )
  577. )
  578. XCTAssertEqual(
  579. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  580. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  581. )
  582. // Send a message
  583. XCTAssertNoThrow(
  584. try channel.writeOutbound(
  585. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  586. )
  587. )
  588. // Assert we wrote it successfully into the channel
  589. let writtenMessage = try channel.assertReadDataOutbound()
  590. var expectedBuffer = ByteBuffer()
  591. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  592. expectedBuffer.writeInteger(UInt32(42)) // message length
  593. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  594. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  595. // Receive server's first message
  596. var buffer = ByteBuffer()
  597. buffer.writeInteger(UInt8(0)) // not compressed
  598. XCTAssertNoThrow(
  599. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  600. )
  601. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  602. buffer.clear()
  603. buffer.writeInteger(UInt32(30)) // message length
  604. XCTAssertNoThrow(
  605. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  606. )
  607. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  608. buffer.clear()
  609. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  610. XCTAssertNoThrow(
  611. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  612. )
  613. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  614. buffer.clear()
  615. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  616. XCTAssertNoThrow(
  617. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  618. )
  619. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  620. buffer.clear()
  621. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  622. XCTAssertNoThrow(
  623. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  624. )
  625. var expected = ByteBuffer()
  626. expected.writeRepeatingByte(0, count: 10)
  627. expected.writeRepeatingByte(1, count: 10)
  628. expected.writeRepeatingByte(2, count: 10)
  629. // Make sure we read the message properly
  630. XCTAssertEqual(
  631. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  632. RPCResponsePart.message(GRPCNIOTransportBytes(expected))
  633. )
  634. }
  635. func testSendMultipleMessagesInSingleBuffer() throws {
  636. let handler = GRPCClientStreamHandler(
  637. methodDescriptor: .testTest,
  638. scheme: .http,
  639. authority: nil,
  640. outboundEncoding: .none,
  641. acceptedEncodings: [],
  642. maxPayloadSize: 100,
  643. skipStateMachineAssertions: true
  644. )
  645. let channel = EmbeddedChannel(handler: handler)
  646. // Send client's initial metadata
  647. let request = RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])
  648. XCTAssertNoThrow(try channel.writeOutbound(request))
  649. // Make sure we have sent the corresponding frame, and that nothing has been written back.
  650. let writtenHeaders = try channel.assertReadHeadersOutbound()
  651. XCTAssertEqual(
  652. writtenHeaders.headers,
  653. [
  654. GRPCHTTP2Keys.method.rawValue: "POST",
  655. GRPCHTTP2Keys.scheme.rawValue: "http",
  656. GRPCHTTP2Keys.path.rawValue: "/test/test",
  657. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  658. GRPCHTTP2Keys.te.rawValue: "trailers",
  659. ]
  660. )
  661. XCTAssertNil(try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self))
  662. // Receive server's initial metadata
  663. let serverInitialMetadata: HPACKHeaders = [
  664. GRPCHTTP2Keys.status.rawValue: "200",
  665. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  666. "some-custom-header": "some-custom-value",
  667. ]
  668. XCTAssertNoThrow(
  669. try channel.writeInbound(
  670. HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
  671. )
  672. )
  673. XCTAssertEqual(
  674. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  675. RPCResponsePart.metadata(Metadata(headers: serverInitialMetadata))
  676. )
  677. // This is where this test actually begins. We want to write two messages
  678. // without flushing, and make sure that no messages are sent down the pipeline
  679. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  680. // Write back first message and make sure nothing's written in the channel.
  681. XCTAssertNoThrow(
  682. channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
  683. )
  684. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  685. // Write back second message and make sure nothing's written in the channel.
  686. XCTAssertNoThrow(
  687. channel.write(RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
  688. )
  689. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  690. // Now flush and check we *do* write the data.
  691. channel.flush()
  692. let writtenMessage = try channel.assertReadDataOutbound()
  693. // Make sure both messages have been framed together in the ByteBuffer.
  694. XCTAssertEqual(
  695. writtenMessage.data,
  696. .byteBuffer(
  697. .init(bytes: [
  698. // First message
  699. 0, // Compression disabled
  700. 0, 0, 0, 4, // Message length
  701. 1, 1, 1, 1, // First message data
  702. // Second message
  703. 0, // Compression disabled
  704. 0, 0, 0, 4, // Message length
  705. 2, 2, 2, 2, // Second message data
  706. ])
  707. )
  708. )
  709. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  710. }
  711. func testUnexpectedStreamClose_ErrorFired() throws {
  712. let handler = GRPCClientStreamHandler(
  713. methodDescriptor: .testTest,
  714. scheme: .http,
  715. authority: nil,
  716. outboundEncoding: .none,
  717. acceptedEncodings: [],
  718. maxPayloadSize: 1,
  719. skipStateMachineAssertions: true
  720. )
  721. let channel = EmbeddedChannel(handler: handler)
  722. // Write client's initial metadata
  723. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  724. let clientInitialMetadata: HPACKHeaders = [
  725. GRPCHTTP2Keys.path.rawValue: "/test/test",
  726. GRPCHTTP2Keys.scheme.rawValue: "http",
  727. GRPCHTTP2Keys.method.rawValue: "POST",
  728. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  729. GRPCHTTP2Keys.te.rawValue: "trailers",
  730. ]
  731. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  732. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  733. // An error is fired down the pipeline
  734. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  735. channel.pipeline.fireErrorCaught(thrownError)
  736. // The client receives a status explaining the stream was closed because of the thrown error.
  737. XCTAssertEqual(
  738. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  739. .status(
  740. .init(
  741. code: .unavailable,
  742. message: "Stream unexpectedly closed with error."
  743. ),
  744. [:]
  745. )
  746. )
  747. // We should now be closed: check we can't write anymore.
  748. XCTAssertThrowsError(
  749. ofType: RPCError.self,
  750. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  751. ) { error in
  752. XCTAssertEqual(error.code, .internalError)
  753. XCTAssertEqual(error.message, "Invalid state")
  754. }
  755. }
  756. func testUnexpectedStreamClose_ChannelInactive() throws {
  757. let handler = GRPCClientStreamHandler(
  758. methodDescriptor: .testTest,
  759. scheme: .http,
  760. authority: nil,
  761. outboundEncoding: .none,
  762. acceptedEncodings: [],
  763. maxPayloadSize: 1,
  764. skipStateMachineAssertions: true
  765. )
  766. let channel = EmbeddedChannel(handler: handler)
  767. // Write client's initial metadata
  768. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  769. let clientInitialMetadata: HPACKHeaders = [
  770. GRPCHTTP2Keys.path.rawValue: "/test/test",
  771. GRPCHTTP2Keys.scheme.rawValue: "http",
  772. GRPCHTTP2Keys.method.rawValue: "POST",
  773. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  774. GRPCHTTP2Keys.te.rawValue: "trailers",
  775. ]
  776. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  777. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  778. // Channel becomes inactive
  779. channel.pipeline.fireChannelInactive()
  780. // The client receives a status explaining the stream was closed.
  781. XCTAssertEqual(
  782. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  783. .status(
  784. .init(code: .unavailable, message: "Stream unexpectedly closed."),
  785. [:]
  786. )
  787. )
  788. // We should now be closed: check we can't write anymore.
  789. XCTAssertThrowsError(
  790. ofType: RPCError.self,
  791. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  792. ) { error in
  793. XCTAssertEqual(error.code, .internalError)
  794. XCTAssertEqual(error.message, "Invalid state")
  795. }
  796. }
  797. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  798. let handler = GRPCClientStreamHandler(
  799. methodDescriptor: .testTest,
  800. scheme: .http,
  801. authority: nil,
  802. outboundEncoding: .none,
  803. acceptedEncodings: [],
  804. maxPayloadSize: 1,
  805. skipStateMachineAssertions: true
  806. )
  807. let channel = EmbeddedChannel(handler: handler)
  808. // Write client's initial metadata
  809. XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:])))
  810. let clientInitialMetadata: HPACKHeaders = [
  811. GRPCHTTP2Keys.path.rawValue: "/test/test",
  812. GRPCHTTP2Keys.scheme.rawValue: "http",
  813. GRPCHTTP2Keys.method.rawValue: "POST",
  814. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  815. GRPCHTTP2Keys.te.rawValue: "trailers",
  816. ]
  817. let writtenInitialMetadata = try channel.assertReadHeadersOutbound()
  818. XCTAssertEqual(writtenInitialMetadata.headers, clientInitialMetadata)
  819. // Receive RST_STREAM
  820. XCTAssertNoThrow(
  821. try channel.writeInbound(
  822. HTTP2Frame.FramePayload.rstStream(.internalError)
  823. )
  824. )
  825. // The client receives a status explaining RST_STREAM was sent.
  826. XCTAssertEqual(
  827. try channel.readInbound(as: RPCResponsePart<GRPCNIOTransportBytes>.self),
  828. .status(
  829. .init(
  830. code: .unavailable,
  831. message: "Stream unexpectedly closed: a RST_STREAM frame was received."
  832. ),
  833. [:]
  834. )
  835. )
  836. // We should now be closed: check we can't write anymore.
  837. XCTAssertThrowsError(
  838. ofType: RPCError.self,
  839. try channel.writeOutbound(RPCRequestPart<GRPCNIOTransportBytes>.metadata([:]))
  840. ) { error in
  841. XCTAssertEqual(error.code, .internalError)
  842. XCTAssertEqual(error.message, "Invalid state")
  843. }
  844. }
  845. }
  846. extension EmbeddedChannel {
  847. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  848. guard
  849. case .headers(let writtenHeaders) = try XCTUnwrap(
  850. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  851. )
  852. else {
  853. throw TestError.assertionFailure("Expected to write headers")
  854. }
  855. return writtenHeaders
  856. }
  857. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  858. guard
  859. case .data(let writtenMessage) = try XCTUnwrap(
  860. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  861. )
  862. else {
  863. throw TestError.assertionFailure("Expected to write data")
  864. }
  865. return writtenMessage
  866. }
  867. }
  868. private enum TestError: Error {
  869. case assertionFailure(String)
  870. }
  871. extension MethodDescriptor {
  872. static let testTest = Self(fullyQualifiedService: "test", method: "test")
  873. }