GRPCServerStreamHandlerTests.swift 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  /// HTTP/2 frame payloads that carry no gRPC meaning (PING, GOAWAY, SETTINGS, PUSH_PROMISE,
  /// WINDOW_UPDATE, ALTSVC, ORIGIN) must be accepted without throwing, and no
  /// `HTTP2Frame.FramePayload` may become readable on the inbound side afterwards.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      // TODO: uncomment when it's possible to build a `StreamPriorityData`.
      // .priority(
      //   HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
      // ),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    // Feed the payloads one at a time; each must pass through silently.
    ignoredPayloads.forEach { payload in
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// A client HEADERS frame that lacks `content-type` is answered with an end-of-stream
  /// HEADERS frame whose headers are exactly `:status: 415`.
  func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Request headers deliberately omit content-type.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The rejection must arrive as a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let expectedHeaders: HPACKHeaders = [":status": "415"]
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// A client HEADERS frame that lacks `:method` is answered with an end-of-stream HEADERS
  /// frame carrying an `invalidArgument` gRPC status and an explanatory message.
  func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Request headers deliberately omit :method.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The rejection must arrive as a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        ":method header is expected to be present and have a value of \"POST\".",
    ]
    let rejection = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// A client HEADERS frame that lacks `:scheme` is answered with an end-of-stream HEADERS
  /// frame carrying an `invalidArgument` gRPC status and an explanatory message.
  func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Request headers deliberately omit :scheme.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The rejection must arrive as a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        ":scheme header must be present and one of \"http\" or \"https\".",
    ]
    let rejection = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// A client HEADERS frame that lacks `:path` is answered with an end-of-stream HEADERS
  /// frame carrying an `invalidArgument` gRPC status and an explanatory message.
  func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Request headers deliberately omit :path.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The rejection must arrive as a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
    ]
    let rejection = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// A client HEADERS frame that lacks `te` is answered with an end-of-stream HEADERS frame
  /// carrying an `invalidArgument` gRPC status and an explanatory message.
  func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Request headers deliberately omit TE.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The rejection must arrive as a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        "\"te\" header is expected to be present and have a value of \"trailers\".",
    ]
    let rejection = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// With an empty `acceptedEncodings` list, a request advertising the `deflate` encoding is
  /// answered with an end-of-stream HEADERS frame carrying an `unimplemented` gRPC status and
  /// an accept-encoding header of `identity`.
  func testNotAcceptedEncodingResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 100,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Otherwise valid request headers asking for an encoding the handler was not given.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The rejection must arrive as a trailers-only response.
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
    ]
    let rejection = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// With `maximumPayloadSize: 1`, receiving a 42-byte message makes `writeInbound` throw an
  /// `RPCError` with code `.resourceExhausted`. Afterwards nothing is readable outbound and
  /// no request part is readable inbound.
  func testOverMaximumPayloadSize() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Step 1: the client opens the RPC with valid headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response went out, and the request metadata was surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    )

    // Step 2: the server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart.metadata(Metadata(headers: customHeaders))
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeadersFrame.headers, expectedResponseHeaders)

    // Step 3: the client sends a message larger than the configured maximum.
    var oversizedMessage = ByteBuffer()
    oversizedMessage.writeInteger(UInt8(0))  // compression flag: off
    oversizedMessage.writeInteger(UInt32(42))  // declared length
    oversizedMessage.writeRepeatingByte(0, count: 42)  // payload
    let oversizedFrame = HTTP2Frame.FramePayload.data(
      .init(data: .byteBuffer(oversizedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(oversizedFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .resourceExhausted)
      XCTAssertEqual(
        rpcError.message,
        "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
      )
    }

    // Neither a response nor the rejected message may have been produced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  }
  /// Once the client's HEADERS frame has arrived with end-stream set, a later DATA frame from
  /// the client makes `writeInbound` throw an `RPCError` with code `.internalError`.
  func testClientEndsStream() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
      skipStateMachineAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Step 1: the client opens the RPC and closes its side in the same frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let closingHeadersFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: requestHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(closingHeadersFrame))

    // No error response went out, and the request metadata was surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    )

    // Step 2: the server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart.metadata(Metadata(headers: customHeaders))
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeadersFrame.headers, expectedResponseHeaders)

    // Step 3: the client has already closed the stream, so a further message must be refused.
    var lateMessage = ByteBuffer()
    lateMessage.writeInteger(UInt8(0))  // compression flag: off
    lateMessage.writeInteger(UInt32(42))  // declared length
    lateMessage.writeRepeatingByte(0, count: 42)  // payload
    let lateFrame = HTTP2Frame.FramePayload.data(
      .init(data: .byteBuffer(lateMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(lateFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Client can't send a message if closed.")
    }
  }
  /// Walks through a complete exchange: request headers in, response headers out, one message
  /// in, one message out, then status and trailers out with end-stream set. A second attempt to
  /// write the status must throw an `RPCError` with code `.internalError`.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 42,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
      skipStateMachineAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Step 1: the client opens the RPC with valid headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response went out, and the request metadata was surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    )

    // Step 2: the server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart.metadata(Metadata(headers: customHeaders))
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeadersFrame.headers, expectedResponseHeaders)

    // Step 3: the client sends one 42-byte message and closes its side.
    var requestMessage = ByteBuffer()
    requestMessage.writeInteger(UInt8(0))  // compression flag: off
    requestMessage.writeInteger(UInt32(42))  // declared length
    requestMessage.writeRepeatingByte(0, count: 42)  // payload
    let requestDataFrame = HTTP2Frame.FramePayload.data(
      .init(data: .byteBuffer(requestMessage), endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(requestDataFrame))

    // No error response went out, and the deframed message was surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.message([UInt8](repeating: 0, count: 42))
    )

    // Step 4: the server sends one 42-byte message back.
    let responseMessage = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
    XCTAssertNoThrow(try channel.writeOutbound(responseMessage))

    // The outbound DATA frame must hold the length-prefixed message.
    let responseDataFrame = try channel.assertReadDataOutbound()
    var expectedFraming = ByteBuffer()
    expectedFraming.writeInteger(UInt8(0))  // compression flag: off
    expectedFraming.writeInteger(UInt32(42))  // declared length
    expectedFraming.writeRepeatingByte(1, count: 42)  // payload
    XCTAssertEqual(responseDataFrame.data, .byteBuffer(expectedFraming))

    // Step 5: the server finishes the RPC with a status and trailing metadata.
    let statusPart = RPCResponsePart.status(
      .init(code: .dataLoss, message: "Test data loss"),
      ["custom-header": "custom-value"]
    )
    XCTAssertNoThrow(try channel.writeOutbound(statusPart))

    let expectedTrailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    let trailersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertTrue(trailersFrame.endStream)
    XCTAssertEqual(trailersFrame.headers, expectedTrailers)

    // Step 6: any further write after the status must be rejected.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(statusPart)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Server can't send anything if closed.")
    }
  }
  /// Delivers one 30-byte message in five separate DATA frames (compression flag, length
  /// prefix, then three 10-byte slices). No request part may be readable until the last slice
  /// has arrived, after which the reassembled message is surfaced.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 100,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Step 1: the client opens the RPC with valid headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response went out, and the request metadata was surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    )

    // Step 2: the server replies with its initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart.metadata(Metadata(headers: customHeaders))
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(responseHeadersFrame.headers, expectedResponseHeaders)

    // Step 3: deliver every fragment except the last; none of them completes the message.
    let incompleteFragments: [[UInt8]] = [
      [0],  // compression flag: off
      [0, 0, 0, 30],  // declared length (big-endian UInt32)
      [UInt8](repeating: 0, count: 10),  // first slice of the payload
      [UInt8](repeating: 1, count: 10),  // second slice of the payload
    ]
    for fragment in incompleteFragments {
      let fragmentFrame = HTTP2Frame.FramePayload.data(
        .init(data: .byteBuffer(ByteBuffer(bytes: fragment)))
      )
      XCTAssertNoThrow(try channel.writeInbound(fragmentFrame))
      XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
    }

    // Step 4: the final slice completes the message.
    let finalFragment = ByteBuffer(bytes: [UInt8](repeating: 2, count: 10))
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(finalFragment))))
    )

    // No error response went out, and the three slices were joined into one message.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedPayload =
      [UInt8](repeating: 0, count: 10)
      + [UInt8](repeating: 1, count: 10)
      + [UInt8](repeating: 2, count: 10)
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.message(expectedPayload)
    )
  }
  /// Receiving the client's HEADERS frame a second time makes `writeInbound` throw an
  /// `RPCError` with code `.unavailable`, and an RST_STREAM with `.protocolError` is written
  /// outbound.
  func testReceiveMultipleHeaders() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 100,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // First delivery of the request headers is accepted and triggers no response.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    try channel.writeInbound(headersFrame)
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Second delivery of the very same headers is a protocol violation.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(headersFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed.")
    }

    // The handler must have reset the stream.
    let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    guard case .rstStream(let resetCode) = payload else {
      XCTFail("Expected RST_STREAM, got \(payload)")
      return
    }
    XCTAssertEqual(resetCode, .protocolError)
  }
  /// Two response messages written without a flush must not reach the outbound side. After
  /// `flush()` both arrive together, framed back to back inside a single DATA frame.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel()
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 100,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // Setup: the client opens the RPC with valid headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response went out, and the request metadata was surfaced.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    )

    // Setup: the server replies with its initial metadata, which is drained and discarded.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart.metadata(Metadata(headers: customHeaders))
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))
    _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)

    // The behaviour under test starts here: unflushed writes must stay invisible downstream,
    // and a flush must then release both messages in one ByteBuffer.
    let firstMessage = RPCResponsePart.message([UInt8](repeating: 1, count: 4))
    XCTAssertNoThrow(channel.write(firstMessage))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    let secondMessage = RPCResponsePart.message([UInt8](repeating: 2, count: 4))
    XCTAssertNoThrow(channel.write(secondMessage))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Flushing releases the buffered data.
    channel.flush()
    let flushedFrame = try channel.assertReadDataOutbound()

    // Both length-prefixed messages must sit in the same buffer, in write order.
    let expectedBytes: [UInt8] = [
      // First message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      1, 1, 1, 1,  // First message data
      // Second message
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      2, 2, 2, 2,  // Second message data
    ]
    XCTAssertEqual(flushedFrame.data, .byteBuffer(.init(bytes: expectedBytes)))

    // Nothing else may follow the combined frame.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that a buffered message is written out ahead of the status and trailers
  /// when both are written before a single flush.
  func testMessageAndStatusAreNotReordered() throws {
    let channel = EmbeddedChannel()
    let handler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 100,
      methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
    )
    try channel.pipeline.syncOperations.addHandler(handler)

    // Receive client's initial metadata
    let clientInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
      )
    )

    // Make sure we haven't sent back an error response, and that we read the initial metadata
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart.self),
      RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
    )

    // Write back server's initial metadata
    let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
    XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))

    // Read out the metadata. The asserting helper is used (rather than discarding the
    // result of a plain read) so that a missing or non-headers frame fails the test
    // here instead of skewing the ordering assertions below.
    _ = try channel.assertReadHeadersOutbound()

    // This is where this test actually begins. We want to write a message followed
    // by status and trailers, and only flush after both writes.
    // Because messages are buffered and potentially bundled together in a single
    // ByteBuffer by the GRPCMessageFramer, we want to make sure that the status
    // and trailers won't be written before the messages.

    // Write back message and make sure nothing's written in the channel.
    XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Write status + metadata and make sure nothing's written.
    XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Now flush and check we *do* write the data in the right order: message first,
    // trailers second.
    channel.flush()
    let writtenMessage = try channel.assertReadDataOutbound()

    // Make sure we first get message.
    XCTAssertEqual(
      writtenMessage.data,
      .byteBuffer(
        .init(bytes: [
          // First message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          1, 1, 1, 1,  // First message data
        ])
      )
    )
    // The data frame must not close the stream: the trailers still have to follow.
    XCTAssertFalse(writtenMessage.endStream)

    // Make sure we get trailers.
    let writtenTrailers = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
    XCTAssertTrue(writtenTrailers.endStream)

    // Make sure we get nothing else.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that the method descriptor promise is succeeded with the descriptor
  /// matching the request path once the client's initial metadata has been received.
  func testMethodDescriptorPromiseSucceeds() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      GRPCServerStreamHandler(
        scheme: .http,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        methodDescriptorPromise: descriptorPromise,
        skipStateMachineAssertions: true
      )
    )

    // Deliver the request headers as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing (in particular no error response) may have been written back, and the
    // metadata must have been forwarded as a request part.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedPart = RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try channel.readInbound(as: RPCRequestPart.self), expectedPart)

    // The promise must now carry the descriptor for the requested method.
    let expectedDescriptor = MethodDescriptor(service: "SomeService", method: "SomeMethod")
    XCTAssertEqual(try descriptorPromise.futureResult.wait(), expectedDescriptor)
  }
  /// Checks that removing the handler before any metadata was received fails the
  /// method descriptor promise with an `.unavailable` `RPCError`.
  func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = GRPCServerStreamHandler(
      scheme: .http,
      acceptedEncodings: [],
      maximumPayloadSize: 100,
      methodDescriptorPromise: descriptorPromise,
      skipStateMachineAssertions: true
    )

    // Add the handler and take it straight back out, without any traffic in between.
    let pipelineOperations = channel.pipeline.syncOperations
    try pipelineOperations.addHandler(streamHandler)
    try pipelineOperations.removeHandler(streamHandler).wait()

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC stream was closed before we got any Metadata.")
    }
  }
  /// Checks that the method descriptor promise is failed with an `.unavailable`
  /// `RPCError` when the incoming request headers are sent with an invalid content type.
  func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      GRPCServerStreamHandler(
        scheme: .http,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        methodDescriptorPromise: descriptorPromise,
        skipStateMachineAssertions: true
      )
    )

    // Deliver request headers carrying a content type that is not a gRPC one.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Waiting on the promise must surface the rejection.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC was rejected.")
    }
  }
  /// Checks that an error fired through the pipeline is forwarded unchanged and
  /// that writing metadata afterwards fails because the server side is closed.
  func testUnexpectedStreamClose_ErrorFired() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      GRPCServerStreamHandler(
        scheme: .http,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        methodDescriptorPromise: descriptorPromise,
        skipStateMachineAssertions: true
      )
    )

    // Deliver the request headers as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing (in particular no error response) may have been written back, and the
    // metadata must have been forwarded as a request part.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedPart = RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try channel.readInbound(as: RPCRequestPart.self), expectedPart)

    // Fire an error through the pipeline.
    let firedError = ChannelError.connectTimeout(.milliseconds(100))
    channel.pipeline.fireErrorCaught(firedError)

    // The very same error is expected to come out of the channel.
    XCTAssertThrowsError(
      ofType: ChannelError.self,
      try channel.throwIfErrorCaught()
    ) { caughtError in
      XCTAssertEqual(caughtError, firedError)
    }

    // The stream should now be closed, so sending metadata has to be refused.
    let lateMetadata = RPCResponsePart.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Server cannot send metadata if closed.")
    }
  }
  /// Checks that the channel becoming inactive mid-RPC surfaces an `.unavailable`
  /// `RPCError` and that writing metadata afterwards fails because the server side
  /// is closed.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      GRPCServerStreamHandler(
        scheme: .http,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        methodDescriptorPromise: descriptorPromise,
        skipStateMachineAssertions: true
      )
    )

    // Deliver the request headers as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing (in particular no error response) may have been written back, and the
    // metadata must have been forwarded as a request part.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedPart = RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try channel.readInbound(as: RPCRequestPart.self), expectedPart)

    // Simulate the channel going inactive.
    channel.pipeline.fireChannelInactive()

    // An error describing the unexpected close is expected to come out of the channel.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.throwIfErrorCaught()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed.")
    }

    // The stream should now be closed, so sending metadata has to be refused.
    let lateMetadata = RPCResponsePart.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Server cannot send metadata if closed.")
    }
  }
  /// Checks that receiving a RST_STREAM frame mid-RPC surfaces an `.unavailable`
  /// `RPCError` and that writing metadata afterwards fails because the server side
  /// is closed.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      GRPCServerStreamHandler(
        scheme: .http,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        methodDescriptorPromise: descriptorPromise,
        skipStateMachineAssertions: true
      )
    )

    // Deliver the request headers as an inbound HEADERS frame.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing (in particular no error response) may have been written back, and the
    // metadata must have been forwarded as a request part.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedPart = RPCRequestPart.metadata(Metadata(headers: requestHeaders))
    XCTAssertEqual(try channel.readInbound(as: RPCRequestPart.self), expectedPart)

    // Deliver a RST_STREAM frame; writing it inbound is expected to throw the
    // error describing the unexpected close.
    let resetFrame = HTTP2Frame.FramePayload.rstStream(.internalError)
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(resetFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(
        rpcError.message,
        "Stream unexpectedly closed: a RST_STREAM frame was received."
      )
    }

    // The stream should now be closed, so sending metadata has to be refused.
    let lateMetadata = RPCResponsePart.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadata)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Server cannot send metadata if closed.")
    }
  }
  963. }
  extension EmbeddedChannel {
    /// Reads the next outbound frame payload and returns its contents if it is a headers payload.
    ///
    /// - Returns: The headers payload that was read.
    /// - Throws: The error from `XCTUnwrap` when no outbound payload is available, or
    ///   `TestError.assertionFailure` when the payload is not a headers payload.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(try readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .headers(let headersPayload) = payload {
        return headersPayload
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound frame payload and returns its contents if it is a data payload.
    ///
    /// - Returns: The data payload that was read.
    /// - Throws: The error from `XCTUnwrap` when no outbound payload is available, or
    ///   `TestError.assertionFailure` when the payload is not a data payload.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(try readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .data(let dataPayload) = payload {
        return dataPayload
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Error thrown by this file's test helpers when an expectation about the
  /// channel's output does not hold.
  private enum TestError: Error {
    /// An expectation failed; the associated string describes what was expected.
    case assertionFailure(String)
  }