GRPCServerStreamHandlerTests.swift 39 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let channel = EmbeddedChannel()
  27. let handler = GRPCServerStreamHandler(
  28. scheme: .http,
  29. acceptedEncodings: [],
  30. maximumPayloadSize: 1,
  31. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  32. )
  33. try channel.pipeline.syncOperations.addHandler(handler)
  34. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  35. .ping(.init(), ack: false),
  36. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  37. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  38. // .priority(
  39. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  40. // ),
  41. .settings(.ack),
  42. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  43. .windowUpdate(windowSizeIncrement: 4),
  44. .alternativeService(origin: nil, field: nil),
  45. .origin([]),
  46. ]
  47. for toBeIgnored in framesToBeIgnored {
  48. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  49. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  50. }
  51. }
  52. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  53. let channel = EmbeddedChannel()
  54. let handler = GRPCServerStreamHandler(
  55. scheme: .http,
  56. acceptedEncodings: [],
  57. maximumPayloadSize: 1,
  58. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  59. )
  60. try channel.pipeline.syncOperations.addHandler(handler)
  61. // Receive client's initial metadata without content-type
  62. let clientInitialMetadata: HPACKHeaders = [
  63. GRPCHTTP2Keys.path.rawValue: "/test/test",
  64. GRPCHTTP2Keys.scheme.rawValue: "http",
  65. GRPCHTTP2Keys.method.rawValue: "POST",
  66. GRPCHTTP2Keys.te.rawValue: "trailers",
  67. ]
  68. XCTAssertNoThrow(
  69. try channel.writeInbound(
  70. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  71. )
  72. )
  73. // Make sure we have sent a trailers-only response
  74. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  75. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  76. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  77. }
  78. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  79. let channel = EmbeddedChannel()
  80. let handler = GRPCServerStreamHandler(
  81. scheme: .http,
  82. acceptedEncodings: [],
  83. maximumPayloadSize: 1,
  84. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  85. )
  86. try channel.pipeline.syncOperations.addHandler(handler)
  87. // Receive client's initial metadata without :method
  88. let clientInitialMetadata: HPACKHeaders = [
  89. GRPCHTTP2Keys.path.rawValue: "/test/test",
  90. GRPCHTTP2Keys.scheme.rawValue: "http",
  91. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  92. GRPCHTTP2Keys.te.rawValue: "trailers",
  93. ]
  94. XCTAssertNoThrow(
  95. try channel.writeInbound(
  96. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  97. )
  98. )
  99. // Make sure we have sent a trailers-only response
  100. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  101. XCTAssertEqual(
  102. writtenTrailersOnlyResponse.headers,
  103. [
  104. GRPCHTTP2Keys.status.rawValue: "200",
  105. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  106. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  107. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  108. ":method header is expected to be present and have a value of \"POST\".",
  109. ]
  110. )
  111. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  112. }
  113. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  114. let channel = EmbeddedChannel()
  115. let handler = GRPCServerStreamHandler(
  116. scheme: .http,
  117. acceptedEncodings: [],
  118. maximumPayloadSize: 1,
  119. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  120. )
  121. try channel.pipeline.syncOperations.addHandler(handler)
  122. // Receive client's initial metadata without :scheme
  123. let clientInitialMetadata: HPACKHeaders = [
  124. GRPCHTTP2Keys.path.rawValue: "/test/test",
  125. GRPCHTTP2Keys.method.rawValue: "POST",
  126. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  127. GRPCHTTP2Keys.te.rawValue: "trailers",
  128. ]
  129. XCTAssertNoThrow(
  130. try channel.writeInbound(
  131. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  132. )
  133. )
  134. // Make sure we have sent a trailers-only response
  135. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  136. XCTAssertEqual(
  137. writtenTrailersOnlyResponse.headers,
  138. [
  139. GRPCHTTP2Keys.status.rawValue: "200",
  140. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  141. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  142. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  143. ":scheme header must be present and one of \"http\" or \"https\".",
  144. ]
  145. )
  146. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  147. }
  148. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  149. let channel = EmbeddedChannel()
  150. let handler = GRPCServerStreamHandler(
  151. scheme: .http,
  152. acceptedEncodings: [],
  153. maximumPayloadSize: 1,
  154. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  155. )
  156. try channel.pipeline.syncOperations.addHandler(handler)
  157. // Receive client's initial metadata without :path
  158. let clientInitialMetadata: HPACKHeaders = [
  159. GRPCHTTP2Keys.scheme.rawValue: "http",
  160. GRPCHTTP2Keys.method.rawValue: "POST",
  161. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  162. GRPCHTTP2Keys.te.rawValue: "trailers",
  163. ]
  164. XCTAssertNoThrow(
  165. try channel.writeInbound(
  166. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  167. )
  168. )
  169. // Make sure we have sent a trailers-only response
  170. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  171. XCTAssertEqual(
  172. writtenTrailersOnlyResponse.headers,
  173. [
  174. GRPCHTTP2Keys.status.rawValue: "200",
  175. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  176. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  177. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  178. ]
  179. )
  180. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  181. }
  182. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  183. let channel = EmbeddedChannel()
  184. let handler = GRPCServerStreamHandler(
  185. scheme: .http,
  186. acceptedEncodings: [],
  187. maximumPayloadSize: 100,
  188. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  189. )
  190. try channel.pipeline.syncOperations.addHandler(handler)
  191. // Receive client's initial metadata
  192. let clientInitialMetadata: HPACKHeaders = [
  193. GRPCHTTP2Keys.path.rawValue: "/test/test",
  194. GRPCHTTP2Keys.scheme.rawValue: "http",
  195. GRPCHTTP2Keys.method.rawValue: "POST",
  196. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  197. GRPCHTTP2Keys.te.rawValue: "trailers",
  198. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  199. ]
  200. XCTAssertNoThrow(
  201. try channel.writeInbound(
  202. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  203. )
  204. )
  205. // Make sure we have sent a trailers-only response
  206. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  207. XCTAssertEqual(
  208. writtenTrailersOnlyResponse.headers,
  209. [
  210. GRPCHTTP2Keys.status.rawValue: "200",
  211. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  212. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  213. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  214. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  215. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  216. ]
  217. )
  218. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  219. }
  220. func testOverMaximumPayloadSize() throws {
  221. let channel = EmbeddedChannel()
  222. let handler = GRPCServerStreamHandler(
  223. scheme: .http,
  224. acceptedEncodings: [],
  225. maximumPayloadSize: 1,
  226. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  227. )
  228. try channel.pipeline.syncOperations.addHandler(handler)
  229. // Receive client's initial metadata
  230. let clientInitialMetadata: HPACKHeaders = [
  231. GRPCHTTP2Keys.path.rawValue: "/test/test",
  232. GRPCHTTP2Keys.scheme.rawValue: "http",
  233. GRPCHTTP2Keys.method.rawValue: "POST",
  234. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  235. GRPCHTTP2Keys.te.rawValue: "trailers",
  236. ]
  237. XCTAssertNoThrow(
  238. try channel.writeInbound(
  239. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  240. )
  241. )
  242. // Make sure we haven't sent back an error response, and that we read the initial metadata
  243. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  244. XCTAssertEqual(
  245. try channel.readInbound(as: RPCRequestPart.self),
  246. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  247. )
  248. // Write back server's initial metadata
  249. let headers: HPACKHeaders = [
  250. "some-custom-header": "some-custom-value"
  251. ]
  252. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  253. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  254. // Make sure we wrote back the initial metadata
  255. let writtenHeaders = try channel.assertReadHeadersOutbound()
  256. XCTAssertEqual(
  257. writtenHeaders.headers,
  258. [
  259. GRPCHTTP2Keys.status.rawValue: "200",
  260. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  261. "some-custom-header": "some-custom-value",
  262. ]
  263. )
  264. // Receive client's message
  265. var buffer = ByteBuffer()
  266. buffer.writeInteger(UInt8(0)) // not compressed
  267. buffer.writeInteger(UInt32(42)) // message length
  268. buffer.writeRepeatingByte(0, count: 42) // message
  269. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  270. XCTAssertThrowsError(
  271. ofType: RPCError.self,
  272. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  273. ) { error in
  274. XCTAssertEqual(error.code, .internalError)
  275. XCTAssertEqual(error.message, "Failed to decode message")
  276. }
  277. // Make sure we haven't sent a response back and that we didn't read the received message
  278. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  279. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  280. }
  281. func testClientEndsStream() throws {
  282. let channel = EmbeddedChannel()
  283. let handler = GRPCServerStreamHandler(
  284. scheme: .http,
  285. acceptedEncodings: [],
  286. maximumPayloadSize: 1,
  287. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  288. skipStateMachineAssertions: true
  289. )
  290. try channel.pipeline.syncOperations.addHandler(handler)
  291. // Receive client's initial metadata with end stream set
  292. let clientInitialMetadata: HPACKHeaders = [
  293. GRPCHTTP2Keys.path.rawValue: "/test/test",
  294. GRPCHTTP2Keys.scheme.rawValue: "http",
  295. GRPCHTTP2Keys.method.rawValue: "POST",
  296. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  297. GRPCHTTP2Keys.te.rawValue: "trailers",
  298. ]
  299. XCTAssertNoThrow(
  300. try channel.writeInbound(
  301. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  302. )
  303. )
  304. // Make sure we haven't sent back an error response, and that we read the initial metadata
  305. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  306. XCTAssertEqual(
  307. try channel.readInbound(as: RPCRequestPart.self),
  308. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  309. )
  310. // Write back server's initial metadata
  311. let headers: HPACKHeaders = [
  312. "some-custom-header": "some-custom-value"
  313. ]
  314. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  315. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  316. // Make sure we wrote back the initial metadata
  317. let writtenHeaders = try channel.assertReadHeadersOutbound()
  318. XCTAssertEqual(
  319. writtenHeaders.headers,
  320. [
  321. GRPCHTTP2Keys.status.rawValue: "200",
  322. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  323. "some-custom-header": "some-custom-value",
  324. ]
  325. )
  326. // We should throw if the client sends another message, since it's closed the stream already.
  327. var buffer = ByteBuffer()
  328. buffer.writeInteger(UInt8(0)) // not compressed
  329. buffer.writeInteger(UInt32(42)) // message length
  330. buffer.writeRepeatingByte(0, count: 42) // message
  331. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  332. XCTAssertThrowsError(
  333. ofType: RPCError.self,
  334. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  335. ) { error in
  336. XCTAssertEqual(error.code, .internalError)
  337. XCTAssertEqual(error.message, "Invalid state")
  338. }
  339. }
  340. func testNormalFlow() throws {
  341. let channel = EmbeddedChannel()
  342. let handler = GRPCServerStreamHandler(
  343. scheme: .http,
  344. acceptedEncodings: [],
  345. maximumPayloadSize: 42,
  346. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  347. skipStateMachineAssertions: true
  348. )
  349. try channel.pipeline.syncOperations.addHandler(handler)
  350. // Receive client's initial metadata
  351. let clientInitialMetadata: HPACKHeaders = [
  352. GRPCHTTP2Keys.path.rawValue: "/test/test",
  353. GRPCHTTP2Keys.scheme.rawValue: "http",
  354. GRPCHTTP2Keys.method.rawValue: "POST",
  355. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  356. GRPCHTTP2Keys.te.rawValue: "trailers",
  357. ]
  358. XCTAssertNoThrow(
  359. try channel.writeInbound(
  360. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  361. )
  362. )
  363. // Make sure we haven't sent back an error response, and that we read the initial metadata
  364. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  365. XCTAssertEqual(
  366. try channel.readInbound(as: RPCRequestPart.self),
  367. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  368. )
  369. // Write back server's initial metadata
  370. let headers: HPACKHeaders = [
  371. "some-custom-header": "some-custom-value"
  372. ]
  373. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  374. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  375. // Make sure we wrote back the initial metadata
  376. let writtenHeaders = try channel.assertReadHeadersOutbound()
  377. XCTAssertEqual(
  378. writtenHeaders.headers,
  379. [
  380. GRPCHTTP2Keys.status.rawValue: "200",
  381. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  382. "some-custom-header": "some-custom-value",
  383. ]
  384. )
  385. // Receive client's message
  386. var buffer = ByteBuffer()
  387. buffer.writeInteger(UInt8(0)) // not compressed
  388. buffer.writeInteger(UInt32(42)) // message length
  389. buffer.writeRepeatingByte(0, count: 42) // message
  390. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  391. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  392. // Make sure we haven't sent back an error response, and that we read the message properly
  393. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  394. XCTAssertEqual(
  395. try channel.readInbound(as: RPCRequestPart.self),
  396. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  397. )
  398. // Write back response
  399. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  400. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  401. // Make sure we wrote back the right message
  402. let writtenMessage = try channel.assertReadDataOutbound()
  403. var expectedBuffer = ByteBuffer()
  404. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  405. expectedBuffer.writeInteger(UInt32(42)) // message length
  406. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  407. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  408. // Send back status to end RPC
  409. let trailers = RPCResponsePart.status(
  410. .init(code: .dataLoss, message: "Test data loss"),
  411. ["custom-header": "custom-value"]
  412. )
  413. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  414. // Make sure we wrote back the status and trailers
  415. let writtenStatus = try channel.assertReadHeadersOutbound()
  416. XCTAssertTrue(writtenStatus.endStream)
  417. XCTAssertEqual(
  418. writtenStatus.headers,
  419. [
  420. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  421. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  422. "custom-header": "custom-value",
  423. ]
  424. )
  425. // Try writing and assert it throws to make sure we don't allow writes
  426. // after closing.
  427. XCTAssertThrowsError(
  428. ofType: RPCError.self,
  429. try channel.writeOutbound(trailers)
  430. ) { error in
  431. XCTAssertEqual(error.code, .internalError)
  432. XCTAssertEqual(error.message, "Invalid state")
  433. }
  434. }
  435. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  436. let channel = EmbeddedChannel()
  437. let handler = GRPCServerStreamHandler(
  438. scheme: .http,
  439. acceptedEncodings: [],
  440. maximumPayloadSize: 100,
  441. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  442. )
  443. try channel.pipeline.syncOperations.addHandler(handler)
  444. // Receive client's initial metadata
  445. let clientInitialMetadata: HPACKHeaders = [
  446. GRPCHTTP2Keys.path.rawValue: "/test/test",
  447. GRPCHTTP2Keys.scheme.rawValue: "http",
  448. GRPCHTTP2Keys.method.rawValue: "POST",
  449. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  450. GRPCHTTP2Keys.te.rawValue: "trailers",
  451. ]
  452. XCTAssertNoThrow(
  453. try channel.writeInbound(
  454. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  455. )
  456. )
  457. // Make sure we haven't sent back an error response, and that we read the initial metadata
  458. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  459. XCTAssertEqual(
  460. try channel.readInbound(as: RPCRequestPart.self),
  461. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  462. )
  463. // Write back server's initial metadata
  464. let headers: HPACKHeaders = [
  465. "some-custom-header": "some-custom-value"
  466. ]
  467. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  468. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  469. // Make sure we wrote back the initial metadata
  470. let writtenHeaders = try channel.assertReadHeadersOutbound()
  471. XCTAssertEqual(
  472. writtenHeaders.headers,
  473. [
  474. GRPCHTTP2Keys.status.rawValue: "200",
  475. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  476. "some-custom-header": "some-custom-value",
  477. ]
  478. )
  479. // Receive client's first message
  480. var buffer = ByteBuffer()
  481. buffer.writeInteger(UInt8(0)) // not compressed
  482. XCTAssertNoThrow(
  483. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  484. )
  485. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  486. buffer.clear()
  487. buffer.writeInteger(UInt32(30)) // message length
  488. XCTAssertNoThrow(
  489. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  490. )
  491. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  492. buffer.clear()
  493. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  494. XCTAssertNoThrow(
  495. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  496. )
  497. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  498. buffer.clear()
  499. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  500. XCTAssertNoThrow(
  501. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  502. )
  503. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  504. buffer.clear()
  505. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  506. XCTAssertNoThrow(
  507. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  508. )
  509. // Make sure we haven't sent back an error response, and that we read the message properly
  510. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  511. XCTAssertEqual(
  512. try channel.readInbound(as: RPCRequestPart.self),
  513. RPCRequestPart.message(
  514. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  515. + [UInt8](repeating: 2, count: 10)
  516. )
  517. )
  518. }
  519. func testReceiveMultipleHeaders() throws {
  520. let channel = EmbeddedChannel()
  521. let handler = GRPCServerStreamHandler(
  522. scheme: .http,
  523. acceptedEncodings: [],
  524. maximumPayloadSize: 100,
  525. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  526. )
  527. try channel.pipeline.syncOperations.addHandler(handler)
  528. // Receive client's initial metadata
  529. let clientInitialMetadata: HPACKHeaders = [
  530. GRPCHTTP2Keys.path.rawValue: "/test/test",
  531. GRPCHTTP2Keys.scheme.rawValue: "http",
  532. GRPCHTTP2Keys.method.rawValue: "POST",
  533. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  534. GRPCHTTP2Keys.te.rawValue: "trailers",
  535. ]
  536. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  537. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  538. // Receive them again. Should be a protocol violation.
  539. XCTAssertThrowsError(
  540. ofType: RPCError.self,
  541. try channel.writeInbound(
  542. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  543. )
  544. ) { error in
  545. XCTAssertEqual(error.code, .unavailable)
  546. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  547. }
  548. let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  549. switch payload {
  550. case .rstStream(let errorCode):
  551. XCTAssertEqual(errorCode, .protocolError)
  552. default:
  553. XCTFail("Expected RST_STREAM, got \(payload)")
  554. }
  555. }
  556. func testSendMultipleMessagesInSingleBuffer() throws {
  557. let channel = EmbeddedChannel()
  558. let handler = GRPCServerStreamHandler(
  559. scheme: .http,
  560. acceptedEncodings: [],
  561. maximumPayloadSize: 100,
  562. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  563. )
  564. try channel.pipeline.syncOperations.addHandler(handler)
  565. // Receive client's initial metadata
  566. let clientInitialMetadata: HPACKHeaders = [
  567. GRPCHTTP2Keys.path.rawValue: "/test/test",
  568. GRPCHTTP2Keys.scheme.rawValue: "http",
  569. GRPCHTTP2Keys.method.rawValue: "POST",
  570. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  571. GRPCHTTP2Keys.te.rawValue: "trailers",
  572. ]
  573. XCTAssertNoThrow(
  574. try channel.writeInbound(
  575. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  576. )
  577. )
  578. // Make sure we haven't sent back an error response, and that we read the initial metadata
  579. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  580. XCTAssertEqual(
  581. try channel.readInbound(as: RPCRequestPart.self),
  582. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  583. )
  584. // Write back server's initial metadata
  585. let headers: HPACKHeaders = [
  586. "some-custom-header": "some-custom-value"
  587. ]
  588. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  589. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  590. // Read out the metadata
  591. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  592. // This is where this test actually begins. We want to write two messages
  593. // without flushing, and make sure that no messages are sent down the pipeline
  594. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  595. // Write back first message and make sure nothing's written in the channel.
  596. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  597. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  598. // Write back second message and make sure nothing's written in the channel.
  599. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  600. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  601. // Now flush and check we *do* write the data.
  602. channel.flush()
  603. let writtenMessage = try channel.assertReadDataOutbound()
  604. // Make sure both messages have been framed together in the ByteBuffer.
  605. XCTAssertEqual(
  606. writtenMessage.data,
  607. .byteBuffer(
  608. .init(bytes: [
  609. // First message
  610. 0, // Compression disabled
  611. 0, 0, 0, 4, // Message length
  612. 1, 1, 1, 1, // First message data
  613. // Second message
  614. 0, // Compression disabled
  615. 0, 0, 0, 4, // Message length
  616. 2, 2, 2, 2, // Second message data
  617. ])
  618. )
  619. )
  620. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  621. }
  622. func testMessageAndStatusAreNotReordered() throws {
  623. let channel = EmbeddedChannel()
  624. let handler = GRPCServerStreamHandler(
  625. scheme: .http,
  626. acceptedEncodings: [],
  627. maximumPayloadSize: 100,
  628. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  629. )
  630. try channel.pipeline.syncOperations.addHandler(handler)
  631. // Receive client's initial metadata
  632. let clientInitialMetadata: HPACKHeaders = [
  633. GRPCHTTP2Keys.path.rawValue: "/test/test",
  634. GRPCHTTP2Keys.scheme.rawValue: "http",
  635. GRPCHTTP2Keys.method.rawValue: "POST",
  636. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  637. GRPCHTTP2Keys.te.rawValue: "trailers",
  638. ]
  639. XCTAssertNoThrow(
  640. try channel.writeInbound(
  641. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  642. )
  643. )
  644. // Make sure we haven't sent back an error response, and that we read the initial metadata
  645. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  646. XCTAssertEqual(
  647. try channel.readInbound(as: RPCRequestPart.self),
  648. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  649. )
  650. // Write back server's initial metadata
  651. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  652. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  653. // Read out the metadata
  654. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  655. // This is where this test actually begins. We want to write a message followed
  656. // by status and trailers, and only flush after both writes.
  657. // Because messages are buffered and potentially bundled together in a single
  658. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  659. // and trailers won't be written before the messages.
  660. // Write back message and make sure nothing's written in the channel.
  661. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  662. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  663. // Write status + metadata and make sure nothing's written.
  664. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  665. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  666. // Now flush and check we *do* write the data in the right order: message first,
  667. // trailers second.
  668. channel.flush()
  669. let writtenMessage = try channel.assertReadDataOutbound()
  670. // Make sure we first get message.
  671. XCTAssertEqual(
  672. writtenMessage.data,
  673. .byteBuffer(
  674. .init(bytes: [
  675. // First message
  676. 0, // Compression disabled
  677. 0, 0, 0, 4, // Message length
  678. 1, 1, 1, 1, // First message data
  679. ])
  680. )
  681. )
  682. XCTAssertFalse(writtenMessage.endStream)
  683. // Make sure we get trailers.
  684. let writtenTrailers = try channel.assertReadHeadersOutbound()
  685. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  686. XCTAssertTrue(writtenTrailers.endStream)
  687. // Make sure we get nothing else.
  688. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  689. }
  690. func testMethodDescriptorPromiseSucceeds() throws {
  691. let channel = EmbeddedChannel()
  692. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  693. let handler = GRPCServerStreamHandler(
  694. scheme: .http,
  695. acceptedEncodings: [],
  696. maximumPayloadSize: 100,
  697. methodDescriptorPromise: promise,
  698. skipStateMachineAssertions: true
  699. )
  700. try channel.pipeline.syncOperations.addHandler(handler)
  701. // Receive client's initial metadata
  702. let clientInitialMetadata: HPACKHeaders = [
  703. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  704. GRPCHTTP2Keys.scheme.rawValue: "http",
  705. GRPCHTTP2Keys.method.rawValue: "POST",
  706. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  707. GRPCHTTP2Keys.te.rawValue: "trailers",
  708. ]
  709. XCTAssertNoThrow(
  710. try channel.writeInbound(
  711. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  712. )
  713. )
  714. // Make sure we haven't sent back an error response, and that we read the initial metadata
  715. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  716. XCTAssertEqual(
  717. try channel.readInbound(as: RPCRequestPart.self),
  718. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  719. )
  720. XCTAssertEqual(
  721. try promise.futureResult.wait(),
  722. MethodDescriptor(service: "SomeService", method: "SomeMethod")
  723. )
  724. }
  725. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  726. let channel = EmbeddedChannel()
  727. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  728. let handler = GRPCServerStreamHandler(
  729. scheme: .http,
  730. acceptedEncodings: [],
  731. maximumPayloadSize: 100,
  732. methodDescriptorPromise: promise,
  733. skipStateMachineAssertions: true
  734. )
  735. try channel.pipeline.syncOperations.addHandler(handler)
  736. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  737. XCTAssertThrowsError(
  738. ofType: RPCError.self,
  739. try promise.futureResult.wait()
  740. ) { error in
  741. XCTAssertEqual(error.code, .unavailable)
  742. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  743. }
  744. }
  745. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  746. let channel = EmbeddedChannel()
  747. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  748. let handler = GRPCServerStreamHandler(
  749. scheme: .http,
  750. acceptedEncodings: [],
  751. maximumPayloadSize: 100,
  752. methodDescriptorPromise: promise,
  753. skipStateMachineAssertions: true
  754. )
  755. try channel.pipeline.syncOperations.addHandler(handler)
  756. // Receive client's initial metadata
  757. let clientInitialMetadata: HPACKHeaders = [
  758. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  759. GRPCHTTP2Keys.scheme.rawValue: "http",
  760. GRPCHTTP2Keys.method.rawValue: "POST",
  761. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  762. GRPCHTTP2Keys.te.rawValue: "trailers",
  763. ]
  764. XCTAssertNoThrow(
  765. try channel.writeInbound(
  766. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  767. )
  768. )
  769. XCTAssertThrowsError(
  770. ofType: RPCError.self,
  771. try promise.futureResult.wait()
  772. ) { error in
  773. XCTAssertEqual(error.code, .unavailable)
  774. XCTAssertEqual(error.message, "RPC was rejected.")
  775. }
  776. }
  777. func testUnexpectedStreamClose_ErrorFired() throws {
  778. let channel = EmbeddedChannel()
  779. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  780. let handler = GRPCServerStreamHandler(
  781. scheme: .http,
  782. acceptedEncodings: [],
  783. maximumPayloadSize: 100,
  784. methodDescriptorPromise: promise,
  785. skipStateMachineAssertions: true
  786. )
  787. try channel.pipeline.syncOperations.addHandler(handler)
  788. // Receive client's initial metadata
  789. let clientInitialMetadata: HPACKHeaders = [
  790. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  791. GRPCHTTP2Keys.scheme.rawValue: "http",
  792. GRPCHTTP2Keys.method.rawValue: "POST",
  793. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  794. GRPCHTTP2Keys.te.rawValue: "trailers",
  795. ]
  796. XCTAssertNoThrow(
  797. try channel.writeInbound(
  798. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  799. )
  800. )
  801. // Make sure we haven't sent back an error response, and that we read the initial metadata
  802. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  803. XCTAssertEqual(
  804. try channel.readInbound(as: RPCRequestPart.self),
  805. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  806. )
  807. // An error is fired down the pipeline
  808. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  809. channel.pipeline.fireErrorCaught(thrownError)
  810. // The server handler simply forwards the error.
  811. XCTAssertThrowsError(
  812. ofType: type(of: thrownError),
  813. try channel.throwIfErrorCaught()
  814. ) { error in
  815. XCTAssertEqual(error, thrownError)
  816. }
  817. // We should now be closed: check we can't write anymore.
  818. XCTAssertThrowsError(
  819. ofType: RPCError.self,
  820. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  821. ) { error in
  822. XCTAssertEqual(error.code, .internalError)
  823. XCTAssertEqual(error.message, "Invalid state")
  824. }
  825. }
  826. func testUnexpectedStreamClose_ChannelInactive() throws {
  827. let channel = EmbeddedChannel()
  828. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  829. let handler = GRPCServerStreamHandler(
  830. scheme: .http,
  831. acceptedEncodings: [],
  832. maximumPayloadSize: 100,
  833. methodDescriptorPromise: promise,
  834. skipStateMachineAssertions: true
  835. )
  836. try channel.pipeline.syncOperations.addHandler(handler)
  837. // Receive client's initial metadata
  838. let clientInitialMetadata: HPACKHeaders = [
  839. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  840. GRPCHTTP2Keys.scheme.rawValue: "http",
  841. GRPCHTTP2Keys.method.rawValue: "POST",
  842. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  843. GRPCHTTP2Keys.te.rawValue: "trailers",
  844. ]
  845. XCTAssertNoThrow(
  846. try channel.writeInbound(
  847. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  848. )
  849. )
  850. // Make sure we haven't sent back an error response, and that we read the initial metadata
  851. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  852. XCTAssertEqual(
  853. try channel.readInbound(as: RPCRequestPart.self),
  854. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  855. )
  856. // Channel becomes inactive
  857. channel.pipeline.fireChannelInactive()
  858. // The server handler fires an error
  859. XCTAssertThrowsError(
  860. ofType: RPCError.self,
  861. try channel.throwIfErrorCaught()
  862. ) { error in
  863. XCTAssertEqual(error.code, .unavailable)
  864. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  865. }
  866. // We should now be closed: check we can't write anymore.
  867. XCTAssertThrowsError(
  868. ofType: RPCError.self,
  869. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  870. ) { error in
  871. XCTAssertEqual(error.code, .internalError)
  872. XCTAssertEqual(error.message, "Invalid state")
  873. }
  874. }
  875. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  876. let channel = EmbeddedChannel()
  877. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  878. let handler = GRPCServerStreamHandler(
  879. scheme: .http,
  880. acceptedEncodings: [],
  881. maximumPayloadSize: 100,
  882. methodDescriptorPromise: promise,
  883. skipStateMachineAssertions: true
  884. )
  885. try channel.pipeline.syncOperations.addHandler(handler)
  886. // Receive client's initial metadata
  887. let clientInitialMetadata: HPACKHeaders = [
  888. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  889. GRPCHTTP2Keys.scheme.rawValue: "http",
  890. GRPCHTTP2Keys.method.rawValue: "POST",
  891. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  892. GRPCHTTP2Keys.te.rawValue: "trailers",
  893. ]
  894. XCTAssertNoThrow(
  895. try channel.writeInbound(
  896. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  897. )
  898. )
  899. // Make sure we haven't sent back an error response, and that we read the initial metadata
  900. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  901. XCTAssertEqual(
  902. try channel.readInbound(as: RPCRequestPart.self),
  903. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  904. )
  905. // We receive RST_STREAM frame
  906. // Assert the server handler fires an error
  907. XCTAssertThrowsError(
  908. ofType: RPCError.self,
  909. try channel.writeInbound(
  910. HTTP2Frame.FramePayload.rstStream(.internalError)
  911. )
  912. ) { error in
  913. XCTAssertEqual(error.code, .unavailable)
  914. XCTAssertEqual(error.message, "Stream unexpectedly closed: a RST_STREAM frame was received.")
  915. }
  916. // We should now be closed: check we can't write anymore.
  917. XCTAssertThrowsError(
  918. ofType: RPCError.self,
  919. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  920. ) { error in
  921. XCTAssertEqual(error.code, .internalError)
  922. XCTAssertEqual(error.message, "Invalid state")
  923. }
  924. }
  925. }
  926. extension EmbeddedChannel {
  927. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  928. guard
  929. case .headers(let writtenHeaders) = try XCTUnwrap(
  930. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  931. )
  932. else {
  933. throw TestError.assertionFailure("Expected to write headers")
  934. }
  935. return writtenHeaders
  936. }
  937. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  938. guard
  939. case .data(let writtenMessage) = try XCTUnwrap(
  940. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  941. )
  942. else {
  943. throw TestError.assertionFailure("Expected to write data")
  944. }
  945. return writtenMessage
  946. }
  947. }
  948. private enum TestError: Error {
  949. case assertionFailure(String)
  950. }