GRPCServerStreamHandlerTests.swift 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import Testing
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. private func makeServerStreamHandler(
  26. channel: any Channel,
  27. scheme: Scheme = .http,
  28. acceptedEncodings: CompressionAlgorithmSet = [],
  29. maxPayloadSize: Int = .max,
  30. descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
  31. disableAssertions: Bool = false
  32. ) -> GRPCServerStreamHandler {
  33. let serverConnectionManagementHandler = ServerConnectionManagementHandler(
  34. eventLoop: channel.eventLoop,
  35. maxIdleTime: nil,
  36. maxAge: nil,
  37. maxGraceTime: nil,
  38. keepaliveTime: nil,
  39. keepaliveTimeout: nil,
  40. allowKeepaliveWithoutCalls: false,
  41. minPingIntervalWithoutCalls: .minutes(5),
  42. requireALPN: false
  43. )
  44. return GRPCServerStreamHandler(
  45. scheme: scheme,
  46. acceptedEncodings: acceptedEncodings,
  47. maxPayloadSize: maxPayloadSize,
  48. methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
  49. eventLoop: channel.eventLoop,
  50. connectionManagementHandler: serverConnectionManagementHandler.syncView,
  51. skipStateMachineAssertions: disableAssertions
  52. )
  53. }
  54. func testH2FramesAreIgnored() throws {
  55. let channel = EmbeddedChannel()
  56. let handler = self.makeServerStreamHandler(channel: channel)
  57. try channel.pipeline.syncOperations.addHandler(handler)
  58. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  59. .ping(.init(), ack: false),
  60. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  61. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  62. // .priority(
  63. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  64. // ),
  65. .settings(.ack),
  66. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  67. .windowUpdate(windowSizeIncrement: 4),
  68. .alternativeService(origin: nil, field: nil),
  69. .origin([]),
  70. ]
  71. for toBeIgnored in framesToBeIgnored {
  72. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  73. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  74. }
  75. }
  76. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  77. let channel = EmbeddedChannel()
  78. let handler = self.makeServerStreamHandler(channel: channel)
  79. try channel.pipeline.syncOperations.addHandler(handler)
  80. // Receive client's initial metadata without content-type
  81. let clientInitialMetadata: HPACKHeaders = [
  82. GRPCHTTP2Keys.path.rawValue: "/test/test",
  83. GRPCHTTP2Keys.scheme.rawValue: "http",
  84. GRPCHTTP2Keys.method.rawValue: "POST",
  85. GRPCHTTP2Keys.te.rawValue: "trailers",
  86. ]
  87. XCTAssertNoThrow(
  88. try channel.writeInbound(
  89. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  90. )
  91. )
  92. // Make sure we have sent a trailers-only response
  93. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  94. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  95. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  96. }
  97. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  98. let channel = EmbeddedChannel()
  99. let handler = self.makeServerStreamHandler(channel: channel)
  100. try channel.pipeline.syncOperations.addHandler(handler)
  101. // Receive client's initial metadata without :method
  102. let clientInitialMetadata: HPACKHeaders = [
  103. GRPCHTTP2Keys.path.rawValue: "/test/test",
  104. GRPCHTTP2Keys.scheme.rawValue: "http",
  105. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  106. GRPCHTTP2Keys.te.rawValue: "trailers",
  107. ]
  108. XCTAssertNoThrow(
  109. try channel.writeInbound(
  110. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  111. )
  112. )
  113. // Make sure we have sent a trailers-only response
  114. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  115. XCTAssertEqual(
  116. writtenTrailersOnlyResponse.headers,
  117. [
  118. GRPCHTTP2Keys.status.rawValue: "200",
  119. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  120. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  121. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  122. ":method header is expected to be present and have a value of \"POST\".",
  123. ]
  124. )
  125. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  126. }
  127. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  128. let channel = EmbeddedChannel()
  129. let handler = self.makeServerStreamHandler(channel: channel)
  130. try channel.pipeline.syncOperations.addHandler(handler)
  131. // Receive client's initial metadata without :scheme
  132. let clientInitialMetadata: HPACKHeaders = [
  133. GRPCHTTP2Keys.path.rawValue: "/test/test",
  134. GRPCHTTP2Keys.method.rawValue: "POST",
  135. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  136. GRPCHTTP2Keys.te.rawValue: "trailers",
  137. ]
  138. XCTAssertNoThrow(
  139. try channel.writeInbound(
  140. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  141. )
  142. )
  143. // Make sure we have sent a trailers-only response
  144. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  145. XCTAssertEqual(
  146. writtenTrailersOnlyResponse.headers,
  147. [
  148. GRPCHTTP2Keys.status.rawValue: "200",
  149. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  150. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  151. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  152. ":scheme header must be present and one of \"http\" or \"https\".",
  153. ]
  154. )
  155. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  156. }
  157. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  158. let channel = EmbeddedChannel()
  159. let handler = self.makeServerStreamHandler(channel: channel)
  160. try channel.pipeline.syncOperations.addHandler(handler)
  161. // Receive client's initial metadata without :path
  162. let clientInitialMetadata: HPACKHeaders = [
  163. GRPCHTTP2Keys.scheme.rawValue: "http",
  164. GRPCHTTP2Keys.method.rawValue: "POST",
  165. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  166. GRPCHTTP2Keys.te.rawValue: "trailers",
  167. ]
  168. XCTAssertNoThrow(
  169. try channel.writeInbound(
  170. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  171. )
  172. )
  173. // Make sure we have sent a trailers-only response
  174. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  175. XCTAssertEqual(
  176. writtenTrailersOnlyResponse.headers,
  177. [
  178. GRPCHTTP2Keys.status.rawValue: "200",
  179. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  180. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  181. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  182. ]
  183. )
  184. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  185. }
  186. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  187. let channel = EmbeddedChannel()
  188. let handler = self.makeServerStreamHandler(channel: channel)
  189. try channel.pipeline.syncOperations.addHandler(handler)
  190. // Receive client's initial metadata
  191. let clientInitialMetadata: HPACKHeaders = [
  192. GRPCHTTP2Keys.path.rawValue: "/test/test",
  193. GRPCHTTP2Keys.scheme.rawValue: "http",
  194. GRPCHTTP2Keys.method.rawValue: "POST",
  195. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  196. GRPCHTTP2Keys.te.rawValue: "trailers",
  197. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  198. ]
  199. XCTAssertNoThrow(
  200. try channel.writeInbound(
  201. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  202. )
  203. )
  204. // Make sure we have sent a trailers-only response
  205. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  206. XCTAssertEqual(
  207. writtenTrailersOnlyResponse.headers,
  208. [
  209. GRPCHTTP2Keys.status.rawValue: "200",
  210. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  211. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  212. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  213. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  214. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  215. ]
  216. )
  217. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  218. }
  219. func testOverMaximumPayloadSize() throws {
  220. let channel = EmbeddedChannel()
  221. let handler = self.makeServerStreamHandler(channel: channel, maxPayloadSize: 1)
  222. try channel.pipeline.syncOperations.addHandler(handler)
  223. // Receive client's initial metadata
  224. let clientInitialMetadata: HPACKHeaders = [
  225. GRPCHTTP2Keys.path.rawValue: "/test/test",
  226. GRPCHTTP2Keys.scheme.rawValue: "http",
  227. GRPCHTTP2Keys.method.rawValue: "POST",
  228. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  229. GRPCHTTP2Keys.te.rawValue: "trailers",
  230. ]
  231. XCTAssertNoThrow(
  232. try channel.writeInbound(
  233. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  234. )
  235. )
  236. // Make sure we haven't sent back an error response, and that we read the initial metadata
  237. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  238. XCTAssertEqual(
  239. try channel.readInbound(as: RPCRequestPart.self),
  240. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  241. )
  242. // Write back server's initial metadata
  243. let headers: HPACKHeaders = [
  244. "some-custom-header": "some-custom-value"
  245. ]
  246. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  247. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  248. // Make sure we wrote back the initial metadata
  249. let writtenHeaders = try channel.assertReadHeadersOutbound()
  250. XCTAssertEqual(
  251. writtenHeaders.headers,
  252. [
  253. GRPCHTTP2Keys.status.rawValue: "200",
  254. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  255. "some-custom-header": "some-custom-value",
  256. ]
  257. )
  258. // Receive client's message
  259. var buffer = ByteBuffer()
  260. buffer.writeInteger(UInt8(0)) // not compressed
  261. buffer.writeInteger(UInt32(42)) // message length
  262. buffer.writeRepeatingByte(0, count: 42) // message
  263. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  264. XCTAssertThrowsError(
  265. ofType: RPCError.self,
  266. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  267. ) { error in
  268. XCTAssertEqual(error.code, .internalError)
  269. XCTAssertEqual(error.message, "Failed to decode message")
  270. }
  271. // Make sure we haven't sent a response back and that we didn't read the received message
  272. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  273. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  274. }
  275. func testClientEndsStream() throws {
  276. let channel = EmbeddedChannel()
  277. let handler = self.makeServerStreamHandler(channel: channel, disableAssertions: true)
  278. try channel.pipeline.syncOperations.addHandler(handler)
  279. // Receive client's initial metadata with end stream set
  280. let clientInitialMetadata: HPACKHeaders = [
  281. GRPCHTTP2Keys.path.rawValue: "/test/test",
  282. GRPCHTTP2Keys.scheme.rawValue: "http",
  283. GRPCHTTP2Keys.method.rawValue: "POST",
  284. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  285. GRPCHTTP2Keys.te.rawValue: "trailers",
  286. ]
  287. XCTAssertNoThrow(
  288. try channel.writeInbound(
  289. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  290. )
  291. )
  292. // Make sure we haven't sent back an error response, and that we read the initial metadata
  293. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  294. XCTAssertEqual(
  295. try channel.readInbound(as: RPCRequestPart.self),
  296. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  297. )
  298. // Write back server's initial metadata
  299. let headers: HPACKHeaders = [
  300. "some-custom-header": "some-custom-value"
  301. ]
  302. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  303. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  304. // Make sure we wrote back the initial metadata
  305. let writtenHeaders = try channel.assertReadHeadersOutbound()
  306. XCTAssertEqual(
  307. writtenHeaders.headers,
  308. [
  309. GRPCHTTP2Keys.status.rawValue: "200",
  310. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  311. "some-custom-header": "some-custom-value",
  312. ]
  313. )
  314. // We should throw if the client sends another message, since it's closed the stream already.
  315. var buffer = ByteBuffer()
  316. buffer.writeInteger(UInt8(0)) // not compressed
  317. buffer.writeInteger(UInt32(42)) // message length
  318. buffer.writeRepeatingByte(0, count: 42) // message
  319. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  320. XCTAssertThrowsError(
  321. ofType: RPCError.self,
  322. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  323. ) { error in
  324. XCTAssertEqual(error.code, .internalError)
  325. XCTAssertEqual(error.message, "Invalid state")
  326. }
  327. }
  328. func testNormalFlow() throws {
  329. let channel = EmbeddedChannel()
  330. let handler = self.makeServerStreamHandler(channel: channel, disableAssertions: true)
  331. try channel.pipeline.syncOperations.addHandler(handler)
  332. // Receive client's initial metadata
  333. let clientInitialMetadata: HPACKHeaders = [
  334. GRPCHTTP2Keys.path.rawValue: "/test/test",
  335. GRPCHTTP2Keys.scheme.rawValue: "http",
  336. GRPCHTTP2Keys.method.rawValue: "POST",
  337. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  338. GRPCHTTP2Keys.te.rawValue: "trailers",
  339. ]
  340. XCTAssertNoThrow(
  341. try channel.writeInbound(
  342. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  343. )
  344. )
  345. // Make sure we haven't sent back an error response, and that we read the initial metadata
  346. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  347. XCTAssertEqual(
  348. try channel.readInbound(as: RPCRequestPart.self),
  349. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  350. )
  351. // Write back server's initial metadata
  352. let headers: HPACKHeaders = [
  353. "some-custom-header": "some-custom-value"
  354. ]
  355. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  356. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  357. // Make sure we wrote back the initial metadata
  358. let writtenHeaders = try channel.assertReadHeadersOutbound()
  359. XCTAssertEqual(
  360. writtenHeaders.headers,
  361. [
  362. GRPCHTTP2Keys.status.rawValue: "200",
  363. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  364. "some-custom-header": "some-custom-value",
  365. ]
  366. )
  367. // Receive client's message
  368. var buffer = ByteBuffer()
  369. buffer.writeInteger(UInt8(0)) // not compressed
  370. buffer.writeInteger(UInt32(42)) // message length
  371. buffer.writeRepeatingByte(0, count: 42) // message
  372. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  373. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  374. // Make sure we haven't sent back an error response, and that we read the message properly
  375. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  376. XCTAssertEqual(
  377. try channel.readInbound(as: RPCRequestPart.self),
  378. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  379. )
  380. // Write back response
  381. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  382. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  383. // Make sure we wrote back the right message
  384. let writtenMessage = try channel.assertReadDataOutbound()
  385. var expectedBuffer = ByteBuffer()
  386. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  387. expectedBuffer.writeInteger(UInt32(42)) // message length
  388. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  389. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  390. // Send back status to end RPC
  391. let trailers = RPCResponsePart.status(
  392. .init(code: .dataLoss, message: "Test data loss"),
  393. ["custom-header": "custom-value"]
  394. )
  395. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  396. // Make sure we wrote back the status and trailers
  397. let writtenStatus = try channel.assertReadHeadersOutbound()
  398. XCTAssertTrue(writtenStatus.endStream)
  399. XCTAssertEqual(
  400. writtenStatus.headers,
  401. [
  402. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  403. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  404. "custom-header": "custom-value",
  405. ]
  406. )
  407. // Try writing and assert it throws to make sure we don't allow writes
  408. // after closing.
  409. XCTAssertThrowsError(
  410. ofType: RPCError.self,
  411. try channel.writeOutbound(trailers)
  412. ) { error in
  413. XCTAssertEqual(error.code, .internalError)
  414. XCTAssertEqual(error.message, "Invalid state")
  415. }
  416. }
  417. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  418. let channel = EmbeddedChannel()
  419. let handler = self.makeServerStreamHandler(channel: channel)
  420. try channel.pipeline.syncOperations.addHandler(handler)
  421. // Receive client's initial metadata
  422. let clientInitialMetadata: HPACKHeaders = [
  423. GRPCHTTP2Keys.path.rawValue: "/test/test",
  424. GRPCHTTP2Keys.scheme.rawValue: "http",
  425. GRPCHTTP2Keys.method.rawValue: "POST",
  426. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  427. GRPCHTTP2Keys.te.rawValue: "trailers",
  428. ]
  429. XCTAssertNoThrow(
  430. try channel.writeInbound(
  431. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  432. )
  433. )
  434. // Make sure we haven't sent back an error response, and that we read the initial metadata
  435. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  436. XCTAssertEqual(
  437. try channel.readInbound(as: RPCRequestPart.self),
  438. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  439. )
  440. // Write back server's initial metadata
  441. let headers: HPACKHeaders = [
  442. "some-custom-header": "some-custom-value"
  443. ]
  444. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  445. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  446. // Make sure we wrote back the initial metadata
  447. let writtenHeaders = try channel.assertReadHeadersOutbound()
  448. XCTAssertEqual(
  449. writtenHeaders.headers,
  450. [
  451. GRPCHTTP2Keys.status.rawValue: "200",
  452. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  453. "some-custom-header": "some-custom-value",
  454. ]
  455. )
  456. // Receive client's first message
  457. var buffer = ByteBuffer()
  458. buffer.writeInteger(UInt8(0)) // not compressed
  459. XCTAssertNoThrow(
  460. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  461. )
  462. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  463. buffer.clear()
  464. buffer.writeInteger(UInt32(30)) // message length
  465. XCTAssertNoThrow(
  466. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  467. )
  468. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  469. buffer.clear()
  470. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  471. XCTAssertNoThrow(
  472. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  473. )
  474. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  475. buffer.clear()
  476. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  477. XCTAssertNoThrow(
  478. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  479. )
  480. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  481. buffer.clear()
  482. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  483. XCTAssertNoThrow(
  484. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  485. )
  486. // Make sure we haven't sent back an error response, and that we read the message properly
  487. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  488. XCTAssertEqual(
  489. try channel.readInbound(as: RPCRequestPart.self),
  490. RPCRequestPart.message(
  491. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  492. + [UInt8](repeating: 2, count: 10)
  493. )
  494. )
  495. }
  496. func testReceiveMultipleHeaders() throws {
  497. let channel = EmbeddedChannel()
  498. let handler = self.makeServerStreamHandler(channel: channel)
  499. try channel.pipeline.syncOperations.addHandler(handler)
  500. // Receive client's initial metadata
  501. let clientInitialMetadata: HPACKHeaders = [
  502. GRPCHTTP2Keys.path.rawValue: "/test/test",
  503. GRPCHTTP2Keys.scheme.rawValue: "http",
  504. GRPCHTTP2Keys.method.rawValue: "POST",
  505. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  506. GRPCHTTP2Keys.te.rawValue: "trailers",
  507. ]
  508. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  509. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  510. // Receive them again. Should be a protocol violation.
  511. XCTAssertThrowsError(
  512. ofType: RPCError.self,
  513. try channel.writeInbound(
  514. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  515. )
  516. ) { error in
  517. XCTAssertEqual(error.code, .unavailable)
  518. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  519. }
  520. let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  521. switch payload {
  522. case .rstStream(let errorCode):
  523. XCTAssertEqual(errorCode, .protocolError)
  524. default:
  525. XCTFail("Expected RST_STREAM, got \(payload)")
  526. }
  527. }
  528. func testSendMultipleMessagesInSingleBuffer() throws {
  529. let channel = EmbeddedChannel()
  530. let handler = self.makeServerStreamHandler(channel: channel)
  531. try channel.pipeline.syncOperations.addHandler(handler)
  532. // Receive client's initial metadata
  533. let clientInitialMetadata: HPACKHeaders = [
  534. GRPCHTTP2Keys.path.rawValue: "/test/test",
  535. GRPCHTTP2Keys.scheme.rawValue: "http",
  536. GRPCHTTP2Keys.method.rawValue: "POST",
  537. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  538. GRPCHTTP2Keys.te.rawValue: "trailers",
  539. ]
  540. XCTAssertNoThrow(
  541. try channel.writeInbound(
  542. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  543. )
  544. )
  545. // Make sure we haven't sent back an error response, and that we read the initial metadata
  546. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  547. XCTAssertEqual(
  548. try channel.readInbound(as: RPCRequestPart.self),
  549. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  550. )
  551. // Write back server's initial metadata
  552. let headers: HPACKHeaders = [
  553. "some-custom-header": "some-custom-value"
  554. ]
  555. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  556. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  557. // Read out the metadata
  558. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  559. // This is where this test actually begins. We want to write two messages
  560. // without flushing, and make sure that no messages are sent down the pipeline
  561. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  562. // Write back first message and make sure nothing's written in the channel.
  563. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  564. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  565. // Write back second message and make sure nothing's written in the channel.
  566. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  567. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  568. // Now flush and check we *do* write the data.
  569. channel.flush()
  570. let writtenMessage = try channel.assertReadDataOutbound()
  571. // Make sure both messages have been framed together in the ByteBuffer.
  572. XCTAssertEqual(
  573. writtenMessage.data,
  574. .byteBuffer(
  575. .init(bytes: [
  576. // First message
  577. 0, // Compression disabled
  578. 0, 0, 0, 4, // Message length
  579. 1, 1, 1, 1, // First message data
  580. // Second message
  581. 0, // Compression disabled
  582. 0, 0, 0, 4, // Message length
  583. 2, 2, 2, 2, // Second message data
  584. ])
  585. )
  586. )
  587. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  588. }
  589. func testMessageAndStatusAreNotReordered() throws {
  590. let channel = EmbeddedChannel()
  591. let handler = self.makeServerStreamHandler(channel: channel)
  592. try channel.pipeline.syncOperations.addHandler(handler)
  593. // Receive client's initial metadata
  594. let clientInitialMetadata: HPACKHeaders = [
  595. GRPCHTTP2Keys.path.rawValue: "/test/test",
  596. GRPCHTTP2Keys.scheme.rawValue: "http",
  597. GRPCHTTP2Keys.method.rawValue: "POST",
  598. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  599. GRPCHTTP2Keys.te.rawValue: "trailers",
  600. ]
  601. XCTAssertNoThrow(
  602. try channel.writeInbound(
  603. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  604. )
  605. )
  606. // Make sure we haven't sent back an error response, and that we read the initial metadata
  607. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  608. XCTAssertEqual(
  609. try channel.readInbound(as: RPCRequestPart.self),
  610. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  611. )
  612. // Write back server's initial metadata
  613. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  614. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  615. // Read out the metadata
  616. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  617. // This is where this test actually begins. We want to write a message followed
  618. // by status and trailers, and only flush after both writes.
  619. // Because messages are buffered and potentially bundled together in a single
  620. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  621. // and trailers won't be written before the messages.
  622. // Write back message and make sure nothing's written in the channel.
  623. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  624. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  625. // Write status + metadata and make sure nothing's written.
  626. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  627. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  628. // Now flush and check we *do* write the data in the right order: message first,
  629. // trailers second.
  630. channel.flush()
  631. let writtenMessage = try channel.assertReadDataOutbound()
  632. // Make sure we first get message.
  633. XCTAssertEqual(
  634. writtenMessage.data,
  635. .byteBuffer(
  636. .init(bytes: [
  637. // First message
  638. 0, // Compression disabled
  639. 0, 0, 0, 4, // Message length
  640. 1, 1, 1, 1, // First message data
  641. ])
  642. )
  643. )
  644. XCTAssertFalse(writtenMessage.endStream)
  645. // Make sure we get trailers.
  646. let writtenTrailers = try channel.assertReadHeadersOutbound()
  647. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  648. XCTAssertTrue(writtenTrailers.endStream)
  649. // Make sure we get nothing else.
  650. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  651. }
  652. func testMethodDescriptorPromiseSucceeds() throws {
  653. let channel = EmbeddedChannel()
  654. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  655. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  656. try channel.pipeline.syncOperations.addHandler(handler)
  657. // Receive client's initial metadata
  658. let clientInitialMetadata: HPACKHeaders = [
  659. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  660. GRPCHTTP2Keys.scheme.rawValue: "http",
  661. GRPCHTTP2Keys.method.rawValue: "POST",
  662. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  663. GRPCHTTP2Keys.te.rawValue: "trailers",
  664. ]
  665. XCTAssertNoThrow(
  666. try channel.writeInbound(
  667. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  668. )
  669. )
  670. // Make sure we haven't sent back an error response, and that we read the initial metadata
  671. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  672. XCTAssertEqual(
  673. try channel.readInbound(as: RPCRequestPart.self),
  674. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  675. )
  676. XCTAssertEqual(
  677. try promise.futureResult.wait(),
  678. MethodDescriptor(fullyQualifiedService: "SomeService", method: "SomeMethod")
  679. )
  680. }
  681. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  682. let channel = EmbeddedChannel()
  683. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  684. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  685. try channel.pipeline.syncOperations.addHandler(handler)
  686. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  687. XCTAssertThrowsError(
  688. ofType: RPCError.self,
  689. try promise.futureResult.wait()
  690. ) { error in
  691. XCTAssertEqual(error.code, .unavailable)
  692. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  693. }
  694. }
  695. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  696. let channel = EmbeddedChannel()
  697. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  698. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  699. try channel.pipeline.syncOperations.addHandler(handler)
  700. // Receive client's initial metadata
  701. let clientInitialMetadata: HPACKHeaders = [
  702. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  703. GRPCHTTP2Keys.scheme.rawValue: "http",
  704. GRPCHTTP2Keys.method.rawValue: "POST",
  705. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  706. GRPCHTTP2Keys.te.rawValue: "trailers",
  707. ]
  708. XCTAssertNoThrow(
  709. try channel.writeInbound(
  710. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  711. )
  712. )
  713. XCTAssertThrowsError(
  714. ofType: RPCError.self,
  715. try promise.futureResult.wait()
  716. ) { error in
  717. XCTAssertEqual(error.code, .unavailable)
  718. XCTAssertEqual(error.message, "RPC was rejected.")
  719. }
  720. }
  721. func testUnexpectedStreamClose_ErrorFired() throws {
  722. let channel = EmbeddedChannel()
  723. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  724. let handler = self.makeServerStreamHandler(
  725. channel: channel,
  726. descriptorPromise: promise,
  727. disableAssertions: true
  728. )
  729. try channel.pipeline.syncOperations.addHandler(handler)
  730. // Receive client's initial metadata
  731. let clientInitialMetadata: HPACKHeaders = [
  732. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  733. GRPCHTTP2Keys.scheme.rawValue: "http",
  734. GRPCHTTP2Keys.method.rawValue: "POST",
  735. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  736. GRPCHTTP2Keys.te.rawValue: "trailers",
  737. ]
  738. XCTAssertNoThrow(
  739. try channel.writeInbound(
  740. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  741. )
  742. )
  743. // Make sure we haven't sent back an error response, and that we read the initial metadata
  744. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  745. XCTAssertEqual(
  746. try channel.readInbound(as: RPCRequestPart.self),
  747. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  748. )
  749. // An error is fired down the pipeline
  750. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  751. channel.pipeline.fireErrorCaught(thrownError)
  752. // The server handler simply forwards the error.
  753. XCTAssertThrowsError(
  754. ofType: type(of: thrownError),
  755. try channel.throwIfErrorCaught()
  756. ) { error in
  757. XCTAssertEqual(error, thrownError)
  758. }
  759. // We should now be closed: check we can't write anymore.
  760. XCTAssertThrowsError(
  761. ofType: RPCError.self,
  762. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  763. ) { error in
  764. XCTAssertEqual(error.code, .internalError)
  765. XCTAssertEqual(error.message, "Invalid state")
  766. }
  767. }
  768. func testUnexpectedStreamClose_ChannelInactive() throws {
  769. let channel = EmbeddedChannel()
  770. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  771. let handler = self.makeServerStreamHandler(
  772. channel: channel,
  773. descriptorPromise: promise,
  774. disableAssertions: true
  775. )
  776. try channel.pipeline.syncOperations.addHandler(handler)
  777. // Receive client's initial metadata
  778. let clientInitialMetadata: HPACKHeaders = [
  779. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  780. GRPCHTTP2Keys.scheme.rawValue: "http",
  781. GRPCHTTP2Keys.method.rawValue: "POST",
  782. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  783. GRPCHTTP2Keys.te.rawValue: "trailers",
  784. ]
  785. XCTAssertNoThrow(
  786. try channel.writeInbound(
  787. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  788. )
  789. )
  790. // Make sure we haven't sent back an error response, and that we read the initial metadata
  791. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  792. XCTAssertEqual(
  793. try channel.readInbound(as: RPCRequestPart.self),
  794. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  795. )
  796. // Channel becomes inactive
  797. channel.pipeline.fireChannelInactive()
  798. // The server handler fires an error
  799. XCTAssertThrowsError(
  800. ofType: RPCError.self,
  801. try channel.throwIfErrorCaught()
  802. ) { error in
  803. XCTAssertEqual(error.code, .unavailable)
  804. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  805. }
  806. // We should now be closed: check we can't write anymore.
  807. XCTAssertThrowsError(
  808. ofType: RPCError.self,
  809. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  810. ) { error in
  811. XCTAssertEqual(error.code, .internalError)
  812. XCTAssertEqual(error.message, "Invalid state")
  813. }
  814. }
  815. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  816. let channel = EmbeddedChannel()
  817. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  818. let handler = self.makeServerStreamHandler(
  819. channel: channel,
  820. descriptorPromise: promise,
  821. disableAssertions: true
  822. )
  823. try channel.pipeline.syncOperations.addHandler(handler)
  824. // Receive client's initial metadata
  825. let clientInitialMetadata: HPACKHeaders = [
  826. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  827. GRPCHTTP2Keys.scheme.rawValue: "http",
  828. GRPCHTTP2Keys.method.rawValue: "POST",
  829. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  830. GRPCHTTP2Keys.te.rawValue: "trailers",
  831. ]
  832. XCTAssertNoThrow(
  833. try channel.writeInbound(
  834. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  835. )
  836. )
  837. // Make sure we haven't sent back an error response, and that we read the initial metadata
  838. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  839. XCTAssertEqual(
  840. try channel.readInbound(as: RPCRequestPart.self),
  841. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  842. )
  843. // We receive RST_STREAM frame
  844. // Assert the server handler fires an error
  845. XCTAssertThrowsError(
  846. ofType: RPCError.self,
  847. try channel.writeInbound(
  848. HTTP2Frame.FramePayload.rstStream(.internalError)
  849. )
  850. ) { error in
  851. XCTAssertEqual(error.code, .unavailable)
  852. XCTAssertEqual(error.message, "Stream unexpectedly closed: a RST_STREAM frame was received.")
  853. }
  854. // We should now be closed: check we can't write anymore.
  855. XCTAssertThrowsError(
  856. ofType: RPCError.self,
  857. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  858. ) { error in
  859. XCTAssertEqual(error.code, .internalError)
  860. XCTAssertEqual(error.message, "Invalid state")
  861. }
  862. }
  863. }
  864. struct ServerStreamHandlerTests {
  865. struct ConnectionAndStreamHandlers {
  866. let streamHandler: GRPCServerStreamHandler
  867. let connectionHandler: ServerConnectionManagementHandler
  868. }
  869. private func makeServerConnectionAndStreamHandlers(
  870. channel: any Channel,
  871. scheme: Scheme = .http,
  872. acceptedEncodings: CompressionAlgorithmSet = [],
  873. maxPayloadSize: Int = .max,
  874. descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
  875. disableAssertions: Bool = false
  876. ) -> ConnectionAndStreamHandlers {
  877. let connectionManagementHandler = ServerConnectionManagementHandler(
  878. eventLoop: channel.eventLoop,
  879. maxIdleTime: nil,
  880. maxAge: nil,
  881. maxGraceTime: nil,
  882. keepaliveTime: nil,
  883. keepaliveTimeout: nil,
  884. allowKeepaliveWithoutCalls: false,
  885. minPingIntervalWithoutCalls: .minutes(5),
  886. requireALPN: false
  887. )
  888. let streamHandler = GRPCServerStreamHandler(
  889. scheme: scheme,
  890. acceptedEncodings: acceptedEncodings,
  891. maxPayloadSize: maxPayloadSize,
  892. methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
  893. eventLoop: channel.eventLoop,
  894. connectionManagementHandler: connectionManagementHandler.syncView,
  895. skipStateMachineAssertions: disableAssertions
  896. )
  897. return ConnectionAndStreamHandlers(
  898. streamHandler: streamHandler,
  899. connectionHandler: connectionManagementHandler
  900. )
  901. }
  902. @Test("ChannelShouldQuiesceEvent is buffered and turns into RPC cancellation")
  903. func shouldQuiesceEventIsBufferedBeforeHandleIsSet() async throws {
  904. let channel = EmbeddedChannel()
  905. let handler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
  906. try channel.pipeline.syncOperations.addHandler(handler)
  907. channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
  908. await withServerContextRPCCancellationHandle { handle in
  909. handler.setCancellationHandle(handle)
  910. #expect(handle.isCancelled)
  911. }
  912. // Throwing is fine: the channel is closed abruptly, errors are expected.
  913. _ = try? channel.finish()
  914. }
  915. @Test("ChannelShouldQuiesceEvent turns into RPC cancellation")
  916. func shouldQuiesceEventTriggersCancellation() async throws {
  917. let channel = EmbeddedChannel()
  918. let handler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
  919. try channel.pipeline.syncOperations.addHandler(handler)
  920. await withServerContextRPCCancellationHandle { handle in
  921. handler.setCancellationHandle(handle)
  922. #expect(!handle.isCancelled)
  923. channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
  924. #expect(handle.isCancelled)
  925. }
  926. // Throwing is fine: the channel is closed abruptly, errors are expected.
  927. _ = try? channel.finish()
  928. }
  929. @Test("RST_STREAM turns into RPC cancellation")
  930. func rstStreamTriggersCancellation() async throws {
  931. let channel = EmbeddedChannel()
  932. let handler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
  933. try channel.pipeline.syncOperations.addHandler(handler)
  934. await withServerContextRPCCancellationHandle { handle in
  935. handler.setCancellationHandle(handle)
  936. #expect(!handle.isCancelled)
  937. let rstStream: HTTP2Frame.FramePayload = .rstStream(.cancel)
  938. channel.pipeline.fireChannelRead(NIOAny(rstStream))
  939. #expect(handle.isCancelled)
  940. }
  941. // Throwing is fine: the channel is closed abruptly, errors are expected.
  942. _ = try? channel.finish()
  943. }
  944. @Test("Connection FrameStats are updated when writing headers or data frames")
  945. func connectionFrameStatsAreUpdatedAccordingly() async throws {
  946. let channel = EmbeddedChannel()
  947. let handlers = self.makeServerConnectionAndStreamHandlers(channel: channel)
  948. try channel.pipeline.syncOperations.addHandler(handlers.streamHandler)
  949. // We have written nothing yet, so expect FrameStats/didWriteHeadersOrData to be false
  950. #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData)
  951. // FrameStats aren't affected by pings received
  952. channel.pipeline.fireChannelRead(
  953. NIOAny(HTTP2Frame.FramePayload.ping(.init(withInteger: 42), ack: false))
  954. )
  955. #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData)
  956. // Now write back headers and make sure FrameStats are updated accordingly:
  957. // To do that, we first need to receive client's initial metadata...
  958. let clientInitialMetadata: HPACKHeaders = [
  959. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  960. GRPCHTTP2Keys.scheme.rawValue: "http",
  961. GRPCHTTP2Keys.method.rawValue: "POST",
  962. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  963. GRPCHTTP2Keys.te.rawValue: "trailers",
  964. ]
  965. try channel.writeInbound(
  966. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  967. )
  968. // Now we write back server's initial metadata...
  969. let serverInitialMetadata = RPCResponsePart.metadata([:])
  970. try channel.writeOutbound(serverInitialMetadata)
  971. // And this should have updated the FrameStats
  972. #expect(handlers.connectionHandler.frameStats.didWriteHeadersOrData)
  973. // Manually reset the FrameStats to make sure that writing data also updates it correctly.
  974. handlers.connectionHandler.frameStats.reset()
  975. #expect(!handlers.connectionHandler.frameStats.didWriteHeadersOrData)
  976. try channel.writeOutbound(RPCResponsePart.message([42]))
  977. #expect(handlers.connectionHandler.frameStats.didWriteHeadersOrData)
  978. // Clean up.
  979. // Throwing is fine: the channel is closed abruptly, errors are expected.
  980. _ = try? channel.finish()
  981. }
  982. }
  983. extension EmbeddedChannel {
  984. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  985. guard
  986. case .headers(let writtenHeaders) = try XCTUnwrap(
  987. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  988. )
  989. else {
  990. throw TestError.assertionFailure("Expected to write headers")
  991. }
  992. return writtenHeaders
  993. }
  994. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  995. guard
  996. case .data(let writtenMessage) = try XCTUnwrap(
  997. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  998. )
  999. else {
  1000. throw TestError.assertionFailure("Expected to write data")
  1001. }
  1002. return writtenMessage
  1003. }
  1004. }
  1005. private enum TestError: Error {
  1006. case assertionFailure(String)
  1007. }