GRPCServerStreamHandlerTests.swift 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import Testing
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. @available(gRPCSwiftNIOTransport 1.0, *)
  25. final class GRPCServerStreamHandlerTests: XCTestCase {
  /// Builds a `GRPCServerStreamHandler` tied to `channel`'s event loop.
  ///
  /// The handler is given the sync view of a freshly created
  /// `ServerConnectionManagementHandler` that has no idle, age, grace or keepalive
  /// limits configured.
  ///
  /// - Parameters:
  ///   - channel: Channel whose event loop is used by the handler and by the default promise.
  ///   - scheme: Scheme forwarded to the handler. Defaults to `.http`.
  ///   - acceptedEncodings: Compression algorithms forwarded to the handler. Defaults to empty.
  ///   - maxPayloadSize: Maximum payload size forwarded to the handler. Defaults to `.max`.
  ///   - descriptorPromise: Method descriptor promise; a new promise is made when `nil`.
  ///   - disableAssertions: Forwarded to the handler as `skipStateMachineAssertions`.
  /// - Returns: The configured handler. It is not added to any pipeline here.
  private func makeServerStreamHandler(
    channel: any Channel,
    scheme: Scheme = .http,
    acceptedEncodings: CompressionAlgorithmSet = [],
    maxPayloadSize: Int = .max,
    descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
    disableAssertions: Bool = false
  ) -> GRPCServerStreamHandler {
    let eventLoop = channel.eventLoop

    let connectionManager = ServerConnectionManagementHandler(
      eventLoop: eventLoop,
      maxIdleTime: nil,
      maxAge: nil,
      maxGraceTime: nil,
      keepaliveTime: nil,
      keepaliveTimeout: nil,
      allowKeepaliveWithoutCalls: false,
      minPingIntervalWithoutCalls: .minutes(5),
      requireALPN: false
    )

    // Fall back to a throwaway promise when the caller does not care about the descriptor.
    let methodDescriptorPromise: EventLoopPromise<MethodDescriptor> =
      descriptorPromise ?? eventLoop.makePromise()

    return GRPCServerStreamHandler(
      scheme: scheme,
      acceptedEncodings: acceptedEncodings,
      maxPayloadSize: maxPayloadSize,
      methodDescriptorPromise: methodDescriptorPromise,
      eventLoop: eventLoop,
      connectionManagementHandler: connectionManager.syncView,
      skipStateMachineAssertions: disableAssertions
    )
  }
  /// Writes a series of HTTP/2 frame payloads (ping, go-away, priority, settings,
  /// push-promise, window-update, alt-svc and origin) inbound and asserts that each
  /// one is accepted without throwing and that no frame payload can be read inbound
  /// afterwards.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    let priorityData = HTTP2Frame.StreamPriorityData(
      exclusive: false,
      dependency: .rootStream,
      weight: 4
    )
    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      .priority(priorityData),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    for payload in ignoredPayloads {
      // The write must succeed and nothing must be readable inbound as a result.
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Sends request headers that lack `content-type` and asserts that the handler
  /// answers with an end-of-stream HEADERS frame containing only `:status: 415`.
  func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit content-type.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The rejection must be a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(rejection.headers, [":status": "415"])
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers that lack `:method` and asserts that the handler answers
  /// with an end-of-stream HEADERS frame carrying an `invalidArgument` gRPC status and
  /// the matching status message.
  func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit :method.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The rejection must be a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let invalidArgument = String(Status.Code.invalidArgument.rawValue)
    XCTAssertEqual(
      rejection.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.grpcStatus.rawValue: invalidArgument,
        GRPCHTTP2Keys.grpcStatusMessage.rawValue:
          ":method header is expected to be present and have a value of \"POST\".",
      ]
    )
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers that lack `:scheme` and asserts that the handler answers
  /// with an end-of-stream HEADERS frame carrying an `invalidArgument` gRPC status and
  /// the matching status message.
  func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit :scheme.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The rejection must be a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let invalidArgument = String(Status.Code.invalidArgument.rawValue)
    XCTAssertEqual(
      rejection.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.grpcStatus.rawValue: invalidArgument,
        GRPCHTTP2Keys.grpcStatusMessage.rawValue:
          ":scheme header must be present and one of \"http\" or \"https\".",
      ]
    )
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers that lack `:path` and asserts that the handler answers
  /// with an end-of-stream HEADERS frame carrying an `invalidArgument` gRPC status and
  /// the matching status message.
  func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit :path.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The rejection must be a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let invalidArgument = String(Status.Code.invalidArgument.rawValue)
    XCTAssertEqual(
      rejection.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.grpcStatus.rawValue: invalidArgument,
        GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
      ]
    )
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends otherwise valid request headers that ask for `deflate` encoding to a handler
  /// created with the default (empty) accepted encodings, and asserts that the handler
  /// answers with an end-of-stream HEADERS frame carrying an `unimplemented` gRPC status
  /// and an accept-encoding of `identity`.
  func testNotAcceptedEncodingResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers asking for an encoding the handler was not configured with.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // The rejection must be a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let unimplemented = String(Status.Code.unimplemented.rawValue)
    XCTAssertEqual(
      rejection.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.grpcStatus.rawValue: unimplemented,
        GRPCHTTP2Keys.grpcStatusMessage.rawValue:
          "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
        GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
      ]
    )
    XCTAssertTrue(rejection.endStream)
  }
  /// Uses a handler with a maximum payload size of 1 and asserts that receiving a
  /// 42-byte message throws an `RPCError` with code `internalError` and message
  /// "Failed to decode message", with nothing written outbound and no request part
  /// readable inbound afterwards.
  func testOverMaximumPayloadSize() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, maxPayloadSize: 1)
    )

    // Client opens the RPC with valid request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was sent, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The initial metadata made it out as a HEADERS frame.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      responseHeaders.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        "some-custom-header": "some-custom-value",
      ]
    )

    // Client sends a length-prefixed message that exceeds the configured maximum.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(dataFrame)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Failed to decode message")
    }

    // Nothing was sent back and the oversized message was not surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  }
  /// Receives request headers with end-stream set, writes the server's initial
  /// metadata, then asserts that a further DATA frame from the client throws an
  /// `RPCError` with code `internalError` and message "Invalid state".
  ///
  /// The handler is created with `disableAssertions: true`.
  func testClientEndsStream() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, disableAssertions: true)
    )

    // Client opens the RPC and closes its side of the stream in the same frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: requestHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was sent, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The initial metadata made it out as a HEADERS frame.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      responseHeaders.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        "some-custom-header": "some-custom-value",
      ]
    )

    // The client already closed the stream, so another message must be rejected.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(dataFrame)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Invalid state")
    }
  }
  /// Walks through a complete RPC: request headers in, initial metadata out, one
  /// message in, one message out, then status and trailers out with end-stream set.
  /// Finally asserts that writing the status a second time throws an `RPCError` with
  /// code `internalError` and message "Invalid state".
  ///
  /// The handler is created with `disableAssertions: true`.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, disableAssertions: true)
    )

    // Client opens the RPC with valid request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was sent, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The initial metadata made it out as a HEADERS frame.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      responseHeaders.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        "some-custom-header": "some-custom-value",
      ]
    )

    // Client sends a single length-prefixed message and closes its side.
    var framedRequest = ByteBuffer()
    framedRequest.writeInteger(UInt8(0))  // not compressed
    framedRequest.writeInteger(UInt32(42))  // message length
    framedRequest.writeRepeatingByte(0, count: 42)  // message
    let requestData = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedRequest), endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(requestData))

    // No error response was sent, and the de-framed message was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      RPCRequestPart.message(GRPCNIOTransportBytes(repeating: 0, count: 42))
    )

    // Server sends its response message.
    let responseMessage = RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(responseMessage))

    // The response went out with the expected length-prefixed framing.
    let outboundData = try channel.assertReadDataOutbound()
    var framedResponse = ByteBuffer()
    framedResponse.writeInteger(UInt8(0))  // not compressed
    framedResponse.writeInteger(UInt32(42))  // message length
    framedResponse.writeRepeatingByte(1, count: 42)  // message
    XCTAssertEqual(outboundData.data, .byteBuffer(framedResponse))

    // Server finishes the RPC with a status and trailers.
    let statusAndTrailers = RPCResponsePart<GRPCNIOTransportBytes>.status(
      .init(code: .dataLoss, message: "Test data loss"),
      ["custom-header": "custom-value"]
    )
    XCTAssertNoThrow(try channel.writeOutbound(statusAndTrailers))

    // The status and trailers went out as the final HEADERS frame.
    let outboundTrailers = try channel.assertReadHeadersOutbound()
    XCTAssertTrue(outboundTrailers.endStream)
    XCTAssertEqual(
      outboundTrailers.headers,
      [
        GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
        GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
        "custom-header": "custom-value",
      ]
    )

    // Writes after the stream has been closed must be refused.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(statusAndTrailers)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Invalid state")
    }
  }
  /// Delivers one 30-byte message in five DATA frames (compression flag, length
  /// prefix, then three 10-byte body chunks) and asserts that no request part is
  /// readable after each of the first four frames, and that the reassembled message
  /// is readable once the last chunk has arrived.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Client opens the RPC with valid request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestFrame))

    // No error response was sent, and the metadata was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The initial metadata made it out as a HEADERS frame.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(
      responseHeaders.headers,
      [
        GRPCHTTP2Keys.status.rawValue: "200",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        "some-custom-header": "some-custom-value",
      ]
    )

    // Feeds one chunk to the handler as a DATA frame without end-stream.
    let receive: (ByteBuffer) -> Void = { chunk in
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(chunk))))
      )
    }
    // Asserts that the handler has not yet surfaced a request part.
    let expectNoRequestPart: () -> Void = {
      XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
    }

    // Compression flag on its own.
    var chunk = ByteBuffer()
    chunk.writeInteger(UInt8(0))  // not compressed
    receive(chunk)
    expectNoRequestPart()

    // Length prefix on its own.
    chunk.clear()
    chunk.writeInteger(UInt32(30))  // message length
    receive(chunk)
    expectNoRequestPart()

    // First and second thirds of the message body: still incomplete.
    for fill in [UInt8(0), UInt8(1)] {
      chunk.clear()
      chunk.writeRepeatingByte(fill, count: 10)
      receive(chunk)
      expectNoRequestPart()
    }

    // Final third of the message body completes the message.
    chunk.clear()
    chunk.writeRepeatingByte(2, count: 10)
    receive(chunk)

    var reassembled = ByteBuffer()
    for fill in UInt8(0)...2 {
      reassembled.writeRepeatingByte(fill, count: 10)
    }

    // No error response was sent, and the whole message was surfaced inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      RPCRequestPart.message(GRPCNIOTransportBytes(reassembled))
    )
  }
  /// Sends the same request headers twice and asserts that the second HEADERS frame
  /// throws an `RPCError` with code `unavailable` and message
  /// "Stream unexpectedly closed.", and that an RST_STREAM with `protocolError` is
  /// written outbound.
  func testReceiveMultipleHeaders() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]

    // First HEADERS frame is accepted and produces no outbound frame.
    try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders)))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // A second HEADERS frame is a protocol violation.
    let duplicateFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(duplicateFrame)
    ) { error in
      XCTAssertEqual(error.code, .unavailable)
      XCTAssertEqual(error.message, "Stream unexpectedly closed.")
    }

    // The handler must have reset the stream.
    let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    guard case .rstStream(let errorCode) = payload else {
      XCTFail("Expected RST_STREAM, got \(payload)")
      return
    }
    XCTAssertEqual(errorCode, .protocolError)
  }
  /// Writes two response messages without flushing and asserts that nothing is
  /// written outbound until `flush()`, at which point both length-prefixed messages
  /// arrive together in a single DATA frame and no further frame follows.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel()
    let handler = self.makeServerStreamHandler(channel: channel)
    try channel.pipeline.syncOperations.addHandler(handler)

    // Receive client's initial metadata
    let clientInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
      )
    )

    // Make sure we haven't sent back an error response, and that we read the initial metadata
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
    )

    // Write back server's initial metadata
    let headers: HPACKHeaders = [
      "some-custom-header": "some-custom-value"
    ]
    let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: headers)
    )
    XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))

    // Drain the metadata frame. Use the asserting helper (as the other tests in this
    // file do) so the test stops here if a HEADERS frame was not actually written,
    // rather than silently discarding whatever was (or was not) read.
    _ = try channel.assertReadHeadersOutbound()

    // This is where this test actually begins. We want to write two messages
    // without flushing, and make sure that no messages are sent down the pipeline
    // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.

    // Write back first message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Write back second message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Now flush and check we *do* write the data.
    channel.flush()
    let writtenMessage = try channel.assertReadDataOutbound()

    // Make sure both messages have been framed together in the ByteBuffer.
    XCTAssertEqual(
      writtenMessage.data,
      .byteBuffer(
        .init(bytes: [
          // First message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          1, 1, 1, 1,  // First message data
          // Second message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          2, 2, 2, 2,  // Second message data
        ])
      )
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that a message written before the status is also emitted before the
  /// trailers when both writes are only flushed afterwards.
  func testMessageAndStatusAreNotReordered() throws {
    let channel = EmbeddedChannel()
    let streamHandler = self.makeServerStreamHandler(channel: channel)
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Deliver the client's initial metadata as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Send the server's (empty) initial metadata and drain the resulting outbound frame.
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: [:])
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))
    _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)

    // The actual scenario starts here: a message is written, then status and trailers,
    // and the channel is flushed only once both writes have been issued.
    // Messages are buffered and potentially bundled together in a single ByteBuffer by
    // the GRPCMessageFramer, so the status and trailers must not overtake them.

    // Unflushed message write: nothing may appear outbound yet.
    let responseMessage = RPCResponsePart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(repeating: 1, count: 4)
    )
    XCTAssertNoThrow(channel.write(responseMessage))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Unflushed status + trailing metadata write: still nothing outbound.
    let responseStatus = RPCResponsePart<GRPCNIOTransportBytes>.status(
      .init(code: .ok, message: ""),
      [:]
    )
    XCTAssertNoThrow(channel.write(responseStatus))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // After the flush the frames must come out in write order: DATA, then trailers.
    channel.flush()

    // First frame: the length-prefixed message, without end-of-stream.
    let dataFrame = try channel.assertReadDataOutbound()
    let expectedMessageBytes: [UInt8] = [
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      1, 1, 1, 1,  // Message data
    ]
    XCTAssertEqual(dataFrame.data, .byteBuffer(.init(bytes: expectedMessageBytes)))
    XCTAssertFalse(dataFrame.endStream)

    // Second frame: the trailers, which close the stream.
    let trailersFrame = try channel.assertReadHeadersOutbound()
    let expectedTrailers: HPACKHeaders = ["grpc-status": "0"]
    XCTAssertEqual(trailersFrame.headers, expectedTrailers)
    XCTAssertTrue(trailersFrame.endStream)

    // Nothing else may have been written.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that the method descriptor promise is completed with the service and method
  /// taken from the request path once the client's initial metadata has been received.
  func testMethodDescriptorPromiseSucceeds() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Deliver the client's initial metadata as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // The promise must now carry the descriptor matching the request path.
    let expectedDescriptor = MethodDescriptor(
      fullyQualifiedService: "SomeService",
      method: "SomeMethod"
    )
    XCTAssertEqual(try descriptorPromise.futureResult.wait(), expectedDescriptor)
  }
  /// Checks that removing the handler from the pipeline before any metadata arrives
  /// fails the method descriptor promise with an `unavailable` RPC error.
  func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise
    )

    // Add the handler and take it straight back out without delivering any frames.
    let pipelineOperations = channel.pipeline.syncOperations
    try pipelineOperations.addHandler(streamHandler)
    try pipelineOperations.removeHandler(streamHandler).wait()

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC stream was closed before we got any Metadata.")
    }
  }
  /// Checks that the method descriptor promise is failed with an `unavailable` RPC error
  /// when the incoming request headers are not accepted.
  func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Deliver client metadata carrying a content type that is not a gRPC one.
    // NOTE(review): the path below has no leading "/", unlike the other tests in this
    // file - confirm whether that is intentional or whether only the content type is
    // meant to be invalid here.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // The promise must have been failed rather than succeeded.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC was rejected.")
    }
  }
  /// Checks that an error fired through the pipeline is forwarded by the handler and
  /// that subsequent outbound writes are refused with an "Invalid state" error.
  func testUnexpectedStreamClose_ErrorFired() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Deliver the client's initial metadata as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Fire an error through the pipeline.
    let injectedError = ChannelError.connectTimeout(.milliseconds(100))
    channel.pipeline.fireErrorCaught(injectedError)

    // The very same error must surface on the channel, i.e. it was forwarded as-is.
    XCTAssertThrowsError(
      ofType: ChannelError.self,
      try channel.throwIfErrorCaught()
    ) { caughtError in
      XCTAssertEqual(caughtError, injectedError)
    }

    // The stream is expected to be closed now, so another write has to fail.
    let lateMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that the channel becoming inactive mid-RPC makes the handler fire an
  /// `unavailable` error and that subsequent outbound writes are refused.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Deliver the client's initial metadata as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Signal that the channel went inactive.
    channel.pipeline.fireChannelInactive()

    // An RPC error describing the unexpected close must have been fired.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.throwIfErrorCaught()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed.")
    }

    // The stream is expected to be closed now, so another write has to fail.
    let lateMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that receiving a RST_STREAM frame mid-RPC surfaces an `unavailable` error
  /// and that subsequent outbound writes are refused.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Deliver the client's initial metadata as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Deliver a RST_STREAM frame; writing it inbound must throw the handler's error.
    let resetFrame = HTTP2Frame.FramePayload.rstStream(.internalError)
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(resetFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(
        rpcError.message,
        "Stream unexpectedly closed: a RST_STREAM frame was received."
      )
    }

    // The stream is expected to be closed now, so another write has to fail.
    let lateMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata([:])
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  886. }
  /// Swift Testing suite covering RPC cancellation and connection frame statistics of
  /// `GRPCServerStreamHandler`.
  struct ServerStreamHandlerTests {
    /// A stream handler together with the connection management handler whose sync view
    /// it was created with.
    @available(gRPCSwiftNIOTransport 1.0, *)
    struct ConnectionAndStreamHandlers {
      let streamHandler: GRPCServerStreamHandler
      let connectionHandler: ServerConnectionManagementHandler
    }

    /// Builds a connection management handler with every timer disabled, plus a stream
    /// handler wired to that connection handler's sync view.
    ///
    /// - Parameters:
    ///   - channel: Channel whose event loop both handlers are created on.
    ///   - scheme: Scheme passed to the stream handler.
    ///   - acceptedEncodings: Compression algorithms passed to the stream handler.
    ///   - maxPayloadSize: Maximum payload size passed to the stream handler.
    ///   - descriptorPromise: Promise handed to the stream handler; a fresh promise is
    ///     created on the channel's event loop when `nil`.
    ///   - disableAssertions: Forwarded as `skipStateMachineAssertions`.
    /// - Returns: The two newly created handlers.
    @available(gRPCSwiftNIOTransport 1.0, *)
    private func makeServerConnectionAndStreamHandlers(
      channel: any Channel,
      scheme: Scheme = .http,
      acceptedEncodings: CompressionAlgorithmSet = [],
      maxPayloadSize: Int = .max,
      descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
      disableAssertions: Bool = false
    ) -> ConnectionAndStreamHandlers {
      let eventLoop = channel.eventLoop

      let connectionHandler = ServerConnectionManagementHandler(
        eventLoop: eventLoop,
        maxIdleTime: nil,
        maxAge: nil,
        maxGraceTime: nil,
        keepaliveTime: nil,
        keepaliveTimeout: nil,
        allowKeepaliveWithoutCalls: false,
        minPingIntervalWithoutCalls: .minutes(5),
        requireALPN: false
      )

      // Fall back to a throwaway promise when the caller does not care about the descriptor.
      let methodDescriptorPromise =
        descriptorPromise ?? eventLoop.makePromise(of: MethodDescriptor.self)

      let streamHandler = GRPCServerStreamHandler(
        scheme: scheme,
        acceptedEncodings: acceptedEncodings,
        maxPayloadSize: maxPayloadSize,
        methodDescriptorPromise: methodDescriptorPromise,
        eventLoop: eventLoop,
        connectionManagementHandler: connectionHandler.syncView,
        skipStateMachineAssertions: disableAssertions
      )

      return ConnectionAndStreamHandlers(
        streamHandler: streamHandler,
        connectionHandler: connectionHandler
      )
    }

    @Test("ChannelShouldQuiesceEvent is buffered and turns into RPC cancellation")
    @available(gRPCSwiftNIOTransport 1.0, *)
    func shouldQuiesceEventIsBufferedBeforeHandleIsSet() async throws {
      let channel = EmbeddedChannel()
      let streamHandler = self.makeServerConnectionAndStreamHandlers(channel: channel)
        .streamHandler
      try channel.pipeline.syncOperations.addHandler(streamHandler)

      // The quiesce event is fired *before* any cancellation handle has been set.
      channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())

      await withServerContextRPCCancellationHandle { cancellationHandle in
        // Setting the handle afterwards must leave it cancelled right away.
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(cancellationHandle.isCancelled)
      }

      // Throwing is fine: the channel is closed abruptly, errors are expected.
      _ = try? channel.finish()
    }

    @Test("ChannelShouldQuiesceEvent turns into RPC cancellation")
    @available(gRPCSwiftNIOTransport 1.0, *)
    func shouldQuiesceEventTriggersCancellation() async throws {
      let channel = EmbeddedChannel()
      let streamHandler = self.makeServerConnectionAndStreamHandlers(channel: channel)
        .streamHandler
      try channel.pipeline.syncOperations.addHandler(streamHandler)

      await withServerContextRPCCancellationHandle { cancellationHandle in
        // With the handle set first, it starts out not cancelled...
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(!cancellationHandle.isCancelled)

        // ...and flips to cancelled once the quiesce event is fired.
        channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
        #expect(cancellationHandle.isCancelled)
      }

      // Throwing is fine: the channel is closed abruptly, errors are expected.
      _ = try? channel.finish()
    }

    @Test("RST_STREAM turns into RPC cancellation")
    @available(gRPCSwiftNIOTransport 1.0, *)
    func rstStreamTriggersCancellation() async throws {
      let channel = EmbeddedChannel()
      let streamHandler = self.makeServerConnectionAndStreamHandlers(channel: channel)
        .streamHandler
      try channel.pipeline.syncOperations.addHandler(streamHandler)

      await withServerContextRPCCancellationHandle { cancellationHandle in
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(!cancellationHandle.isCancelled)

        // Reading a RST_STREAM frame must cancel the handle.
        channel.pipeline.fireChannelRead(HTTP2Frame.FramePayload.rstStream(.cancel))
        #expect(cancellationHandle.isCancelled)
      }

      // Throwing is fine: the channel is closed abruptly, errors are expected.
      _ = try? channel.finish()
    }

    @Test("Connection FrameStats are updated when writing headers or data frames")
    @available(gRPCSwiftNIOTransport 1.0, *)
    func connectionFrameStatsAreUpdatedAccordingly() async throws {
      let channel = EmbeddedChannel()
      let handlers = self.makeServerConnectionAndStreamHandlers(channel: channel)
      let connectionHandler = handlers.connectionHandler
      try channel.pipeline.syncOperations.addHandler(handlers.streamHandler)

      // Nothing has been written so far, so the flag must still be unset.
      #expect(!connectionHandler.frameStats.didWriteHeadersOrData)

      // Receiving a ping must not touch the flag.
      let pingFrame = HTTP2Frame.FramePayload.ping(.init(withInteger: 42), ack: false)
      channel.pipeline.fireChannelRead(pingFrame)
      #expect(!connectionHandler.frameStats.didWriteHeadersOrData)

      // Headers can only be written back after the client's initial metadata arrived.
      let requestHeaders: HPACKHeaders = [
        GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
      ]
      let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
      try channel.writeInbound(requestHeadersFrame)

      // Writing the server's initial metadata must set the flag.
      let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata([:])
      try channel.writeOutbound(responseMetadata)
      #expect(connectionHandler.frameStats.didWriteHeadersOrData)

      // Reset manually so that the effect of a data write can be observed in isolation.
      connectionHandler.frameStats.reset()
      #expect(!connectionHandler.frameStats.didWriteHeadersOrData)

      let responseMessage = RPCResponsePart<GRPCNIOTransportBytes>.message(
        GRPCNIOTransportBytes([42])
      )
      try channel.writeOutbound(responseMessage)
      #expect(connectionHandler.frameStats.didWriteHeadersOrData)

      // Clean up.
      // Throwing is fine: the channel is closed abruptly, errors are expected.
      _ = try? channel.finish()
    }
  }
  extension EmbeddedChannel {
    /// Reads the next outbound frame payload and returns it as a headers payload.
    ///
    /// - Throws: The `XCTUnwrap` error when nothing was written outbound, or
    ///   `TestError.assertionFailure` when the payload is not a headers payload.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      if case .headers(let headers) = payload {
        return headers
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound frame payload and returns it as a data payload.
    ///
    /// - Throws: The `XCTUnwrap` error when nothing was written outbound, or
    ///   `TestError.assertionFailure` when the payload is not a data payload.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      if case .data(let data) = payload {
        return data
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Error thrown by the `EmbeddedChannel` read helpers in this file when an outbound
  /// frame is not of the expected kind.
  private enum TestError: Error {
    /// An expectation was not met; the associated value is a human-readable description.
    case assertionFailure(String)
  }