GRPCServerStreamHandlerTests.swift 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import Testing
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. @available(gRPCSwiftNIOTransport 2.0, *)
  25. final class GRPCServerStreamHandlerTests: XCTestCase {
  26. private func makeServerStreamHandler(
  27. channel: any Channel,
  28. scheme: Scheme = .http,
  29. acceptedEncodings: CompressionAlgorithmSet = [],
  30. maxPayloadSize: Int = .max,
  31. descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
  32. disableAssertions: Bool = false
  33. ) -> GRPCServerStreamHandler {
  34. return GRPCServerStreamHandler(
  35. scheme: scheme,
  36. acceptedEncodings: acceptedEncodings,
  37. maxPayloadSize: maxPayloadSize,
  38. methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
  39. eventLoop: channel.eventLoop,
  40. skipStateMachineAssertions: disableAssertions
  41. )
  42. }
  43. func testH2FramesAreIgnored() throws {
  44. let channel = EmbeddedChannel()
  45. let handler = self.makeServerStreamHandler(channel: channel)
  46. try channel.pipeline.syncOperations.addHandler(handler)
  47. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  48. .ping(.init(), ack: false),
  49. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  50. .priority(
  51. HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  52. ),
  53. .settings(.ack),
  54. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  55. .windowUpdate(windowSizeIncrement: 4),
  56. .alternativeService(origin: nil, field: nil),
  57. .origin([]),
  58. ]
  59. for toBeIgnored in framesToBeIgnored {
  60. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  61. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  62. }
  63. }
  64. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  65. let channel = EmbeddedChannel()
  66. let handler = self.makeServerStreamHandler(channel: channel)
  67. try channel.pipeline.syncOperations.addHandler(handler)
  68. // Receive client's initial metadata without content-type
  69. let clientInitialMetadata: HPACKHeaders = [
  70. GRPCHTTP2Keys.path.rawValue: "/test/test",
  71. GRPCHTTP2Keys.scheme.rawValue: "http",
  72. GRPCHTTP2Keys.method.rawValue: "POST",
  73. GRPCHTTP2Keys.te.rawValue: "trailers",
  74. ]
  75. XCTAssertNoThrow(
  76. try channel.writeInbound(
  77. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  78. )
  79. )
  80. // Make sure we have sent a trailers-only response
  81. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  82. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  83. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  84. }
  85. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  86. let channel = EmbeddedChannel()
  87. let handler = self.makeServerStreamHandler(channel: channel)
  88. try channel.pipeline.syncOperations.addHandler(handler)
  89. // Receive client's initial metadata without :method
  90. let clientInitialMetadata: HPACKHeaders = [
  91. GRPCHTTP2Keys.path.rawValue: "/test/test",
  92. GRPCHTTP2Keys.scheme.rawValue: "http",
  93. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  94. GRPCHTTP2Keys.te.rawValue: "trailers",
  95. ]
  96. XCTAssertNoThrow(
  97. try channel.writeInbound(
  98. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  99. )
  100. )
  101. // Make sure we have sent a trailers-only response
  102. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  103. XCTAssertEqual(
  104. writtenTrailersOnlyResponse.headers,
  105. [
  106. GRPCHTTP2Keys.status.rawValue: "200",
  107. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  108. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  109. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  110. ":method header is expected to be present and have a value of \"POST\".",
  111. ]
  112. )
  113. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  114. }
  115. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  116. let channel = EmbeddedChannel()
  117. let handler = self.makeServerStreamHandler(channel: channel)
  118. try channel.pipeline.syncOperations.addHandler(handler)
  119. // Receive client's initial metadata without :scheme
  120. let clientInitialMetadata: HPACKHeaders = [
  121. GRPCHTTP2Keys.path.rawValue: "/test/test",
  122. GRPCHTTP2Keys.method.rawValue: "POST",
  123. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  124. GRPCHTTP2Keys.te.rawValue: "trailers",
  125. ]
  126. XCTAssertNoThrow(
  127. try channel.writeInbound(
  128. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  129. )
  130. )
  131. // Make sure we have sent a trailers-only response
  132. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  133. XCTAssertEqual(
  134. writtenTrailersOnlyResponse.headers,
  135. [
  136. GRPCHTTP2Keys.status.rawValue: "200",
  137. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  138. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  139. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  140. ":scheme header must be present and one of \"http\" or \"https\".",
  141. ]
  142. )
  143. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  144. }
  145. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  146. let channel = EmbeddedChannel()
  147. let handler = self.makeServerStreamHandler(channel: channel)
  148. try channel.pipeline.syncOperations.addHandler(handler)
  149. // Receive client's initial metadata without :path
  150. let clientInitialMetadata: HPACKHeaders = [
  151. GRPCHTTP2Keys.scheme.rawValue: "http",
  152. GRPCHTTP2Keys.method.rawValue: "POST",
  153. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  154. GRPCHTTP2Keys.te.rawValue: "trailers",
  155. ]
  156. XCTAssertNoThrow(
  157. try channel.writeInbound(
  158. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  159. )
  160. )
  161. // Make sure we have sent a trailers-only response
  162. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  163. XCTAssertEqual(
  164. writtenTrailersOnlyResponse.headers,
  165. [
  166. GRPCHTTP2Keys.status.rawValue: "200",
  167. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  168. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  169. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  170. ]
  171. )
  172. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  173. }
  174. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  175. let channel = EmbeddedChannel()
  176. let handler = self.makeServerStreamHandler(channel: channel)
  177. try channel.pipeline.syncOperations.addHandler(handler)
  178. // Receive client's initial metadata
  179. let clientInitialMetadata: HPACKHeaders = [
  180. GRPCHTTP2Keys.path.rawValue: "/test/test",
  181. GRPCHTTP2Keys.scheme.rawValue: "http",
  182. GRPCHTTP2Keys.method.rawValue: "POST",
  183. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  184. GRPCHTTP2Keys.te.rawValue: "trailers",
  185. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  186. ]
  187. XCTAssertNoThrow(
  188. try channel.writeInbound(
  189. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  190. )
  191. )
  192. // Make sure we have sent a trailers-only response
  193. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  194. XCTAssertEqual(
  195. writtenTrailersOnlyResponse.headers,
  196. [
  197. GRPCHTTP2Keys.status.rawValue: "200",
  198. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  199. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  200. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  201. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  202. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  203. ]
  204. )
  205. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  206. }
  207. func testOverMaximumPayloadSize() throws {
  208. let channel = EmbeddedChannel()
  209. let handler = self.makeServerStreamHandler(channel: channel, maxPayloadSize: 1)
  210. try channel.pipeline.syncOperations.addHandler(handler)
  211. // Receive client's initial metadata
  212. let clientInitialMetadata: HPACKHeaders = [
  213. GRPCHTTP2Keys.path.rawValue: "/test/test",
  214. GRPCHTTP2Keys.scheme.rawValue: "http",
  215. GRPCHTTP2Keys.method.rawValue: "POST",
  216. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  217. GRPCHTTP2Keys.te.rawValue: "trailers",
  218. ]
  219. XCTAssertNoThrow(
  220. try channel.writeInbound(
  221. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  222. )
  223. )
  224. // Make sure we haven't sent back an error response, and that we read the initial metadata
  225. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  226. XCTAssertEqual(
  227. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  228. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  229. )
  230. // Write back server's initial metadata
  231. let headers: HPACKHeaders = [
  232. "some-custom-header": "some-custom-value"
  233. ]
  234. let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
  235. Metadata(headers: headers)
  236. )
  237. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  238. // Make sure we wrote back the initial metadata
  239. let writtenHeaders = try channel.assertReadHeadersOutbound()
  240. XCTAssertEqual(
  241. writtenHeaders.headers,
  242. [
  243. GRPCHTTP2Keys.status.rawValue: "200",
  244. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  245. "some-custom-header": "some-custom-value",
  246. ]
  247. )
  248. // Receive client's message
  249. var buffer = ByteBuffer()
  250. buffer.writeInteger(UInt8(0)) // not compressed
  251. buffer.writeInteger(UInt32(42)) // message length
  252. buffer.writeRepeatingByte(0, count: 42) // message
  253. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  254. XCTAssertThrowsError(
  255. ofType: RPCError.self,
  256. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  257. ) { error in
  258. XCTAssertEqual(error.code, .internalError)
  259. XCTAssertEqual(error.message, "Failed to decode message")
  260. }
  261. // Make sure we haven't sent a response back and that we didn't read the received message
  262. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  263. XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  264. }
  265. func testClientEndsStream() throws {
  266. let channel = EmbeddedChannel()
  267. let handler = self.makeServerStreamHandler(channel: channel, disableAssertions: true)
  268. try channel.pipeline.syncOperations.addHandler(handler)
  269. // Receive client's initial metadata with end stream set
  270. let clientInitialMetadata: HPACKHeaders = [
  271. GRPCHTTP2Keys.path.rawValue: "/test/test",
  272. GRPCHTTP2Keys.scheme.rawValue: "http",
  273. GRPCHTTP2Keys.method.rawValue: "POST",
  274. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  275. GRPCHTTP2Keys.te.rawValue: "trailers",
  276. ]
  277. XCTAssertNoThrow(
  278. try channel.writeInbound(
  279. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  280. )
  281. )
  282. // Make sure we haven't sent back an error response, and that we read the initial metadata
  283. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  284. XCTAssertEqual(
  285. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  286. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  287. )
  288. // Write back server's initial metadata
  289. let headers: HPACKHeaders = [
  290. "some-custom-header": "some-custom-value"
  291. ]
  292. let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
  293. Metadata(headers: headers)
  294. )
  295. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  296. // Make sure we wrote back the initial metadata
  297. let writtenHeaders = try channel.assertReadHeadersOutbound()
  298. XCTAssertEqual(
  299. writtenHeaders.headers,
  300. [
  301. GRPCHTTP2Keys.status.rawValue: "200",
  302. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  303. "some-custom-header": "some-custom-value",
  304. ]
  305. )
  306. // We should throw if the client sends another message, since it's closed the stream already.
  307. var buffer = ByteBuffer()
  308. buffer.writeInteger(UInt8(0)) // not compressed
  309. buffer.writeInteger(UInt32(42)) // message length
  310. buffer.writeRepeatingByte(0, count: 42) // message
  311. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  312. XCTAssertThrowsError(
  313. ofType: RPCError.self,
  314. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  315. ) { error in
  316. XCTAssertEqual(error.code, .internalError)
  317. XCTAssertEqual(error.message, "Invalid state")
  318. }
  319. }
  320. func testNormalFlow() throws {
  321. let channel = EmbeddedChannel()
  322. let handler = self.makeServerStreamHandler(channel: channel, disableAssertions: true)
  323. try channel.pipeline.syncOperations.addHandler(handler)
  324. // Receive client's initial metadata
  325. let clientInitialMetadata: HPACKHeaders = [
  326. GRPCHTTP2Keys.path.rawValue: "/test/test",
  327. GRPCHTTP2Keys.scheme.rawValue: "http",
  328. GRPCHTTP2Keys.method.rawValue: "POST",
  329. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  330. GRPCHTTP2Keys.te.rawValue: "trailers",
  331. ]
  332. XCTAssertNoThrow(
  333. try channel.writeInbound(
  334. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  335. )
  336. )
  337. // Make sure we haven't sent back an error response, and that we read the initial metadata
  338. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  339. XCTAssertEqual(
  340. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  341. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  342. )
  343. // Write back server's initial metadata
  344. let headers: HPACKHeaders = [
  345. "some-custom-header": "some-custom-value"
  346. ]
  347. let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
  348. Metadata(headers: headers)
  349. )
  350. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  351. // Make sure we wrote back the initial metadata
  352. let writtenHeaders = try channel.assertReadHeadersOutbound()
  353. XCTAssertEqual(
  354. writtenHeaders.headers,
  355. [
  356. GRPCHTTP2Keys.status.rawValue: "200",
  357. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  358. "some-custom-header": "some-custom-value",
  359. ]
  360. )
  361. // Receive client's message
  362. var buffer = ByteBuffer()
  363. buffer.writeInteger(UInt8(0)) // not compressed
  364. buffer.writeInteger(UInt32(42)) // message length
  365. buffer.writeRepeatingByte(0, count: 42) // message
  366. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  367. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  368. // Make sure we haven't sent back an error response, and that we read the message properly
  369. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  370. XCTAssertEqual(
  371. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  372. RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
  373. )
  374. // Write back response
  375. let serverDataPayload = RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
  376. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  377. // Make sure we wrote back the right message
  378. let writtenMessage = try channel.assertReadDataOutbound()
  379. var expectedBuffer = ByteBuffer()
  380. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  381. expectedBuffer.writeInteger(UInt32(42)) // message length
  382. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  383. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  384. // Send back status to end RPC
  385. let trailers = RPCResponsePart<GRPCNIOTransportBytes>.status(
  386. .init(code: .dataLoss, message: "Test data loss"),
  387. ["custom-header": "custom-value"]
  388. )
  389. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  390. // Make sure we wrote back the status and trailers
  391. let writtenStatus = try channel.assertReadHeadersOutbound()
  392. XCTAssertTrue(writtenStatus.endStream)
  393. XCTAssertEqual(
  394. writtenStatus.headers,
  395. [
  396. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  397. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  398. "custom-header": "custom-value",
  399. ]
  400. )
  401. // Try writing and assert it throws to make sure we don't allow writes
  402. // after closing.
  403. XCTAssertThrowsError(
  404. ofType: RPCError.self,
  405. try channel.writeOutbound(trailers)
  406. ) { error in
  407. XCTAssertEqual(error.code, .internalError)
  408. XCTAssertEqual(error.message, "Invalid state")
  409. }
  410. }
  411. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  412. let channel = EmbeddedChannel()
  413. let handler = self.makeServerStreamHandler(channel: channel)
  414. try channel.pipeline.syncOperations.addHandler(handler)
  415. // Receive client's initial metadata
  416. let clientInitialMetadata: HPACKHeaders = [
  417. GRPCHTTP2Keys.path.rawValue: "/test/test",
  418. GRPCHTTP2Keys.scheme.rawValue: "http",
  419. GRPCHTTP2Keys.method.rawValue: "POST",
  420. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  421. GRPCHTTP2Keys.te.rawValue: "trailers",
  422. ]
  423. XCTAssertNoThrow(
  424. try channel.writeInbound(
  425. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  426. )
  427. )
  428. // Make sure we haven't sent back an error response, and that we read the initial metadata
  429. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  430. XCTAssertEqual(
  431. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  432. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  433. )
  434. // Write back server's initial metadata
  435. let headers: HPACKHeaders = [
  436. "some-custom-header": "some-custom-value"
  437. ]
  438. let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
  439. Metadata(headers: headers)
  440. )
  441. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  442. // Make sure we wrote back the initial metadata
  443. let writtenHeaders = try channel.assertReadHeadersOutbound()
  444. XCTAssertEqual(
  445. writtenHeaders.headers,
  446. [
  447. GRPCHTTP2Keys.status.rawValue: "200",
  448. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  449. "some-custom-header": "some-custom-value",
  450. ]
  451. )
  452. // Receive client's first message
  453. var buffer = ByteBuffer()
  454. buffer.writeInteger(UInt8(0)) // not compressed
  455. XCTAssertNoThrow(
  456. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  457. )
  458. XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  459. buffer.clear()
  460. buffer.writeInteger(UInt32(30)) // message length
  461. XCTAssertNoThrow(
  462. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  463. )
  464. XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  465. buffer.clear()
  466. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  467. XCTAssertNoThrow(
  468. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  469. )
  470. XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  471. buffer.clear()
  472. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  473. XCTAssertNoThrow(
  474. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  475. )
  476. XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  477. buffer.clear()
  478. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  479. XCTAssertNoThrow(
  480. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  481. )
  482. var expected = ByteBuffer()
  483. expected.writeRepeatingByte(0, count: 10)
  484. expected.writeRepeatingByte(1, count: 10)
  485. expected.writeRepeatingByte(2, count: 10)
  486. // Make sure we haven't sent back an error response, and that we read the message properly
  487. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  488. XCTAssertEqual(
  489. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  490. RPCRequestPart.message(GRPCNIOTransportBytes(expected))
  491. )
  492. }
  493. func testReceiveMultipleHeaders() throws {
  494. let channel = EmbeddedChannel()
  495. let handler = self.makeServerStreamHandler(channel: channel)
  496. try channel.pipeline.syncOperations.addHandler(handler)
  497. // Receive client's initial metadata
  498. let clientInitialMetadata: HPACKHeaders = [
  499. GRPCHTTP2Keys.path.rawValue: "/test/test",
  500. GRPCHTTP2Keys.scheme.rawValue: "http",
  501. GRPCHTTP2Keys.method.rawValue: "POST",
  502. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  503. GRPCHTTP2Keys.te.rawValue: "trailers",
  504. ]
  505. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  506. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  507. // Receive them again. Should be a protocol violation.
  508. XCTAssertThrowsError(
  509. ofType: RPCError.self,
  510. try channel.writeInbound(
  511. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  512. )
  513. ) { error in
  514. XCTAssertEqual(error.code, .unavailable)
  515. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  516. }
  517. let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  518. switch payload {
  519. case .rstStream(let errorCode):
  520. XCTAssertEqual(errorCode, .protocolError)
  521. default:
  522. XCTFail("Expected RST_STREAM, got \(payload)")
  523. }
  524. }
  525. func testSendMultipleMessagesInSingleBuffer() throws {
  526. let channel = EmbeddedChannel()
  527. let handler = self.makeServerStreamHandler(channel: channel)
  528. try channel.pipeline.syncOperations.addHandler(handler)
  529. // Receive client's initial metadata
  530. let clientInitialMetadata: HPACKHeaders = [
  531. GRPCHTTP2Keys.path.rawValue: "/test/test",
  532. GRPCHTTP2Keys.scheme.rawValue: "http",
  533. GRPCHTTP2Keys.method.rawValue: "POST",
  534. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  535. GRPCHTTP2Keys.te.rawValue: "trailers",
  536. ]
  537. XCTAssertNoThrow(
  538. try channel.writeInbound(
  539. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  540. )
  541. )
  542. // Make sure we haven't sent back an error response, and that we read the initial metadata
  543. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  544. XCTAssertEqual(
  545. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  546. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  547. )
  548. // Write back server's initial metadata
  549. let headers: HPACKHeaders = [
  550. "some-custom-header": "some-custom-value"
  551. ]
  552. let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
  553. Metadata(headers: headers)
  554. )
  555. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  556. // Read out the metadata
  557. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  558. // This is where this test actually begins. We want to write two messages
  559. // without flushing, and make sure that no messages are sent down the pipeline
  560. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  561. // Write back first message and make sure nothing's written in the channel.
  562. XCTAssertNoThrow(
  563. channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
  564. )
  565. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  566. // Write back second message and make sure nothing's written in the channel.
  567. XCTAssertNoThrow(
  568. channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
  569. )
  570. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  571. // Now flush and check we *do* write the data.
  572. channel.flush()
  573. let writtenMessage = try channel.assertReadDataOutbound()
  574. // Make sure both messages have been framed together in the ByteBuffer.
  575. XCTAssertEqual(
  576. writtenMessage.data,
  577. .byteBuffer(
  578. .init(bytes: [
  579. // First message
  580. 0, // Compression disabled
  581. 0, 0, 0, 4, // Message length
  582. 1, 1, 1, 1, // First message data
  583. // Second message
  584. 0, // Compression disabled
  585. 0, 0, 0, 4, // Message length
  586. 2, 2, 2, 2, // Second message data
  587. ])
  588. )
  589. )
  590. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  591. }
  592. func testMessageAndStatusAreNotReordered() throws {
  593. let channel = EmbeddedChannel()
  594. let handler = self.makeServerStreamHandler(channel: channel)
  595. try channel.pipeline.syncOperations.addHandler(handler)
  596. // Receive client's initial metadata
  597. let clientInitialMetadata: HPACKHeaders = [
  598. GRPCHTTP2Keys.path.rawValue: "/test/test",
  599. GRPCHTTP2Keys.scheme.rawValue: "http",
  600. GRPCHTTP2Keys.method.rawValue: "POST",
  601. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  602. GRPCHTTP2Keys.te.rawValue: "trailers",
  603. ]
  604. XCTAssertNoThrow(
  605. try channel.writeInbound(
  606. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  607. )
  608. )
  609. // Make sure we haven't sent back an error response, and that we read the initial metadata
  610. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  611. XCTAssertEqual(
  612. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  613. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  614. )
  615. // Write back server's initial metadata
  616. let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
  617. Metadata(headers: [:])
  618. )
  619. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  620. // Read out the metadata
  621. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  622. // This is where this test actually begins. We want to write a message followed
  623. // by status and trailers, and only flush after both writes.
  624. // Because messages are buffered and potentially bundled together in a single
  625. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  626. // and trailers won't be written before the messages.
  627. // Write back message and make sure nothing's written in the channel.
  628. XCTAssertNoThrow(
  629. channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
  630. )
  631. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  632. // Write status + metadata and make sure nothing's written.
  633. XCTAssertNoThrow(
  634. channel.write(
  635. RPCResponsePart<GRPCNIOTransportBytes>.status(.init(code: .ok, message: ""), [:])
  636. )
  637. )
  638. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  639. // Now flush and check we *do* write the data in the right order: message first,
  640. // trailers second.
  641. channel.flush()
  642. let writtenMessage = try channel.assertReadDataOutbound()
  643. // Make sure we first get message.
  644. XCTAssertEqual(
  645. writtenMessage.data,
  646. .byteBuffer(
  647. .init(bytes: [
  648. // First message
  649. 0, // Compression disabled
  650. 0, 0, 0, 4, // Message length
  651. 1, 1, 1, 1, // First message data
  652. ])
  653. )
  654. )
  655. XCTAssertFalse(writtenMessage.endStream)
  656. // Make sure we get trailers.
  657. let writtenTrailers = try channel.assertReadHeadersOutbound()
  658. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  659. XCTAssertTrue(writtenTrailers.endStream)
  660. // Make sure we get nothing else.
  661. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  662. }
  663. func testMethodDescriptorPromiseSucceeds() throws {
  664. let channel = EmbeddedChannel()
  665. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  666. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  667. try channel.pipeline.syncOperations.addHandler(handler)
  668. // Receive client's initial metadata
  669. let clientInitialMetadata: HPACKHeaders = [
  670. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  671. GRPCHTTP2Keys.scheme.rawValue: "http",
  672. GRPCHTTP2Keys.method.rawValue: "POST",
  673. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  674. GRPCHTTP2Keys.te.rawValue: "trailers",
  675. ]
  676. XCTAssertNoThrow(
  677. try channel.writeInbound(
  678. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  679. )
  680. )
  681. // Make sure we haven't sent back an error response, and that we read the initial metadata
  682. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  683. XCTAssertEqual(
  684. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  685. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  686. )
  687. XCTAssertEqual(
  688. try promise.futureResult.wait(),
  689. MethodDescriptor(fullyQualifiedService: "SomeService", method: "SomeMethod")
  690. )
  691. }
  692. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  693. let channel = EmbeddedChannel()
  694. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  695. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  696. try channel.pipeline.syncOperations.addHandler(handler)
  697. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  698. XCTAssertThrowsError(
  699. ofType: RPCError.self,
  700. try promise.futureResult.wait()
  701. ) { error in
  702. XCTAssertEqual(error.code, .unavailable)
  703. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  704. }
  705. }
  706. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  707. let channel = EmbeddedChannel()
  708. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  709. let handler = self.makeServerStreamHandler(channel: channel, descriptorPromise: promise)
  710. try channel.pipeline.syncOperations.addHandler(handler)
  711. // Receive client's initial metadata
  712. let clientInitialMetadata: HPACKHeaders = [
  713. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  714. GRPCHTTP2Keys.scheme.rawValue: "http",
  715. GRPCHTTP2Keys.method.rawValue: "POST",
  716. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  717. GRPCHTTP2Keys.te.rawValue: "trailers",
  718. ]
  719. XCTAssertNoThrow(
  720. try channel.writeInbound(
  721. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  722. )
  723. )
  724. XCTAssertThrowsError(
  725. ofType: RPCError.self,
  726. try promise.futureResult.wait()
  727. ) { error in
  728. XCTAssertEqual(error.code, .unavailable)
  729. XCTAssertEqual(error.message, "RPC was rejected.")
  730. }
  731. }
  732. func testUnexpectedStreamClose_ErrorFired() throws {
  733. let channel = EmbeddedChannel()
  734. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  735. let handler = self.makeServerStreamHandler(
  736. channel: channel,
  737. descriptorPromise: promise,
  738. disableAssertions: true
  739. )
  740. try channel.pipeline.syncOperations.addHandler(handler)
  741. // Receive client's initial metadata
  742. let clientInitialMetadata: HPACKHeaders = [
  743. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  744. GRPCHTTP2Keys.scheme.rawValue: "http",
  745. GRPCHTTP2Keys.method.rawValue: "POST",
  746. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  747. GRPCHTTP2Keys.te.rawValue: "trailers",
  748. ]
  749. XCTAssertNoThrow(
  750. try channel.writeInbound(
  751. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  752. )
  753. )
  754. // Make sure we haven't sent back an error response, and that we read the initial metadata
  755. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  756. XCTAssertEqual(
  757. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  758. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  759. )
  760. // An error is fired down the pipeline
  761. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  762. channel.pipeline.fireErrorCaught(thrownError)
  763. // The server handler simply forwards the error.
  764. XCTAssertThrowsError(
  765. ofType: type(of: thrownError),
  766. try channel.throwIfErrorCaught()
  767. ) { error in
  768. XCTAssertEqual(error, thrownError)
  769. }
  770. // We should now be closed: check we can't write anymore.
  771. XCTAssertThrowsError(
  772. ofType: RPCError.self,
  773. try channel.writeOutbound(RPCResponsePart<GRPCNIOTransportBytes>.metadata(Metadata()))
  774. ) { error in
  775. XCTAssertEqual(error.code, .internalError)
  776. XCTAssertEqual(error.message, "Invalid state")
  777. }
  778. }
  779. func testUnexpectedStreamClose_ChannelInactive() throws {
  780. let channel = EmbeddedChannel()
  781. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  782. let handler = self.makeServerStreamHandler(
  783. channel: channel,
  784. descriptorPromise: promise,
  785. disableAssertions: true
  786. )
  787. try channel.pipeline.syncOperations.addHandler(handler)
  788. // Receive client's initial metadata
  789. let clientInitialMetadata: HPACKHeaders = [
  790. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  791. GRPCHTTP2Keys.scheme.rawValue: "http",
  792. GRPCHTTP2Keys.method.rawValue: "POST",
  793. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  794. GRPCHTTP2Keys.te.rawValue: "trailers",
  795. ]
  796. XCTAssertNoThrow(
  797. try channel.writeInbound(
  798. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  799. )
  800. )
  801. // Make sure we haven't sent back an error response, and that we read the initial metadata
  802. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  803. XCTAssertEqual(
  804. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  805. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  806. )
  807. // Channel becomes inactive
  808. channel.pipeline.fireChannelInactive()
  809. // The server handler fires an error
  810. XCTAssertThrowsError(
  811. ofType: RPCError.self,
  812. try channel.throwIfErrorCaught()
  813. ) { error in
  814. XCTAssertEqual(error.code, .unavailable)
  815. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  816. }
  817. // We should now be closed: check we can't write anymore.
  818. XCTAssertThrowsError(
  819. ofType: RPCError.self,
  820. try channel.writeOutbound(RPCResponsePart<GRPCNIOTransportBytes>.metadata(Metadata()))
  821. ) { error in
  822. XCTAssertEqual(error.code, .internalError)
  823. XCTAssertEqual(error.message, "Invalid state")
  824. }
  825. }
  826. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  827. let channel = EmbeddedChannel()
  828. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  829. let handler = self.makeServerStreamHandler(
  830. channel: channel,
  831. descriptorPromise: promise,
  832. disableAssertions: true
  833. )
  834. try channel.pipeline.syncOperations.addHandler(handler)
  835. // Receive client's initial metadata
  836. let clientInitialMetadata: HPACKHeaders = [
  837. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  838. GRPCHTTP2Keys.scheme.rawValue: "http",
  839. GRPCHTTP2Keys.method.rawValue: "POST",
  840. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  841. GRPCHTTP2Keys.te.rawValue: "trailers",
  842. ]
  843. XCTAssertNoThrow(
  844. try channel.writeInbound(
  845. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  846. )
  847. )
  848. // Make sure we haven't sent back an error response, and that we read the initial metadata
  849. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  850. XCTAssertEqual(
  851. try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
  852. RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
  853. )
  854. // We receive RST_STREAM frame
  855. // Assert the server handler fires an error
  856. XCTAssertThrowsError(
  857. ofType: RPCError.self,
  858. try channel.writeInbound(
  859. HTTP2Frame.FramePayload.rstStream(.internalError)
  860. )
  861. ) { error in
  862. XCTAssertEqual(error.code, .unavailable)
  863. XCTAssertEqual(
  864. error.message,
  865. "Stream unexpectedly closed: received RST_STREAM frame (0x2: internal error)."
  866. )
  867. }
  868. // We should now be closed: check we can't write anymore.
  869. XCTAssertThrowsError(
  870. ofType: RPCError.self,
  871. try channel.writeOutbound(RPCResponsePart<GRPCNIOTransportBytes>.metadata([:]))
  872. ) { error in
  873. XCTAssertEqual(error.code, .internalError)
  874. XCTAssertEqual(error.message, "Invalid state")
  875. }
  876. }
  877. }
  878. struct ServerStreamHandlerTests {
  879. @available(gRPCSwiftNIOTransport 2.0, *)
  880. struct ConnectionAndStreamHandlers {
  881. let streamHandler: GRPCServerStreamHandler
  882. let connectionHandler: ServerConnectionManagementHandler
  883. }
  884. @available(gRPCSwiftNIOTransport 2.0, *)
  885. private func makeServerConnectionAndStreamHandlers(
  886. channel: any Channel,
  887. scheme: Scheme = .http,
  888. acceptedEncodings: CompressionAlgorithmSet = [],
  889. maxPayloadSize: Int = .max,
  890. descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
  891. disableAssertions: Bool = false
  892. ) -> ConnectionAndStreamHandlers {
  893. let connectionManagementHandler = ServerConnectionManagementHandler(
  894. eventLoop: channel.eventLoop,
  895. maxIdleTime: nil,
  896. maxAge: nil,
  897. maxGraceTime: nil,
  898. keepaliveTime: nil,
  899. keepaliveTimeout: nil,
  900. allowKeepaliveWithoutCalls: false,
  901. minPingIntervalWithoutCalls: .minutes(5),
  902. requireALPN: false
  903. )
  904. let streamHandler = GRPCServerStreamHandler(
  905. scheme: scheme,
  906. acceptedEncodings: acceptedEncodings,
  907. maxPayloadSize: maxPayloadSize,
  908. methodDescriptorPromise: descriptorPromise ?? channel.eventLoop.makePromise(),
  909. eventLoop: channel.eventLoop,
  910. skipStateMachineAssertions: disableAssertions
  911. )
  912. return ConnectionAndStreamHandlers(
  913. streamHandler: streamHandler,
  914. connectionHandler: connectionManagementHandler
  915. )
  916. }
  917. @Test("ChannelShouldQuiesceEvent is buffered and turns into RPC cancellation")
  918. @available(gRPCSwiftNIOTransport 2.0, *)
  919. func shouldQuiesceEventIsBufferedBeforeHandleIsSet() async throws {
  920. let channel = EmbeddedChannel()
  921. let handler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
  922. try channel.pipeline.syncOperations.addHandler(handler)
  923. channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
  924. await withServerContextRPCCancellationHandle { handle in
  925. handler.setCancellationHandle(handle)
  926. #expect(handle.isCancelled)
  927. }
  928. // Throwing is fine: the channel is closed abruptly, errors are expected.
  929. _ = try? channel.finish()
  930. }
  931. @Test("ChannelShouldQuiesceEvent turns into RPC cancellation")
  932. @available(gRPCSwiftNIOTransport 2.0, *)
  933. func shouldQuiesceEventTriggersCancellation() async throws {
  934. let channel = EmbeddedChannel()
  935. let handler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
  936. try channel.pipeline.syncOperations.addHandler(handler)
  937. await withServerContextRPCCancellationHandle { handle in
  938. handler.setCancellationHandle(handle)
  939. #expect(!handle.isCancelled)
  940. channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
  941. #expect(handle.isCancelled)
  942. }
  943. // Throwing is fine: the channel is closed abruptly, errors are expected.
  944. _ = try? channel.finish()
  945. }
  946. @Test("RST_STREAM turns into RPC cancellation")
  947. @available(gRPCSwiftNIOTransport 2.0, *)
  948. func rstStreamTriggersCancellation() async throws {
  949. let channel = EmbeddedChannel()
  950. let handler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
  951. try channel.pipeline.syncOperations.addHandler(handler)
  952. await withServerContextRPCCancellationHandle { handle in
  953. handler.setCancellationHandle(handle)
  954. #expect(!handle.isCancelled)
  955. let rstStream: HTTP2Frame.FramePayload = .rstStream(.cancel)
  956. channel.pipeline.fireChannelRead(rstStream)
  957. #expect(handle.isCancelled)
  958. }
  959. // Throwing is fine: the channel is closed abruptly, errors are expected.
  960. _ = try? channel.finish()
  961. }
  962. }
  963. extension EmbeddedChannel {
  964. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  965. guard
  966. case .headers(let writtenHeaders) = try XCTUnwrap(
  967. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  968. )
  969. else {
  970. throw TestError.assertionFailure("Expected to write headers")
  971. }
  972. return writtenHeaders
  973. }
  974. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  975. guard
  976. case .data(let writtenMessage) = try XCTUnwrap(
  977. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  978. )
  979. else {
  980. throw TestError.assertionFailure("Expected to write data")
  981. }
  982. return writtenMessage
  983. }
  984. }
  985. private enum TestError: Error {
  986. case assertionFailure(String)
  987. }