GRPCServerStreamHandlerTests.swift 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import Testing
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. private func makeServerStreamHandler(
  26. channel: any Channel,
  27. scheme: Scheme = .http,
  28. acceptedEncodings: CompressionAlgorithmSet = [],
  29. maxPayloadSize: Int = .max,
  30. descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
  31. disableAssertions: Bool = false
  32. ) -> GRPCServerStreamHandler {
  33. return GRPCServerStreamHandler(
  34. scheme: scheme,
  35. acceptedEncodings: acceptedEncodings,
  36. maxPayloadSize: maxPayloadSize,
  37. methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
  38. eventLoop: channel.eventLoop,
  39. skipStateMachineAssertions: disableAssertions
  40. )
  41. }
  42. func testH2FramesAreIgnored() throws {
  43. let channel = EmbeddedChannel()
  44. let handler = self.makeServerStreamHandler(channel: channel)
  45. try channel.pipeline.syncOperations.addHandler(handler)
  46. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  47. .ping(.init(), ack: false),
  48. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  49. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  50. // .priority(
  51. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  52. // ),
  53. .settings(.ack),
  54. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  55. .windowUpdate(windowSizeIncrement: 4),
  56. .alternativeService(origin: nil, field: nil),
  57. .origin([]),
  58. ]
  59. for toBeIgnored in framesToBeIgnored {
  60. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  61. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  62. }
  63. }
  64. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  65. let channel = EmbeddedChannel()
  66. let handler = self.makeServerStreamHandler(channel: channel)
  67. try channel.pipeline.syncOperations.addHandler(handler)
  68. // Receive client's initial metadata without content-type
  69. let clientInitialMetadata: HPACKHeaders = [
  70. GRPCHTTP2Keys.path.rawValue: "/test/test",
  71. GRPCHTTP2Keys.scheme.rawValue: "http",
  72. GRPCHTTP2Keys.method.rawValue: "POST",
  73. GRPCHTTP2Keys.te.rawValue: "trailers",
  74. ]
  75. XCTAssertNoThrow(
  76. try channel.writeInbound(
  77. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  78. )
  79. )
  80. // Make sure we have sent a trailers-only response
  81. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  82. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  83. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  84. }
  85. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  86. let channel = EmbeddedChannel()
  87. let handler = self.makeServerStreamHandler(channel: channel)
  88. try channel.pipeline.syncOperations.addHandler(handler)
  89. // Receive client's initial metadata without :method
  90. let clientInitialMetadata: HPACKHeaders = [
  91. GRPCHTTP2Keys.path.rawValue: "/test/test",
  92. GRPCHTTP2Keys.scheme.rawValue: "http",
  93. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  94. GRPCHTTP2Keys.te.rawValue: "trailers",
  95. ]
  96. XCTAssertNoThrow(
  97. try channel.writeInbound(
  98. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  99. )
  100. )
  101. // Make sure we have sent a trailers-only response
  102. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  103. XCTAssertEqual(
  104. writtenTrailersOnlyResponse.headers,
  105. [
  106. GRPCHTTP2Keys.status.rawValue: "200",
  107. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  108. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  109. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  110. ":method header is expected to be present and have a value of \"POST\".",
  111. ]
  112. )
  113. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  114. }
  115. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  116. let channel = EmbeddedChannel()
  117. let handler = self.makeServerStreamHandler(channel: channel)
  118. try channel.pipeline.syncOperations.addHandler(handler)
  119. // Receive client's initial metadata without :scheme
  120. let clientInitialMetadata: HPACKHeaders = [
  121. GRPCHTTP2Keys.path.rawValue: "/test/test",
  122. GRPCHTTP2Keys.method.rawValue: "POST",
  123. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  124. GRPCHTTP2Keys.te.rawValue: "trailers",
  125. ]
  126. XCTAssertNoThrow(
  127. try channel.writeInbound(
  128. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  129. )
  130. )
  131. // Make sure we have sent a trailers-only response
  132. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  133. XCTAssertEqual(
  134. writtenTrailersOnlyResponse.headers,
  135. [
  136. GRPCHTTP2Keys.status.rawValue: "200",
  137. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  138. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  139. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  140. ":scheme header must be present and one of \"http\" or \"https\".",
  141. ]
  142. )
  143. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  144. }
  145. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  146. let channel = EmbeddedChannel()
  147. let handler = self.makeServerStreamHandler(channel: channel)
  148. try channel.pipeline.syncOperations.addHandler(handler)
  149. // Receive client's initial metadata without :path
  150. let clientInitialMetadata: HPACKHeaders = [
  151. GRPCHTTP2Keys.scheme.rawValue: "http",
  152. GRPCHTTP2Keys.method.rawValue: "POST",
  153. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  154. GRPCHTTP2Keys.te.rawValue: "trailers",
  155. ]
  156. XCTAssertNoThrow(
  157. try channel.writeInbound(
  158. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  159. )
  160. )
  161. // Make sure we have sent a trailers-only response
  162. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  163. XCTAssertEqual(
  164. writtenTrailersOnlyResponse.headers,
  165. [
  166. GRPCHTTP2Keys.status.rawValue: "200",
  167. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  168. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  169. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  170. ]
  171. )
  172. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  173. }
  174. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  175. let channel = EmbeddedChannel()
  176. let handler = self.makeServerStreamHandler(channel: channel)
  177. try channel.pipeline.syncOperations.addHandler(handler)
  178. // Receive client's initial metadata
  179. let clientInitialMetadata: HPACKHeaders = [
  180. GRPCHTTP2Keys.path.rawValue: "/test/test",
  181. GRPCHTTP2Keys.scheme.rawValue: "http",
  182. GRPCHTTP2Keys.method.rawValue: "POST",
  183. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  184. GRPCHTTP2Keys.te.rawValue: "trailers",
  185. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  186. ]
  187. XCTAssertNoThrow(
  188. try channel.writeInbound(
  189. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  190. )
  191. )
  192. // Make sure we have sent a trailers-only response
  193. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  194. XCTAssertEqual(
  195. writtenTrailersOnlyResponse.headers,
  196. [
  197. GRPCHTTP2Keys.status.rawValue: "200",
  198. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  199. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  200. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  201. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  202. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  203. ]
  204. )
  205. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  206. }
  207. func testOverMaximumPayloadSize() throws {
  208. let channel = EmbeddedChannel()
  209. let handler = self.makeServerStreamHandler(channel: channel, maxPayloadSize: 1)
  210. try channel.pipeline.syncOperations.addHandler(handler)
  211. // Receive client's initial metadata
  212. let clientInitialMetadata: HPACKHeaders = [
  213. GRPCHTTP2Keys.path.rawValue: "/test/test",
  214. GRPCHTTP2Keys.scheme.rawValue: "http",
  215. GRPCHTTP2Keys.method.rawValue: "POST",
  216. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  217. GRPCHTTP2Keys.te.rawValue: "trailers",
  218. ]
  219. XCTAssertNoThrow(
  220. try channel.writeInbound(
  221. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  222. )
  223. )
  224. // Make sure we haven't sent back an error response, and that we read the initial metadata
  225. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  226. XCTAssertEqual(
  227. try channel.readInbound(as: RPCRequestPart.self),
  228. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  229. )
  230. // Write back server's initial metadata
  231. let headers: HPACKHeaders = [
  232. "some-custom-header": "some-custom-value"
  233. ]
  234. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  235. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  236. // Make sure we wrote back the initial metadata
  237. let writtenHeaders = try channel.assertReadHeadersOutbound()
  238. XCTAssertEqual(
  239. writtenHeaders.headers,
  240. [
  241. GRPCHTTP2Keys.status.rawValue: "200",
  242. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  243. "some-custom-header": "some-custom-value",
  244. ]
  245. )
  246. // Receive client's message
  247. var buffer = ByteBuffer()
  248. buffer.writeInteger(UInt8(0)) // not compressed
  249. buffer.writeInteger(UInt32(42)) // message length
  250. buffer.writeRepeatingByte(0, count: 42) // message
  251. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  252. XCTAssertThrowsError(
  253. ofType: RPCError.self,
  254. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  255. ) { error in
  256. XCTAssertEqual(error.code, .internalError)
  257. XCTAssertEqual(error.message, "Failed to decode message")
  258. }
  259. // Make sure we haven't sent a response back and that we didn't read the received message
  260. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  261. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  262. }
  263. func testClientEndsStream() throws {
  264. let channel = EmbeddedChannel()
  265. let handler = self.makeServerStreamHandler(channel: channel, disableAssertions: true)
  266. try channel.pipeline.syncOperations.addHandler(handler)
  267. // Receive client's initial metadata with end stream set
  268. let clientInitialMetadata: HPACKHeaders = [
  269. GRPCHTTP2Keys.path.rawValue: "/test/test",
  270. GRPCHTTP2Keys.scheme.rawValue: "http",
  271. GRPCHTTP2Keys.method.rawValue: "POST",
  272. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  273. GRPCHTTP2Keys.te.rawValue: "trailers",
  274. ]
  275. XCTAssertNoThrow(
  276. try channel.writeInbound(
  277. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  278. )
  279. )
  280. // Make sure we haven't sent back an error response, and that we read the initial metadata
  281. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  282. XCTAssertEqual(
  283. try channel.readInbound(as: RPCRequestPart.self),
  284. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  285. )
  286. // Write back server's initial metadata
  287. let headers: HPACKHeaders = [
  288. "some-custom-header": "some-custom-value"
  289. ]
  290. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  291. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  292. // Make sure we wrote back the initial metadata
  293. let writtenHeaders = try channel.assertReadHeadersOutbound()
  294. XCTAssertEqual(
  295. writtenHeaders.headers,
  296. [
  297. GRPCHTTP2Keys.status.rawValue: "200",
  298. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  299. "some-custom-header": "some-custom-value",
  300. ]
  301. )
  302. // We should throw if the client sends another message, since it's closed the stream already.
  303. var buffer = ByteBuffer()
  304. buffer.writeInteger(UInt8(0)) // not compressed
  305. buffer.writeInteger(UInt32(42)) // message length
  306. buffer.writeRepeatingByte(0, count: 42) // message
  307. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  308. XCTAssertThrowsError(
  309. ofType: RPCError.self,
  310. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  311. ) { error in
  312. XCTAssertEqual(error.code, .internalError)
  313. XCTAssertEqual(error.message, "Invalid state")
  314. }
  315. }
  316. func testNormalFlow() throws {
  317. let channel = EmbeddedChannel()
  318. let handler = self.makeServerStreamHandler(channel: channel, disableAssertions: true)
  319. try channel.pipeline.syncOperations.addHandler(handler)
  320. // Receive client's initial metadata
  321. let clientInitialMetadata: HPACKHeaders = [
  322. GRPCHTTP2Keys.path.rawValue: "/test/test",
  323. GRPCHTTP2Keys.scheme.rawValue: "http",
  324. GRPCHTTP2Keys.method.rawValue: "POST",
  325. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  326. GRPCHTTP2Keys.te.rawValue: "trailers",
  327. ]
  328. XCTAssertNoThrow(
  329. try channel.writeInbound(
  330. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  331. )
  332. )
  333. // Make sure we haven't sent back an error response, and that we read the initial metadata
  334. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  335. XCTAssertEqual(
  336. try channel.readInbound(as: RPCRequestPart.self),
  337. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  338. )
  339. // Write back server's initial metadata
  340. let headers: HPACKHeaders = [
  341. "some-custom-header": "some-custom-value"
  342. ]
  343. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  344. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  345. // Make sure we wrote back the initial metadata
  346. let writtenHeaders = try channel.assertReadHeadersOutbound()
  347. XCTAssertEqual(
  348. writtenHeaders.headers,
  349. [
  350. GRPCHTTP2Keys.status.rawValue: "200",
  351. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  352. "some-custom-header": "some-custom-value",
  353. ]
  354. )
  355. // Receive client's message
  356. var buffer = ByteBuffer()
  357. buffer.writeInteger(UInt8(0)) // not compressed
  358. buffer.writeInteger(UInt32(42)) // message length
  359. buffer.writeRepeatingByte(0, count: 42) // message
  360. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  361. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  362. // Make sure we haven't sent back an error response, and that we read the message properly
  363. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  364. XCTAssertEqual(
  365. try channel.readInbound(as: RPCRequestPart.self),
  366. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  367. )
  368. // Write back response
  369. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  370. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  371. // Make sure we wrote back the right message
  372. let writtenMessage = try channel.assertReadDataOutbound()
  373. var expectedBuffer = ByteBuffer()
  374. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  375. expectedBuffer.writeInteger(UInt32(42)) // message length
  376. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  377. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  378. // Send back status to end RPC
  379. let trailers = RPCResponsePart.status(
  380. .init(code: .dataLoss, message: "Test data loss"),
  381. ["custom-header": "custom-value"]
  382. )
  383. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  384. // Make sure we wrote back the status and trailers
  385. let writtenStatus = try channel.assertReadHeadersOutbound()
  386. XCTAssertTrue(writtenStatus.endStream)
  387. XCTAssertEqual(
  388. writtenStatus.headers,
  389. [
  390. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  391. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  392. "custom-header": "custom-value",
  393. ]
  394. )
  395. // Try writing and assert it throws to make sure we don't allow writes
  396. // after closing.
  397. XCTAssertThrowsError(
  398. ofType: RPCError.self,
  399. try channel.writeOutbound(trailers)
  400. ) { error in
  401. XCTAssertEqual(error.code, .internalError)
  402. XCTAssertEqual(error.message, "Invalid state")
  403. }
  404. }
  405. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  406. let channel = EmbeddedChannel()
  407. let handler = self.makeServerStreamHandler(channel: channel)
  408. try channel.pipeline.syncOperations.addHandler(handler)
  409. // Receive client's initial metadata
  410. let clientInitialMetadata: HPACKHeaders = [
  411. GRPCHTTP2Keys.path.rawValue: "/test/test",
  412. GRPCHTTP2Keys.scheme.rawValue: "http",
  413. GRPCHTTP2Keys.method.rawValue: "POST",
  414. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  415. GRPCHTTP2Keys.te.rawValue: "trailers",
  416. ]
  417. XCTAssertNoThrow(
  418. try channel.writeInbound(
  419. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  420. )
  421. )
  422. // Make sure we haven't sent back an error response, and that we read the initial metadata
  423. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  424. XCTAssertEqual(
  425. try channel.readInbound(as: RPCRequestPart.self),
  426. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  427. )
  428. // Write back server's initial metadata
  429. let headers: HPACKHeaders = [
  430. "some-custom-header": "some-custom-value"
  431. ]
  432. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  433. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  434. // Make sure we wrote back the initial metadata
  435. let writtenHeaders = try channel.assertReadHeadersOutbound()
  436. XCTAssertEqual(
  437. writtenHeaders.headers,
  438. [
  439. GRPCHTTP2Keys.status.rawValue: "200",
  440. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  441. "some-custom-header": "some-custom-value",
  442. ]
  443. )
  444. // Receive client's first message
  445. var buffer = ByteBuffer()
  446. buffer.writeInteger(UInt8(0)) // not compressed
  447. XCTAssertNoThrow(
  448. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  449. )
  450. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  451. buffer.clear()
  452. buffer.writeInteger(UInt32(30)) // message length
  453. XCTAssertNoThrow(
  454. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  455. )
  456. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  457. buffer.clear()
  458. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  459. XCTAssertNoThrow(
  460. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  461. )
  462. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  463. buffer.clear()
  464. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  465. XCTAssertNoThrow(
  466. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  467. )
  468. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  469. buffer.clear()
  470. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  471. XCTAssertNoThrow(
  472. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  473. )
  474. // Make sure we haven't sent back an error response, and that we read the message properly
  475. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  476. XCTAssertEqual(
  477. try channel.readInbound(as: RPCRequestPart.self),
  478. RPCRequestPart.message(
  479. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  480. + [UInt8](repeating: 2, count: 10)
  481. )
  482. )
  483. }
  484. func testReceiveMultipleHeaders() throws {
  485. let channel = EmbeddedChannel()
  486. let handler = self.makeServerStreamHandler(channel: channel)
  487. try channel.pipeline.syncOperations.addHandler(handler)
  488. // Receive client's initial metadata
  489. let clientInitialMetadata: HPACKHeaders = [
  490. GRPCHTTP2Keys.path.rawValue: "/test/test",
  491. GRPCHTTP2Keys.scheme.rawValue: "http",
  492. GRPCHTTP2Keys.method.rawValue: "POST",
  493. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  494. GRPCHTTP2Keys.te.rawValue: "trailers",
  495. ]
  496. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  497. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  498. // Receive them again. Should be a protocol violation.
  499. XCTAssertThrowsError(
  500. ofType: RPCError.self,
  501. try channel.writeInbound(
  502. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  503. )
  504. ) { error in
  505. XCTAssertEqual(error.code, .unavailable)
  506. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  507. }
  508. let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  509. switch payload {
  510. case .rstStream(let errorCode):
  511. XCTAssertEqual(errorCode, .protocolError)
  512. default:
  513. XCTFail("Expected RST_STREAM, got \(payload)")
  514. }
  515. }
  516. func testSendMultipleMessagesInSingleBuffer() throws {
  517. let channel = EmbeddedChannel()
  518. let handler = self.makeServerStreamHandler(channel: channel)
  519. try channel.pipeline.syncOperations.addHandler(handler)
  520. // Receive client's initial metadata
  521. let clientInitialMetadata: HPACKHeaders = [
  522. GRPCHTTP2Keys.path.rawValue: "/test/test",
  523. GRPCHTTP2Keys.scheme.rawValue: "http",
  524. GRPCHTTP2Keys.method.rawValue: "POST",
  525. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  526. GRPCHTTP2Keys.te.rawValue: "trailers",
  527. ]
  528. XCTAssertNoThrow(
  529. try channel.writeInbound(
  530. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  531. )
  532. )
  533. // Make sure we haven't sent back an error response, and that we read the initial metadata
  534. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  535. XCTAssertEqual(
  536. try channel.readInbound(as: RPCRequestPart.self),
  537. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  538. )
  539. // Write back server's initial metadata
  540. let headers: HPACKHeaders = [
  541. "some-custom-header": "some-custom-value"
  542. ]
  543. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  544. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  545. // Read out the metadata
  546. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  547. // This is where this test actually begins. We want to write two messages
  548. // without flushing, and make sure that no messages are sent down the pipeline
  549. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  550. // Write back first message and make sure nothing's written in the channel.
  551. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  552. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  553. // Write back second message and make sure nothing's written in the channel.
  554. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  555. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  556. // Now flush and check we *do* write the data.
  557. channel.flush()
  558. let writtenMessage = try channel.assertReadDataOutbound()
  559. // Make sure both messages have been framed together in the ByteBuffer.
  560. XCTAssertEqual(
  561. writtenMessage.data,
  562. .byteBuffer(
  563. .init(bytes: [
  564. // First message
  565. 0, // Compression disabled
  566. 0, 0, 0, 4, // Message length
  567. 1, 1, 1, 1, // First message data
  568. // Second message
  569. 0, // Compression disabled
  570. 0, 0, 0, 4, // Message length
  571. 2, 2, 2, 2, // Second message data
  572. ])
  573. )
  574. )
  575. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  576. }
  577. func testMessageAndStatusAreNotReordered() throws {
  578. let channel = EmbeddedChannel()
  579. let handler = self.makeServerStreamHandler(channel: channel)
  580. try channel.pipeline.syncOperations.addHandler(handler)
  581. // Receive client's initial metadata
  582. let clientInitialMetadata: HPACKHeaders = [
  583. GRPCHTTP2Keys.path.rawValue: "/test/test",
  584. GRPCHTTP2Keys.scheme.rawValue: "http",
  585. GRPCHTTP2Keys.method.rawValue: "POST",
  586. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  587. GRPCHTTP2Keys.te.rawValue: "trailers",
  588. ]
  589. XCTAssertNoThrow(
  590. try channel.writeInbound(
  591. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  592. )
  593. )
  594. // Make sure we haven't sent back an error response, and that we read the initial metadata
  595. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  596. XCTAssertEqual(
  597. try channel.readInbound(as: RPCRequestPart.self),
  598. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  599. )
  600. // Write back server's initial metadata
  601. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  602. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  603. // Read out the metadata
  604. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  605. // This is where this test actually begins. We want to write a message followed
  606. // by status and trailers, and only flush after both writes.
  607. // Because messages are buffered and potentially bundled together in a single
  608. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  609. // and trailers won't be written before the messages.
  610. // Write back message and make sure nothing's written in the channel.
  611. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  612. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  613. // Write status + metadata and make sure nothing's written.
  614. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  615. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  616. // Now flush and check we *do* write the data in the right order: message first,
  617. // trailers second.
  618. channel.flush()
  619. let writtenMessage = try channel.assertReadDataOutbound()
  620. // Make sure we first get message.
  621. XCTAssertEqual(
  622. writtenMessage.data,
  623. .byteBuffer(
  624. .init(bytes: [
  625. // First message
  626. 0, // Compression disabled
  627. 0, 0, 0, 4, // Message length
  628. 1, 1, 1, 1, // First message data
  629. ])
  630. )
  631. )
  632. XCTAssertFalse(writtenMessage.endStream)
  633. // Make sure we get trailers.
  634. let writtenTrailers = try channel.assertReadHeadersOutbound()
  635. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  636. XCTAssertTrue(writtenTrailers.endStream)
  637. // Make sure we get nothing else.
  638. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  639. }
  640. func testMethodDescriptorPromiseSucceeds() throws {
  641. let channel = EmbeddedChannel()
  642. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  643. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  644. try channel.pipeline.syncOperations.addHandler(handler)
  645. // Receive client's initial metadata
  646. let clientInitialMetadata: HPACKHeaders = [
  647. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  648. GRPCHTTP2Keys.scheme.rawValue: "http",
  649. GRPCHTTP2Keys.method.rawValue: "POST",
  650. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  651. GRPCHTTP2Keys.te.rawValue: "trailers",
  652. ]
  653. XCTAssertNoThrow(
  654. try channel.writeInbound(
  655. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  656. )
  657. )
  658. // Make sure we haven't sent back an error response, and that we read the initial metadata
  659. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  660. XCTAssertEqual(
  661. try channel.readInbound(as: RPCRequestPart.self),
  662. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  663. )
  664. XCTAssertEqual(
  665. try promise.futureResult.wait(),
  666. MethodDescriptor(fullyQualifiedService: "SomeService", method: "SomeMethod")
  667. )
  668. }
  669. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  670. let channel = EmbeddedChannel()
  671. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  672. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  673. try channel.pipeline.syncOperations.addHandler(handler)
  674. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  675. XCTAssertThrowsError(
  676. ofType: RPCError.self,
  677. try promise.futureResult.wait()
  678. ) { error in
  679. XCTAssertEqual(error.code, .unavailable)
  680. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  681. }
  682. }
  683. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  684. let channel = EmbeddedChannel()
  685. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  686. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  687. try channel.pipeline.syncOperations.addHandler(handler)
  688. // Receive client's initial metadata
  689. let clientInitialMetadata: HPACKHeaders = [
  690. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  691. GRPCHTTP2Keys.scheme.rawValue: "http",
  692. GRPCHTTP2Keys.method.rawValue: "POST",
  693. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  694. GRPCHTTP2Keys.te.rawValue: "trailers",
  695. ]
  696. XCTAssertNoThrow(
  697. try channel.writeInbound(
  698. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  699. )
  700. )
  701. XCTAssertThrowsError(
  702. ofType: RPCError.self,
  703. try promise.futureResult.wait()
  704. ) { error in
  705. XCTAssertEqual(error.code, .unavailable)
  706. XCTAssertEqual(error.message, "RPC was rejected.")
  707. }
  708. }
  709. func testUnexpectedStreamClose_ErrorFired() throws {
  710. let channel = EmbeddedChannel()
  711. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  712. let handler = self.makeServerStreamHandler(
  713. channel: channel,
  714. descriptorPromise: promise,
  715. disableAssertions: true
  716. )
  717. try channel.pipeline.syncOperations.addHandler(handler)
  718. // Receive client's initial metadata
  719. let clientInitialMetadata: HPACKHeaders = [
  720. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  721. GRPCHTTP2Keys.scheme.rawValue: "http",
  722. GRPCHTTP2Keys.method.rawValue: "POST",
  723. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  724. GRPCHTTP2Keys.te.rawValue: "trailers",
  725. ]
  726. XCTAssertNoThrow(
  727. try channel.writeInbound(
  728. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  729. )
  730. )
  731. // Make sure we haven't sent back an error response, and that we read the initial metadata
  732. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  733. XCTAssertEqual(
  734. try channel.readInbound(as: RPCRequestPart.self),
  735. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  736. )
  737. // An error is fired down the pipeline
  738. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  739. channel.pipeline.fireErrorCaught(thrownError)
  740. // The server handler simply forwards the error.
  741. XCTAssertThrowsError(
  742. ofType: type(of: thrownError),
  743. try channel.throwIfErrorCaught()
  744. ) { error in
  745. XCTAssertEqual(error, thrownError)
  746. }
  747. // We should now be closed: check we can't write anymore.
  748. XCTAssertThrowsError(
  749. ofType: RPCError.self,
  750. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  751. ) { error in
  752. XCTAssertEqual(error.code, .internalError)
  753. XCTAssertEqual(error.message, "Invalid state")
  754. }
  755. }
  756. func testUnexpectedStreamClose_ChannelInactive() throws {
  757. let channel = EmbeddedChannel()
  758. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  759. let handler = self.makeServerStreamHandler(
  760. channel: channel,
  761. descriptorPromise: promise,
  762. disableAssertions: true
  763. )
  764. try channel.pipeline.syncOperations.addHandler(handler)
  765. // Receive client's initial metadata
  766. let clientInitialMetadata: HPACKHeaders = [
  767. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  768. GRPCHTTP2Keys.scheme.rawValue: "http",
  769. GRPCHTTP2Keys.method.rawValue: "POST",
  770. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  771. GRPCHTTP2Keys.te.rawValue: "trailers",
  772. ]
  773. XCTAssertNoThrow(
  774. try channel.writeInbound(
  775. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  776. )
  777. )
  778. // Make sure we haven't sent back an error response, and that we read the initial metadata
  779. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  780. XCTAssertEqual(
  781. try channel.readInbound(as: RPCRequestPart.self),
  782. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  783. )
  784. // Channel becomes inactive
  785. channel.pipeline.fireChannelInactive()
  786. // The server handler fires an error
  787. XCTAssertThrowsError(
  788. ofType: RPCError.self,
  789. try channel.throwIfErrorCaught()
  790. ) { error in
  791. XCTAssertEqual(error.code, .unavailable)
  792. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  793. }
  794. // We should now be closed: check we can't write anymore.
  795. XCTAssertThrowsError(
  796. ofType: RPCError.self,
  797. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  798. ) { error in
  799. XCTAssertEqual(error.code, .internalError)
  800. XCTAssertEqual(error.message, "Invalid state")
  801. }
  802. }
  803. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  804. let channel = EmbeddedChannel()
  805. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  806. let handler = self.makeServerStreamHandler(
  807. channel: channel,
  808. descriptorPromise: promise,
  809. disableAssertions: true
  810. )
  811. try channel.pipeline.syncOperations.addHandler(handler)
  812. // Receive client's initial metadata
  813. let clientInitialMetadata: HPACKHeaders = [
  814. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  815. GRPCHTTP2Keys.scheme.rawValue: "http",
  816. GRPCHTTP2Keys.method.rawValue: "POST",
  817. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  818. GRPCHTTP2Keys.te.rawValue: "trailers",
  819. ]
  820. XCTAssertNoThrow(
  821. try channel.writeInbound(
  822. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  823. )
  824. )
  825. // Make sure we haven't sent back an error response, and that we read the initial metadata
  826. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  827. XCTAssertEqual(
  828. try channel.readInbound(as: RPCRequestPart.self),
  829. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  830. )
  831. // We receive RST_STREAM frame
  832. // Assert the server handler fires an error
  833. XCTAssertThrowsError(
  834. ofType: RPCError.self,
  835. try channel.writeInbound(
  836. HTTP2Frame.FramePayload.rstStream(.internalError)
  837. )
  838. ) { error in
  839. XCTAssertEqual(error.code, .unavailable)
  840. XCTAssertEqual(error.message, "Stream unexpectedly closed: a RST_STREAM frame was received.")
  841. }
  842. // We should now be closed: check we can't write anymore.
  843. XCTAssertThrowsError(
  844. ofType: RPCError.self,
  845. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  846. ) { error in
  847. XCTAssertEqual(error.code, .internalError)
  848. XCTAssertEqual(error.message, "Invalid state")
  849. }
  850. }
  851. }
  852. struct ServerStreamHandlerTests {
  853. private func makeServerStreamHandler(
  854. channel: any Channel,
  855. scheme: Scheme = .http,
  856. acceptedEncodings: CompressionAlgorithmSet = [],
  857. maxPayloadSize: Int = .max,
  858. descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
  859. disableAssertions: Bool = false
  860. ) -> GRPCServerStreamHandler {
  861. return GRPCServerStreamHandler(
  862. scheme: scheme,
  863. acceptedEncodings: acceptedEncodings,
  864. maxPayloadSize: maxPayloadSize,
  865. methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
  866. eventLoop: channel.eventLoop,
  867. skipStateMachineAssertions: disableAssertions
  868. )
  869. }
  870. @Test("ChannelShouldQuiesceEvent is buffered and turns into RPC cancellation")
  871. func shouldQuiesceEventIsBufferedBeforeHandleIsSet() async throws {
  872. let channel = EmbeddedChannel()
  873. let handler = self.makeServerStreamHandler(channel: channel)
  874. try channel.pipeline.syncOperations.addHandler(handler)
  875. channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
  876. await withServerContextRPCCancellationHandle { handle in
  877. handler.setCancellationHandle(handle)
  878. #expect(handle.isCancelled)
  879. }
  880. // Throwing is fine: the channel is closed abruptly, errors are expected.
  881. _ = try? channel.finish()
  882. }
  883. @Test("ChannelShouldQuiesceEvent turns into RPC cancellation")
  884. func shouldQuiesceEventTriggersCancellation() async throws {
  885. let channel = EmbeddedChannel()
  886. let handler = self.makeServerStreamHandler(channel: channel)
  887. try channel.pipeline.syncOperations.addHandler(handler)
  888. await withServerContextRPCCancellationHandle { handle in
  889. handler.setCancellationHandle(handle)
  890. #expect(!handle.isCancelled)
  891. channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
  892. #expect(handle.isCancelled)
  893. }
  894. // Throwing is fine: the channel is closed abruptly, errors are expected.
  895. _ = try? channel.finish()
  896. }
  897. @Test("RST_STREAM turns into RPC cancellation")
  898. func rstStreamTriggersCancellation() async throws {
  899. let channel = EmbeddedChannel()
  900. let handler = self.makeServerStreamHandler(channel: channel)
  901. try channel.pipeline.syncOperations.addHandler(handler)
  902. await withServerContextRPCCancellationHandle { handle in
  903. handler.setCancellationHandle(handle)
  904. #expect(!handle.isCancelled)
  905. let rstStream: HTTP2Frame.FramePayload = .rstStream(.cancel)
  906. channel.pipeline.fireChannelRead(NIOAny(rstStream))
  907. #expect(handle.isCancelled)
  908. }
  909. // Throwing is fine: the channel is closed abruptly, errors are expected.
  910. _ = try? channel.finish()
  911. }
  912. }
  913. extension EmbeddedChannel {
  914. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  915. guard
  916. case .headers(let writtenHeaders) = try XCTUnwrap(
  917. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  918. )
  919. else {
  920. throw TestError.assertionFailure("Expected to write headers")
  921. }
  922. return writtenHeaders
  923. }
  924. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  925. guard
  926. case .data(let writtenMessage) = try XCTUnwrap(
  927. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  928. )
  929. else {
  930. throw TestError.assertionFailure("Expected to write data")
  931. }
  932. return writtenMessage
  933. }
  934. }
  935. private enum TestError: Error {
  936. case assertionFailure(String)
  937. }