GRPCServerStreamHandlerTests.swift 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import Testing
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. @available(gRPCSwiftNIOTransport 2.0, *)
  25. final class GRPCServerStreamHandlerTests: XCTestCase {
  /// Creates the `GRPCServerStreamHandler` under test, wired to `channel`'s event loop.
  ///
  /// - Parameters:
  ///   - channel: Channel whose event loop is handed to the handler and used to create the
  ///     fallback method-descriptor promise.
  ///   - scheme: Forwarded to the handler's `scheme` argument.
  ///   - acceptedEncodings: Forwarded to the handler's `acceptedEncodings` argument.
  ///   - maxPayloadSize: Forwarded to the handler's `maxPayloadSize` argument.
  ///   - descriptorPromise: Promise passed as `methodDescriptorPromise`; when `nil` a fresh
  ///     promise is created on the channel's event loop.
  ///   - disableAssertions: Forwarded as `skipStateMachineAssertions`.
  /// - Returns: The configured handler (not yet added to any pipeline).
  private func makeServerStreamHandler(
    channel: any Channel,
    scheme: Scheme = .http,
    acceptedEncodings: CompressionAlgorithmSet = [],
    maxPayloadSize: Int = .max,
    descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
    disableAssertions: Bool = false
  ) -> GRPCServerStreamHandler {
    let eventLoop = channel.eventLoop

    // Only allocate a promise when the caller did not supply one.
    let methodDescriptorPromise: EventLoopPromise<MethodDescriptor>
    if let descriptorPromise {
      methodDescriptorPromise = descriptorPromise
    } else {
      methodDescriptorPromise = eventLoop.makePromise(of: MethodDescriptor.self)
    }

    return GRPCServerStreamHandler(
      scheme: scheme,
      acceptedEncodings: acceptedEncodings,
      maxPayloadSize: maxPayloadSize,
      methodDescriptorPromise: methodDescriptorPromise,
      eventLoop: eventLoop,
      skipStateMachineAssertions: disableAssertions
    )
  }
  /// Writes a series of non-HEADERS/DATA HTTP/2 frames inbound and checks that none of them
  /// throws or is forwarded further up the pipeline.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    let priorityData = HTTP2Frame.StreamPriorityData(
      exclusive: false,
      dependency: .rootStream,
      weight: 4
    )
    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      .priority(priorityData),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    for payload in ignoredPayloads {
      // Each frame must be accepted without error...
      XCTAssertNoThrow(try channel.writeInbound(payload))
      // ...and must leave nothing to read on the inbound side.
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Request headers without `content-type` must be answered with a headers frame carrying
  /// only `:status: 415` and ending the stream.
  func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Client request headers with content-type deliberately left out.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The handler must have written a trailers-only response.
    let response = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(response.headers, [":status": "415"])
    XCTAssertTrue(response.endStream)
  }
  /// Request headers without `:method` must be rejected with a trailers-only response whose
  /// gRPC status is `invalidArgument`.
  func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Client request headers with :method deliberately left out.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The handler must have written a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        ":method header is expected to be present and have a value of \"POST\".",
    ]
    let response = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(response.headers, expectedHeaders)
    XCTAssertTrue(response.endStream)
  }
  /// Request headers without `:scheme` must be rejected with a trailers-only response whose
  /// gRPC status is `invalidArgument`.
  func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Client request headers with :scheme deliberately left out.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The handler must have written a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        ":scheme header must be present and one of \"http\" or \"https\".",
    ]
    let response = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(response.headers, expectedHeaders)
    XCTAssertTrue(response.endStream)
  }
  /// Request headers without `:path` must be rejected with a trailers-only response whose
  /// gRPC status is `invalidArgument`.
  func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Client request headers with :path deliberately left out.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The handler must have written a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
    ]
    let response = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(response.headers, expectedHeaders)
    XCTAssertTrue(response.endStream)
  }
  /// A request advertising an encoding the handler was not configured to accept must be
  /// rejected with `unimplemented` and a `grpc-accept-encoding` header.
  func testNotAcceptedEncodingResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    // The handler is built with the default (empty) set of accepted encodings.
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Otherwise valid request headers asking for "deflate".
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The handler must have written a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
    ]
    let response = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(response.headers, expectedHeaders)
    XCTAssertTrue(response.endStream)
  }
  /// With `maxPayloadSize` set to 1, a 42-byte inbound message must fail with an
  /// `internalError` and must neither be forwarded inbound nor trigger an outbound write.
  func testOverMaximumPayloadSize() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, maxPayloadSize: 1)
    )

    // Step 1: the client's initial metadata arrives.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was written, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedMetadata
    )

    // Step 2: the server sends its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Step 3: the client sends a length-prefixed message larger than the limit.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // compression flag: not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message bytes
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(dataFrame)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Failed to decode message")
    }

    // Nothing was sent back and the oversized message was not surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  }
  /// After the client ends the stream on its HEADERS frame, any further DATA frame must
  /// fail with an `internalError` ("Invalid state").
  func testClientEndsStream() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, disableAssertions: true)
    )

    // Step 1: the client's initial metadata arrives with end-stream set.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: requestHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was written, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedMetadata
    )

    // Step 2: the server sends its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Step 3: the client has already closed its side, so another message must be an error.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // compression flag: not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message bytes
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(dataFrame)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Invalid state")
    }
  }
  /// Walks a complete RPC through the handler: request metadata, response metadata, one
  /// request message, one response message, then status and trailers. Finally checks that
  /// writing a second status after the stream ended is an error.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, disableAssertions: true)
    )

    // Step 1: the client's initial metadata arrives.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was written, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedMetadata
    )

    // Step 2: the server sends its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Step 3: the client sends one length-prefixed message and ends its side of the stream.
    var inboundFramed = ByteBuffer()
    inboundFramed.writeInteger(UInt8(0))  // compression flag: not compressed
    inboundFramed.writeInteger(UInt32(42))  // message length
    inboundFramed.writeRepeatingByte(0, count: 42)  // message bytes
    let inboundData = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(inboundFramed), endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(inboundData))

    // No error response was written, and the unframed message was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMessage = RPCRequestPart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(repeating: 0, count: 42)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMessage
    )

    // Step 4: the server sends one response message, which must come out length-prefixed.
    let responseMessage = RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(responseMessage))

    var expectedOutboundFramed = ByteBuffer()
    expectedOutboundFramed.writeInteger(UInt8(0))  // compression flag: not compressed
    expectedOutboundFramed.writeInteger(UInt32(42))  // message length
    expectedOutboundFramed.writeRepeatingByte(1, count: 42)  // message bytes
    let outboundData = try channel.assertReadDataOutbound()
    XCTAssertEqual(outboundData.data, .byteBuffer(expectedOutboundFramed))

    // Step 5: the server ends the RPC with a status and trailing metadata.
    let statusPart = RPCResponsePart<GRPCNIOTransportBytes>.status(
      Status(code: .dataLoss, message: "Test data loss"),
      ["custom-header": "custom-value"]
    )
    XCTAssertNoThrow(try channel.writeOutbound(statusPart))

    let expectedTrailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    let trailersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertTrue(trailersFrame.endStream)
    XCTAssertEqual(trailersFrame.headers, expectedTrailers)

    // Step 6: a further status write after end-of-stream must be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(statusPart)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Can't write status, stream has already closed")
    }
  }
  /// Delivers one 30-byte message spread over five DATA frames (compression flag, length
  /// prefix, then three 10-byte pieces) and checks that nothing is surfaced inbound until
  /// the final piece arrives, at which point the reassembled message is delivered.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Step 1: the client's initial metadata arrives.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was written, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedMetadata
    )

    // Step 2: the server sends its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Step 3: build the five fragments of the client's message, one buffer each.
    var compressionFlag = ByteBuffer()
    compressionFlag.writeInteger(UInt8(0))  // not compressed
    var lengthPrefix = ByteBuffer()
    lengthPrefix.writeInteger(UInt32(30))  // message length
    var firstPiece = ByteBuffer()
    firstPiece.writeRepeatingByte(0, count: 10)
    var secondPiece = ByteBuffer()
    secondPiece.writeRepeatingByte(1, count: 10)
    var finalPiece = ByteBuffer()
    finalPiece.writeRepeatingByte(2, count: 10)

    // Every fragment before the last is accepted but yields no inbound message yet.
    for fragment in [compressionFlag, lengthPrefix, firstPiece, secondPiece] {
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(fragment))))
      )
      XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
    }

    // The last fragment completes the message.
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(finalPiece))))
    )

    var reassembled = ByteBuffer()
    reassembled.writeRepeatingByte(0, count: 10)
    reassembled.writeRepeatingByte(1, count: 10)
    reassembled.writeRepeatingByte(2, count: 10)

    // No error response was written, and the full message was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedMessage = RPCRequestPart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(reassembled)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedMessage
    )
  }
  /// Receiving the request HEADERS a second time must throw an `unavailable` error and
  /// cause an RST_STREAM with `protocolError` to be written.
  func testReceiveMultipleHeaders() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]

    // First delivery is the normal start of an RPC and produces no outbound frame.
    let firstDelivery = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    try channel.writeInbound(firstDelivery)
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Second delivery of the same headers should be a protocol violation.
    let secondDelivery = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(secondDelivery)
    ) { error in
      XCTAssertEqual(error.code, .unavailable)
      XCTAssertEqual(error.message, "Stream unexpectedly closed.")
    }

    // The handler must have reset the stream.
    let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    guard case .rstStream(let errorCode) = payload else {
      XCTFail("Expected RST_STREAM, got \(payload)")
      return
    }
    XCTAssertEqual(errorCode, .protocolError)
  }
  /// Two response messages written without an intervening flush must not reach the channel
  /// until `flush()` is called, and must then be emitted together in a single DATA frame.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel()
    let handler = self.makeServerStreamHandler(channel: channel)
    try channel.pipeline.syncOperations.addHandler(handler)

    // Receive client's initial metadata
    let clientInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
      )
    )

    // Make sure we haven't sent back an error response, and that we read the initial metadata
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
    )

    // Write back server's initial metadata
    let headers: HPACKHeaders = [
      "some-custom-header": "some-custom-value"
    ]
    let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: headers)
    )
    XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))

    // Read out the metadata, and verify it rather than silently discarding it: a missing or
    // unexpected frame here would otherwise go unnoticed and skew the reads below.
    let writtenHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      writtenHeaders.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        "some-custom-header": "some-custom-value",
      ]
    )

    // This is where this test actually begins. We want to write two messages
    // without flushing, and make sure that no messages are sent down the pipeline
    // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.

    // Write back first message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Write back second message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Now flush and check we *do* write the data.
    channel.flush()
    let writtenMessage = try channel.assertReadDataOutbound()

    // Make sure both messages have been framed together in the ByteBuffer.
    XCTAssertEqual(
      writtenMessage.data,
      .byteBuffer(
        .init(bytes: [
          // First message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          1, 1, 1, 1,  // First message data
          // Second message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          2, 2, 2, 2,  // Second message data
        ])
      )
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// A response message followed by status/trailers, both written before a single flush,
  /// must be emitted in that order: the DATA frame first, then the end-of-stream HEADERS.
  func testMessageAndStatusAreNotReordered() throws {
    let channel = EmbeddedChannel()
    let handler = self.makeServerStreamHandler(channel: channel)
    try channel.pipeline.syncOperations.addHandler(handler)

    // Receive client's initial metadata
    let clientInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
      )
    )

    // Make sure we haven't sent back an error response, and that we read the initial metadata
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
    )

    // Write back server's initial metadata
    let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: [:])
    )
    XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))

    // Read out the metadata through the headers helper instead of discarding whatever was
    // (or was not) written, so the reads below start from a known state.
    _ = try channel.assertReadHeadersOutbound()

    // This is where this test actually begins. We want to write a message followed
    // by status and trailers, and only flush after both writes.
    // Because messages are buffered and potentially bundled together in a single
    // ByteBuffer by the GRPCMessageFramer, we want to make sure that the status
    // and trailers won't be written before the messages.

    // Write back message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Write status + metadata and make sure nothing's written.
    XCTAssertNoThrow(
      channel.write(
        RPCResponsePart<GRPCNIOTransportBytes>.status(.init(code: .ok, message: ""), [:])
      )
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Now flush and check we *do* write the data in the right order: message first,
    // trailers second.
    channel.flush()
    let writtenMessage = try channel.assertReadDataOutbound()

    // Make sure we first get message.
    XCTAssertEqual(
      writtenMessage.data,
      .byteBuffer(
        .init(bytes: [
          // First message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          1, 1, 1, 1,  // First message data
        ])
      )
    )
    XCTAssertFalse(writtenMessage.endStream)

    // Make sure we get trailers.
    let writtenTrailers = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
    XCTAssertTrue(writtenTrailers.endStream)

    // Make sure we get nothing else.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that the method descriptor promise is completed with the service and
  /// method taken from the request path once the client's metadata is received.
  func testMethodDescriptorPromiseSucceeds() throws {
    typealias Payload = HTTP2Frame.FramePayload
    typealias RequestPart = RPCRequestPart<GRPCNIOTransportBytes>

    let embedded = EmbeddedChannel()
    let descriptorPromise = embedded.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: embedded,
      descriptorPromise: descriptorPromise
    )
    try embedded.pipeline.syncOperations.addHandler(streamHandler)

    // Feed the client's initial metadata into the pipeline.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = Payload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try embedded.writeInbound(requestFrame))

    // No error response must have been produced, and the metadata must have been
    // surfaced inbound as a request part.
    XCTAssertNil(try embedded.readOutbound(as: Payload.self))
    let expectedRequestPart = RequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try embedded.readInbound(as: RequestPart.self), expectedRequestPart)

    // The promise must now carry the descriptor parsed from the path.
    let expectedDescriptor = MethodDescriptor(
      fullyQualifiedService: "SomeService",
      method: "SomeMethod"
    )
    XCTAssertEqual(try descriptorPromise.futureResult.wait(), expectedDescriptor)
  }
  /// Checks that removing the handler before any metadata arrives fails the method
  /// descriptor promise with an `.unavailable` RPC error.
  func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
    let embedded = EmbeddedChannel()
    let descriptorPromise = embedded.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: embedded,
      descriptorPromise: descriptorPromise
    )

    // Add the handler and immediately take it out again, without any traffic.
    let pipelineOperations = embedded.pipeline.syncOperations
    try pipelineOperations.addHandler(streamHandler)
    try pipelineOperations.removeHandler(streamHandler).wait()

    // Waiting on the promise must now throw the expected RPC error.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC stream was closed before we got any Metadata.")
    }
  }
  /// Checks that the method descriptor promise is failed with an `.unavailable` RPC
  /// error when the incoming request headers carry an unsupported content type.
  func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
    let embedded = EmbeddedChannel()
    let descriptorPromise = embedded.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: embedded,
      descriptorPromise: descriptorPromise
    )
    try embedded.pipeline.syncOperations.addHandler(streamHandler)

    // Feed client metadata whose content type is not a gRPC one.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try embedded.writeInbound(requestFrame))

    // Waiting on the promise must throw the rejection error.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC was rejected.")
    }
  }
  /// Checks that an error fired through the pipeline is forwarded by the handler and
  /// that subsequent outbound writes are refused with an "Invalid state" error.
  func testUnexpectedStreamClose_ErrorFired() throws {
    typealias Payload = HTTP2Frame.FramePayload
    typealias RequestPart = RPCRequestPart<GRPCNIOTransportBytes>
    typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

    let embedded = EmbeddedChannel()
    let descriptorPromise = embedded.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: embedded,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try embedded.pipeline.syncOperations.addHandler(streamHandler)

    // Feed the client's initial metadata into the pipeline.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = Payload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try embedded.writeInbound(requestFrame))

    // No error response must have been produced, and the metadata must have been
    // surfaced inbound as a request part.
    XCTAssertNil(try embedded.readOutbound(as: Payload.self))
    let expectedRequestPart = RequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try embedded.readInbound(as: RequestPart.self), expectedRequestPart)

    // Fire an error through the pipeline.
    let injectedError = ChannelError.connectTimeout(.milliseconds(100))
    embedded.pipeline.fireErrorCaught(injectedError)

    // The very same error must come out the other end of the channel.
    XCTAssertThrowsError(
      ofType: ChannelError.self,
      try embedded.throwIfErrorCaught()
    ) { caughtError in
      XCTAssertEqual(caughtError, injectedError)
    }

    // The stream should now be closed: further writes must be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try embedded.writeOutbound(ResponsePart.metadata(Metadata()))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that the channel becoming inactive mid-RPC makes the handler fire an
  /// `.unavailable` error and refuse subsequent outbound writes.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    typealias Payload = HTTP2Frame.FramePayload
    typealias RequestPart = RPCRequestPart<GRPCNIOTransportBytes>
    typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

    let embedded = EmbeddedChannel()
    let descriptorPromise = embedded.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: embedded,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try embedded.pipeline.syncOperations.addHandler(streamHandler)

    // Feed the client's initial metadata into the pipeline.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = Payload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try embedded.writeInbound(requestFrame))

    // No error response must have been produced, and the metadata must have been
    // surfaced inbound as a request part.
    XCTAssertNil(try embedded.readOutbound(as: Payload.self))
    let expectedRequestPart = RequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try embedded.readInbound(as: RequestPart.self), expectedRequestPart)

    // Simulate the channel going inactive.
    embedded.pipeline.fireChannelInactive()

    // The handler must have reported the unexpected closure as an error.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try embedded.throwIfErrorCaught()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed.")
    }

    // The stream should now be closed: further writes must be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try embedded.writeOutbound(ResponsePart.metadata(Metadata()))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that receiving a RST_STREAM frame mid-RPC makes the handler fire an
  /// `.unavailable` error and refuse subsequent outbound writes.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    typealias Payload = HTTP2Frame.FramePayload
    typealias RequestPart = RPCRequestPart<GRPCNIOTransportBytes>
    typealias ResponsePart = RPCResponsePart<GRPCNIOTransportBytes>

    let embedded = EmbeddedChannel()
    let descriptorPromise = embedded.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: embedded,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try embedded.pipeline.syncOperations.addHandler(streamHandler)

    // Feed the client's initial metadata into the pipeline.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = Payload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try embedded.writeInbound(requestFrame))

    // No error response must have been produced, and the metadata must have been
    // surfaced inbound as a request part.
    XCTAssertNil(try embedded.readOutbound(as: Payload.self))
    let expectedRequestPart = RequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try embedded.readInbound(as: RequestPart.self), expectedRequestPart)

    // Deliver a RST_STREAM frame; writing it inbound must surface the handler's error.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try embedded.writeInbound(Payload.rstStream(.internalError))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(
        rpcError.message,
        "Stream unexpectedly closed: received RST_STREAM frame (0x2: internal error)."
      )
    }

    // The stream should now be closed: further writes must be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try embedded.writeOutbound(ResponsePart.metadata([:]))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  876. }
  /// Swift Testing suite covering how the server stream handler turns channel-level
  /// events into RPC cancellation.
  struct ServerStreamHandlerTests {
    /// Pairs a stream handler with the connection management handler built alongside it.
    @available(gRPCSwiftNIOTransport 2.0, *)
    struct ConnectionAndStreamHandlers {
      let streamHandler: GRPCServerStreamHandler
      let connectionHandler: ServerConnectionManagementHandler
    }

    /// Builds a connection management handler and a stream handler bound to the
    /// event loop of `channel`.
    ///
    /// - Parameters:
    ///   - channel: Channel whose event loop both handlers use.
    ///   - scheme: Scheme handed to the stream handler.
    ///   - acceptedEncodings: Compression algorithms handed to the stream handler.
    ///   - maxPayloadSize: Maximum payload size handed to the stream handler.
    ///   - descriptorPromise: Promise handed to the stream handler; a fresh one is
    ///     created on the channel's event loop when `nil`.
    ///   - disableAssertions: Forwarded as the stream handler's
    ///     `skipStateMachineAssertions` argument.
    /// - Returns: Both handlers; neither has been added to a pipeline.
    @available(gRPCSwiftNIOTransport 2.0, *)
    private func makeServerConnectionAndStreamHandlers(
      channel: any Channel,
      scheme: Scheme = .http,
      acceptedEncodings: CompressionAlgorithmSet = [],
      maxPayloadSize: Int = .max,
      descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
      disableAssertions: Bool = false
    ) -> ConnectionAndStreamHandlers {
      let eventLoop = channel.eventLoop

      // All timers are disabled; only the required, non-optional settings are supplied.
      let connectionHandler = ServerConnectionManagementHandler(
        eventLoop: eventLoop,
        maxIdleTime: nil,
        maxAge: nil,
        maxGraceTime: nil,
        keepaliveTime: nil,
        keepaliveTimeout: nil,
        allowKeepaliveWithoutCalls: false,
        minPingIntervalWithoutCalls: .minutes(5),
        requireALPN: false
      )

      let streamHandler = GRPCServerStreamHandler(
        scheme: scheme,
        acceptedEncodings: acceptedEncodings,
        maxPayloadSize: maxPayloadSize,
        methodDescriptorPromise: descriptorPromise ?? eventLoop.makePromise(),
        eventLoop: eventLoop,
        skipStateMachineAssertions: disableAssertions
      )

      return ConnectionAndStreamHandlers(
        streamHandler: streamHandler,
        connectionHandler: connectionHandler
      )
    }

    @Test("ChannelShouldQuiesceEvent is buffered and turns into RPC cancellation")
    @available(gRPCSwiftNIOTransport 2.0, *)
    func shouldQuiesceEventIsBufferedBeforeHandleIsSet() async throws {
      let embedded = EmbeddedChannel()
      let handlers = self.makeServerConnectionAndStreamHandlers(channel: embedded)
      let streamHandler = handlers.streamHandler
      try embedded.pipeline.syncOperations.addHandler(streamHandler)

      // The quiesce event is fired before any cancellation handle has been set.
      embedded.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())

      await withServerContextRPCCancellationHandle { cancellationHandle in
        // Setting the handle afterwards must leave it cancelled right away.
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(cancellationHandle.isCancelled)
      }

      // Errors are expected here because the channel is closed abruptly.
      _ = try? embedded.finish()
    }

    @Test("ChannelShouldQuiesceEvent turns into RPC cancellation")
    @available(gRPCSwiftNIOTransport 2.0, *)
    func shouldQuiesceEventTriggersCancellation() async throws {
      let embedded = EmbeddedChannel()
      let handlers = self.makeServerConnectionAndStreamHandlers(channel: embedded)
      let streamHandler = handlers.streamHandler
      try embedded.pipeline.syncOperations.addHandler(streamHandler)

      await withServerContextRPCCancellationHandle { cancellationHandle in
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(!cancellationHandle.isCancelled)

        // Firing the quiesce event must flip the handle to cancelled.
        embedded.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
        #expect(cancellationHandle.isCancelled)
      }

      // Errors are expected here because the channel is closed abruptly.
      _ = try? embedded.finish()
    }

    @Test("RST_STREAM turns into RPC cancellation")
    @available(gRPCSwiftNIOTransport 2.0, *)
    func rstStreamTriggersCancellation() async throws {
      let embedded = EmbeddedChannel()
      let handlers = self.makeServerConnectionAndStreamHandlers(channel: embedded)
      let streamHandler = handlers.streamHandler
      try embedded.pipeline.syncOperations.addHandler(streamHandler)

      await withServerContextRPCCancellationHandle { cancellationHandle in
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(!cancellationHandle.isCancelled)

        // Reading a RST_STREAM frame must flip the handle to cancelled.
        let resetFrame: HTTP2Frame.FramePayload = .rstStream(.cancel)
        embedded.pipeline.fireChannelRead(resetFrame)
        #expect(cancellationHandle.isCancelled)
      }

      // Errors are expected here because the channel is closed abruptly.
      _ = try? embedded.finish()
    }
  }
  extension EmbeddedChannel {
    /// Reads the next outbound HTTP/2 frame payload and returns it as a headers frame.
    ///
    /// - Returns: The headers payload that was written.
    /// - Throws: If reading fails, if `XCTUnwrap` finds no outbound payload, or
    ///   `TestError.assertionFailure` if the payload is not a headers frame.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      switch payload {
      case .headers(let headersFrame):
        return headersFrame
      default:
        throw TestError.assertionFailure("Expected to write headers")
      }
    }

    /// Reads the next outbound HTTP/2 frame payload and returns it as a data frame.
    ///
    /// - Returns: The data payload that was written.
    /// - Throws: If reading fails, if `XCTUnwrap` finds no outbound payload, or
    ///   `TestError.assertionFailure` if the payload is not a data frame.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      switch payload {
      case .data(let dataFrame):
        return dataFrame
      default:
        throw TestError.assertionFailure("Expected to write data")
      }
    }
  }
  /// Error type thrown by this file's `EmbeddedChannel` read helpers.
  private enum TestError: Error {
    /// An expectation was not met; the associated string says what was expected.
    case assertionFailure(String)
  }