2
0

GRPCServerStreamHandlerTests.swift 40 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let channel = EmbeddedChannel()
  27. let handler = GRPCServerStreamHandler(
  28. scheme: .http,
  29. acceptedEncodings: [],
  30. maximumPayloadSize: 1,
  31. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  32. )
  33. try channel.pipeline.syncOperations.addHandler(handler)
  34. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  35. .ping(.init(), ack: false),
  36. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  37. // TODO: uncomment when it's possible to build a `StreamPriorityData`.
  38. // .priority(
  39. // HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
  40. // ),
  41. .settings(.ack),
  42. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  43. .windowUpdate(windowSizeIncrement: 4),
  44. .alternativeService(origin: nil, field: nil),
  45. .origin([]),
  46. ]
  47. for toBeIgnored in framesToBeIgnored {
  48. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  49. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  50. }
  51. }
  52. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  53. let channel = EmbeddedChannel()
  54. let handler = GRPCServerStreamHandler(
  55. scheme: .http,
  56. acceptedEncodings: [],
  57. maximumPayloadSize: 1,
  58. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  59. )
  60. try channel.pipeline.syncOperations.addHandler(handler)
  61. // Receive client's initial metadata without content-type
  62. let clientInitialMetadata: HPACKHeaders = [
  63. GRPCHTTP2Keys.path.rawValue: "/test/test",
  64. GRPCHTTP2Keys.scheme.rawValue: "http",
  65. GRPCHTTP2Keys.method.rawValue: "POST",
  66. GRPCHTTP2Keys.te.rawValue: "trailers",
  67. ]
  68. XCTAssertNoThrow(
  69. try channel.writeInbound(
  70. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  71. )
  72. )
  73. // Make sure we have sent a trailers-only response
  74. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  75. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  76. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  77. }
  78. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  79. let channel = EmbeddedChannel()
  80. let handler = GRPCServerStreamHandler(
  81. scheme: .http,
  82. acceptedEncodings: [],
  83. maximumPayloadSize: 1,
  84. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  85. )
  86. try channel.pipeline.syncOperations.addHandler(handler)
  87. // Receive client's initial metadata without :method
  88. let clientInitialMetadata: HPACKHeaders = [
  89. GRPCHTTP2Keys.path.rawValue: "/test/test",
  90. GRPCHTTP2Keys.scheme.rawValue: "http",
  91. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  92. GRPCHTTP2Keys.te.rawValue: "trailers",
  93. ]
  94. XCTAssertNoThrow(
  95. try channel.writeInbound(
  96. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  97. )
  98. )
  99. // Make sure we have sent a trailers-only response
  100. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  101. XCTAssertEqual(
  102. writtenTrailersOnlyResponse.headers,
  103. [
  104. GRPCHTTP2Keys.status.rawValue: "200",
  105. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  106. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  107. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  108. ":method header is expected to be present and have a value of \"POST\".",
  109. ]
  110. )
  111. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  112. }
  113. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  114. let channel = EmbeddedChannel()
  115. let handler = GRPCServerStreamHandler(
  116. scheme: .http,
  117. acceptedEncodings: [],
  118. maximumPayloadSize: 1,
  119. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  120. )
  121. try channel.pipeline.syncOperations.addHandler(handler)
  122. // Receive client's initial metadata without :scheme
  123. let clientInitialMetadata: HPACKHeaders = [
  124. GRPCHTTP2Keys.path.rawValue: "/test/test",
  125. GRPCHTTP2Keys.method.rawValue: "POST",
  126. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  127. GRPCHTTP2Keys.te.rawValue: "trailers",
  128. ]
  129. XCTAssertNoThrow(
  130. try channel.writeInbound(
  131. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  132. )
  133. )
  134. // Make sure we have sent a trailers-only response
  135. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  136. XCTAssertEqual(
  137. writtenTrailersOnlyResponse.headers,
  138. [
  139. GRPCHTTP2Keys.status.rawValue: "200",
  140. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  141. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  142. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  143. ":scheme header must be present and one of \"http\" or \"https\".",
  144. ]
  145. )
  146. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  147. }
  148. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  149. let channel = EmbeddedChannel()
  150. let handler = GRPCServerStreamHandler(
  151. scheme: .http,
  152. acceptedEncodings: [],
  153. maximumPayloadSize: 1,
  154. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  155. )
  156. try channel.pipeline.syncOperations.addHandler(handler)
  157. // Receive client's initial metadata without :path
  158. let clientInitialMetadata: HPACKHeaders = [
  159. GRPCHTTP2Keys.scheme.rawValue: "http",
  160. GRPCHTTP2Keys.method.rawValue: "POST",
  161. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  162. GRPCHTTP2Keys.te.rawValue: "trailers",
  163. ]
  164. XCTAssertNoThrow(
  165. try channel.writeInbound(
  166. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  167. )
  168. )
  169. // Make sure we have sent a trailers-only response
  170. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  171. XCTAssertEqual(
  172. writtenTrailersOnlyResponse.headers,
  173. [
  174. GRPCHTTP2Keys.status.rawValue: "200",
  175. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  176. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  177. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  178. ]
  179. )
  180. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  181. }
  182. func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
  183. let channel = EmbeddedChannel()
  184. let handler = GRPCServerStreamHandler(
  185. scheme: .http,
  186. acceptedEncodings: [],
  187. maximumPayloadSize: 1,
  188. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  189. )
  190. try channel.pipeline.syncOperations.addHandler(handler)
  191. // Receive client's initial metadata without TE
  192. let clientInitialMetadata: HPACKHeaders = [
  193. GRPCHTTP2Keys.path.rawValue: "/test/test",
  194. GRPCHTTP2Keys.scheme.rawValue: "http",
  195. GRPCHTTP2Keys.method.rawValue: "POST",
  196. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  197. ]
  198. XCTAssertNoThrow(
  199. try channel.writeInbound(
  200. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  201. )
  202. )
  203. // Make sure we have sent a trailers-only response
  204. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  205. XCTAssertEqual(
  206. writtenTrailersOnlyResponse.headers,
  207. [
  208. GRPCHTTP2Keys.status.rawValue: "200",
  209. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  210. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  211. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  212. "\"te\" header is expected to be present and have a value of \"trailers\".",
  213. ]
  214. )
  215. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  216. }
  217. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  218. let channel = EmbeddedChannel()
  219. let handler = GRPCServerStreamHandler(
  220. scheme: .http,
  221. acceptedEncodings: [],
  222. maximumPayloadSize: 100,
  223. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  224. )
  225. try channel.pipeline.syncOperations.addHandler(handler)
  226. // Receive client's initial metadata
  227. let clientInitialMetadata: HPACKHeaders = [
  228. GRPCHTTP2Keys.path.rawValue: "/test/test",
  229. GRPCHTTP2Keys.scheme.rawValue: "http",
  230. GRPCHTTP2Keys.method.rawValue: "POST",
  231. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  232. GRPCHTTP2Keys.te.rawValue: "trailers",
  233. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  234. ]
  235. XCTAssertNoThrow(
  236. try channel.writeInbound(
  237. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  238. )
  239. )
  240. // Make sure we have sent a trailers-only response
  241. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  242. XCTAssertEqual(
  243. writtenTrailersOnlyResponse.headers,
  244. [
  245. GRPCHTTP2Keys.status.rawValue: "200",
  246. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  247. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  248. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  249. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  250. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  251. ]
  252. )
  253. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  254. }
  255. func testOverMaximumPayloadSize() throws {
  256. let channel = EmbeddedChannel()
  257. let handler = GRPCServerStreamHandler(
  258. scheme: .http,
  259. acceptedEncodings: [],
  260. maximumPayloadSize: 1,
  261. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  262. )
  263. try channel.pipeline.syncOperations.addHandler(handler)
  264. // Receive client's initial metadata
  265. let clientInitialMetadata: HPACKHeaders = [
  266. GRPCHTTP2Keys.path.rawValue: "/test/test",
  267. GRPCHTTP2Keys.scheme.rawValue: "http",
  268. GRPCHTTP2Keys.method.rawValue: "POST",
  269. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  270. GRPCHTTP2Keys.te.rawValue: "trailers",
  271. ]
  272. XCTAssertNoThrow(
  273. try channel.writeInbound(
  274. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  275. )
  276. )
  277. // Make sure we haven't sent back an error response, and that we read the initial metadata
  278. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  279. XCTAssertEqual(
  280. try channel.readInbound(as: RPCRequestPart.self),
  281. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  282. )
  283. // Write back server's initial metadata
  284. let headers: HPACKHeaders = [
  285. "some-custom-header": "some-custom-value"
  286. ]
  287. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  288. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  289. // Make sure we wrote back the initial metadata
  290. let writtenHeaders = try channel.assertReadHeadersOutbound()
  291. XCTAssertEqual(
  292. writtenHeaders.headers,
  293. [
  294. GRPCHTTP2Keys.status.rawValue: "200",
  295. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  296. "some-custom-header": "some-custom-value",
  297. ]
  298. )
  299. // Receive client's message
  300. var buffer = ByteBuffer()
  301. buffer.writeInteger(UInt8(0)) // not compressed
  302. buffer.writeInteger(UInt32(42)) // message length
  303. buffer.writeRepeatingByte(0, count: 42) // message
  304. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  305. XCTAssertThrowsError(
  306. ofType: RPCError.self,
  307. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  308. ) { error in
  309. XCTAssertEqual(error.code, .internalError)
  310. XCTAssertEqual(error.message, "Failed to decode message")
  311. }
  312. // Make sure we haven't sent a response back and that we didn't read the received message
  313. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  314. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  315. }
  316. func testClientEndsStream() throws {
  317. let channel = EmbeddedChannel()
  318. let handler = GRPCServerStreamHandler(
  319. scheme: .http,
  320. acceptedEncodings: [],
  321. maximumPayloadSize: 1,
  322. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  323. skipStateMachineAssertions: true
  324. )
  325. try channel.pipeline.syncOperations.addHandler(handler)
  326. // Receive client's initial metadata with end stream set
  327. let clientInitialMetadata: HPACKHeaders = [
  328. GRPCHTTP2Keys.path.rawValue: "/test/test",
  329. GRPCHTTP2Keys.scheme.rawValue: "http",
  330. GRPCHTTP2Keys.method.rawValue: "POST",
  331. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  332. GRPCHTTP2Keys.te.rawValue: "trailers",
  333. ]
  334. XCTAssertNoThrow(
  335. try channel.writeInbound(
  336. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  337. )
  338. )
  339. // Make sure we haven't sent back an error response, and that we read the initial metadata
  340. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  341. XCTAssertEqual(
  342. try channel.readInbound(as: RPCRequestPart.self),
  343. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  344. )
  345. // Write back server's initial metadata
  346. let headers: HPACKHeaders = [
  347. "some-custom-header": "some-custom-value"
  348. ]
  349. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  350. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  351. // Make sure we wrote back the initial metadata
  352. let writtenHeaders = try channel.assertReadHeadersOutbound()
  353. XCTAssertEqual(
  354. writtenHeaders.headers,
  355. [
  356. GRPCHTTP2Keys.status.rawValue: "200",
  357. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  358. "some-custom-header": "some-custom-value",
  359. ]
  360. )
  361. // We should throw if the client sends another message, since it's closed the stream already.
  362. var buffer = ByteBuffer()
  363. buffer.writeInteger(UInt8(0)) // not compressed
  364. buffer.writeInteger(UInt32(42)) // message length
  365. buffer.writeRepeatingByte(0, count: 42) // message
  366. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  367. XCTAssertThrowsError(
  368. ofType: RPCError.self,
  369. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  370. ) { error in
  371. XCTAssertEqual(error.code, .internalError)
  372. XCTAssertEqual(error.message, "Invalid state")
  373. }
  374. }
  375. func testNormalFlow() throws {
  376. let channel = EmbeddedChannel()
  377. let handler = GRPCServerStreamHandler(
  378. scheme: .http,
  379. acceptedEncodings: [],
  380. maximumPayloadSize: 42,
  381. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  382. skipStateMachineAssertions: true
  383. )
  384. try channel.pipeline.syncOperations.addHandler(handler)
  385. // Receive client's initial metadata
  386. let clientInitialMetadata: HPACKHeaders = [
  387. GRPCHTTP2Keys.path.rawValue: "/test/test",
  388. GRPCHTTP2Keys.scheme.rawValue: "http",
  389. GRPCHTTP2Keys.method.rawValue: "POST",
  390. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  391. GRPCHTTP2Keys.te.rawValue: "trailers",
  392. ]
  393. XCTAssertNoThrow(
  394. try channel.writeInbound(
  395. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  396. )
  397. )
  398. // Make sure we haven't sent back an error response, and that we read the initial metadata
  399. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  400. XCTAssertEqual(
  401. try channel.readInbound(as: RPCRequestPart.self),
  402. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  403. )
  404. // Write back server's initial metadata
  405. let headers: HPACKHeaders = [
  406. "some-custom-header": "some-custom-value"
  407. ]
  408. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  409. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  410. // Make sure we wrote back the initial metadata
  411. let writtenHeaders = try channel.assertReadHeadersOutbound()
  412. XCTAssertEqual(
  413. writtenHeaders.headers,
  414. [
  415. GRPCHTTP2Keys.status.rawValue: "200",
  416. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  417. "some-custom-header": "some-custom-value",
  418. ]
  419. )
  420. // Receive client's message
  421. var buffer = ByteBuffer()
  422. buffer.writeInteger(UInt8(0)) // not compressed
  423. buffer.writeInteger(UInt32(42)) // message length
  424. buffer.writeRepeatingByte(0, count: 42) // message
  425. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  426. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  427. // Make sure we haven't sent back an error response, and that we read the message properly
  428. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  429. XCTAssertEqual(
  430. try channel.readInbound(as: RPCRequestPart.self),
  431. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  432. )
  433. // Write back response
  434. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  435. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  436. // Make sure we wrote back the right message
  437. let writtenMessage = try channel.assertReadDataOutbound()
  438. var expectedBuffer = ByteBuffer()
  439. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  440. expectedBuffer.writeInteger(UInt32(42)) // message length
  441. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  442. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  443. // Send back status to end RPC
  444. let trailers = RPCResponsePart.status(
  445. .init(code: .dataLoss, message: "Test data loss"),
  446. ["custom-header": "custom-value"]
  447. )
  448. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  449. // Make sure we wrote back the status and trailers
  450. let writtenStatus = try channel.assertReadHeadersOutbound()
  451. XCTAssertTrue(writtenStatus.endStream)
  452. XCTAssertEqual(
  453. writtenStatus.headers,
  454. [
  455. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  456. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  457. "custom-header": "custom-value",
  458. ]
  459. )
  460. // Try writing and assert it throws to make sure we don't allow writes
  461. // after closing.
  462. XCTAssertThrowsError(
  463. ofType: RPCError.self,
  464. try channel.writeOutbound(trailers)
  465. ) { error in
  466. XCTAssertEqual(error.code, .internalError)
  467. XCTAssertEqual(error.message, "Invalid state")
  468. }
  469. }
  470. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  471. let channel = EmbeddedChannel()
  472. let handler = GRPCServerStreamHandler(
  473. scheme: .http,
  474. acceptedEncodings: [],
  475. maximumPayloadSize: 100,
  476. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  477. )
  478. try channel.pipeline.syncOperations.addHandler(handler)
  479. // Receive client's initial metadata
  480. let clientInitialMetadata: HPACKHeaders = [
  481. GRPCHTTP2Keys.path.rawValue: "/test/test",
  482. GRPCHTTP2Keys.scheme.rawValue: "http",
  483. GRPCHTTP2Keys.method.rawValue: "POST",
  484. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  485. GRPCHTTP2Keys.te.rawValue: "trailers",
  486. ]
  487. XCTAssertNoThrow(
  488. try channel.writeInbound(
  489. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  490. )
  491. )
  492. // Make sure we haven't sent back an error response, and that we read the initial metadata
  493. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  494. XCTAssertEqual(
  495. try channel.readInbound(as: RPCRequestPart.self),
  496. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  497. )
  498. // Write back server's initial metadata
  499. let headers: HPACKHeaders = [
  500. "some-custom-header": "some-custom-value"
  501. ]
  502. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  503. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  504. // Make sure we wrote back the initial metadata
  505. let writtenHeaders = try channel.assertReadHeadersOutbound()
  506. XCTAssertEqual(
  507. writtenHeaders.headers,
  508. [
  509. GRPCHTTP2Keys.status.rawValue: "200",
  510. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  511. "some-custom-header": "some-custom-value",
  512. ]
  513. )
  514. // Receive client's first message
  515. var buffer = ByteBuffer()
  516. buffer.writeInteger(UInt8(0)) // not compressed
  517. XCTAssertNoThrow(
  518. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  519. )
  520. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  521. buffer.clear()
  522. buffer.writeInteger(UInt32(30)) // message length
  523. XCTAssertNoThrow(
  524. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  525. )
  526. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  527. buffer.clear()
  528. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  529. XCTAssertNoThrow(
  530. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  531. )
  532. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  533. buffer.clear()
  534. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  535. XCTAssertNoThrow(
  536. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  537. )
  538. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  539. buffer.clear()
  540. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  541. XCTAssertNoThrow(
  542. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  543. )
  544. // Make sure we haven't sent back an error response, and that we read the message properly
  545. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  546. XCTAssertEqual(
  547. try channel.readInbound(as: RPCRequestPart.self),
  548. RPCRequestPart.message(
  549. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  550. + [UInt8](repeating: 2, count: 10)
  551. )
  552. )
  553. }
  554. func testReceiveMultipleHeaders() throws {
  555. let channel = EmbeddedChannel()
  556. let handler = GRPCServerStreamHandler(
  557. scheme: .http,
  558. acceptedEncodings: [],
  559. maximumPayloadSize: 100,
  560. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  561. )
  562. try channel.pipeline.syncOperations.addHandler(handler)
  563. // Receive client's initial metadata
  564. let clientInitialMetadata: HPACKHeaders = [
  565. GRPCHTTP2Keys.path.rawValue: "/test/test",
  566. GRPCHTTP2Keys.scheme.rawValue: "http",
  567. GRPCHTTP2Keys.method.rawValue: "POST",
  568. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  569. GRPCHTTP2Keys.te.rawValue: "trailers",
  570. ]
  571. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  572. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  573. // Receive them again. Should be a protocol violation.
  574. XCTAssertThrowsError(
  575. ofType: RPCError.self,
  576. try channel.writeInbound(
  577. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  578. )
  579. ) { error in
  580. XCTAssertEqual(error.code, .unavailable)
  581. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  582. }
  583. let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  584. switch payload {
  585. case .rstStream(let errorCode):
  586. XCTAssertEqual(errorCode, .protocolError)
  587. default:
  588. XCTFail("Expected RST_STREAM, got \(payload)")
  589. }
  590. }
  591. func testSendMultipleMessagesInSingleBuffer() throws {
  592. let channel = EmbeddedChannel()
  593. let handler = GRPCServerStreamHandler(
  594. scheme: .http,
  595. acceptedEncodings: [],
  596. maximumPayloadSize: 100,
  597. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  598. )
  599. try channel.pipeline.syncOperations.addHandler(handler)
  600. // Receive client's initial metadata
  601. let clientInitialMetadata: HPACKHeaders = [
  602. GRPCHTTP2Keys.path.rawValue: "/test/test",
  603. GRPCHTTP2Keys.scheme.rawValue: "http",
  604. GRPCHTTP2Keys.method.rawValue: "POST",
  605. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  606. GRPCHTTP2Keys.te.rawValue: "trailers",
  607. ]
  608. XCTAssertNoThrow(
  609. try channel.writeInbound(
  610. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  611. )
  612. )
  613. // Make sure we haven't sent back an error response, and that we read the initial metadata
  614. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  615. XCTAssertEqual(
  616. try channel.readInbound(as: RPCRequestPart.self),
  617. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  618. )
  619. // Write back server's initial metadata
  620. let headers: HPACKHeaders = [
  621. "some-custom-header": "some-custom-value"
  622. ]
  623. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  624. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  625. // Read out the metadata
  626. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  627. // This is where this test actually begins. We want to write two messages
  628. // without flushing, and make sure that no messages are sent down the pipeline
  629. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  630. // Write back first message and make sure nothing's written in the channel.
  631. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  632. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  633. // Write back second message and make sure nothing's written in the channel.
  634. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  635. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  636. // Now flush and check we *do* write the data.
  637. channel.flush()
  638. let writtenMessage = try channel.assertReadDataOutbound()
  639. // Make sure both messages have been framed together in the ByteBuffer.
  640. XCTAssertEqual(
  641. writtenMessage.data,
  642. .byteBuffer(
  643. .init(bytes: [
  644. // First message
  645. 0, // Compression disabled
  646. 0, 0, 0, 4, // Message length
  647. 1, 1, 1, 1, // First message data
  648. // Second message
  649. 0, // Compression disabled
  650. 0, 0, 0, 4, // Message length
  651. 2, 2, 2, 2, // Second message data
  652. ])
  653. )
  654. )
  655. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  656. }
  657. func testMessageAndStatusAreNotReordered() throws {
  658. let channel = EmbeddedChannel()
  659. let handler = GRPCServerStreamHandler(
  660. scheme: .http,
  661. acceptedEncodings: [],
  662. maximumPayloadSize: 100,
  663. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  664. )
  665. try channel.pipeline.syncOperations.addHandler(handler)
  666. // Receive client's initial metadata
  667. let clientInitialMetadata: HPACKHeaders = [
  668. GRPCHTTP2Keys.path.rawValue: "/test/test",
  669. GRPCHTTP2Keys.scheme.rawValue: "http",
  670. GRPCHTTP2Keys.method.rawValue: "POST",
  671. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  672. GRPCHTTP2Keys.te.rawValue: "trailers",
  673. ]
  674. XCTAssertNoThrow(
  675. try channel.writeInbound(
  676. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  677. )
  678. )
  679. // Make sure we haven't sent back an error response, and that we read the initial metadata
  680. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  681. XCTAssertEqual(
  682. try channel.readInbound(as: RPCRequestPart.self),
  683. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  684. )
  685. // Write back server's initial metadata
  686. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  687. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  688. // Read out the metadata
  689. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  690. // This is where this test actually begins. We want to write a message followed
  691. // by status and trailers, and only flush after both writes.
  692. // Because messages are buffered and potentially bundled together in a single
  693. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  694. // and trailers won't be written before the messages.
  695. // Write back message and make sure nothing's written in the channel.
  696. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  697. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  698. // Write status + metadata and make sure nothing's written.
  699. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  700. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  701. // Now flush and check we *do* write the data in the right order: message first,
  702. // trailers second.
  703. channel.flush()
  704. let writtenMessage = try channel.assertReadDataOutbound()
  705. // Make sure we first get message.
  706. XCTAssertEqual(
  707. writtenMessage.data,
  708. .byteBuffer(
  709. .init(bytes: [
  710. // First message
  711. 0, // Compression disabled
  712. 0, 0, 0, 4, // Message length
  713. 1, 1, 1, 1, // First message data
  714. ])
  715. )
  716. )
  717. XCTAssertFalse(writtenMessage.endStream)
  718. // Make sure we get trailers.
  719. let writtenTrailers = try channel.assertReadHeadersOutbound()
  720. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  721. XCTAssertTrue(writtenTrailers.endStream)
  722. // Make sure we get nothing else.
  723. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  724. }
  725. func testMethodDescriptorPromiseSucceeds() throws {
  726. let channel = EmbeddedChannel()
  727. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  728. let handler = GRPCServerStreamHandler(
  729. scheme: .http,
  730. acceptedEncodings: [],
  731. maximumPayloadSize: 100,
  732. methodDescriptorPromise: promise,
  733. skipStateMachineAssertions: true
  734. )
  735. try channel.pipeline.syncOperations.addHandler(handler)
  736. // Receive client's initial metadata
  737. let clientInitialMetadata: HPACKHeaders = [
  738. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  739. GRPCHTTP2Keys.scheme.rawValue: "http",
  740. GRPCHTTP2Keys.method.rawValue: "POST",
  741. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  742. GRPCHTTP2Keys.te.rawValue: "trailers",
  743. ]
  744. XCTAssertNoThrow(
  745. try channel.writeInbound(
  746. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  747. )
  748. )
  749. // Make sure we haven't sent back an error response, and that we read the initial metadata
  750. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  751. XCTAssertEqual(
  752. try channel.readInbound(as: RPCRequestPart.self),
  753. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  754. )
  755. XCTAssertEqual(
  756. try promise.futureResult.wait(),
  757. MethodDescriptor(service: "SomeService", method: "SomeMethod")
  758. )
  759. }
  760. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  761. let channel = EmbeddedChannel()
  762. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  763. let handler = GRPCServerStreamHandler(
  764. scheme: .http,
  765. acceptedEncodings: [],
  766. maximumPayloadSize: 100,
  767. methodDescriptorPromise: promise,
  768. skipStateMachineAssertions: true
  769. )
  770. try channel.pipeline.syncOperations.addHandler(handler)
  771. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  772. XCTAssertThrowsError(
  773. ofType: RPCError.self,
  774. try promise.futureResult.wait()
  775. ) { error in
  776. XCTAssertEqual(error.code, .unavailable)
  777. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  778. }
  779. }
  780. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  781. let channel = EmbeddedChannel()
  782. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  783. let handler = GRPCServerStreamHandler(
  784. scheme: .http,
  785. acceptedEncodings: [],
  786. maximumPayloadSize: 100,
  787. methodDescriptorPromise: promise,
  788. skipStateMachineAssertions: true
  789. )
  790. try channel.pipeline.syncOperations.addHandler(handler)
  791. // Receive client's initial metadata
  792. let clientInitialMetadata: HPACKHeaders = [
  793. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  794. GRPCHTTP2Keys.scheme.rawValue: "http",
  795. GRPCHTTP2Keys.method.rawValue: "POST",
  796. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  797. GRPCHTTP2Keys.te.rawValue: "trailers",
  798. ]
  799. XCTAssertNoThrow(
  800. try channel.writeInbound(
  801. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  802. )
  803. )
  804. XCTAssertThrowsError(
  805. ofType: RPCError.self,
  806. try promise.futureResult.wait()
  807. ) { error in
  808. XCTAssertEqual(error.code, .unavailable)
  809. XCTAssertEqual(error.message, "RPC was rejected.")
  810. }
  811. }
  812. func testUnexpectedStreamClose_ErrorFired() throws {
  813. let channel = EmbeddedChannel()
  814. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  815. let handler = GRPCServerStreamHandler(
  816. scheme: .http,
  817. acceptedEncodings: [],
  818. maximumPayloadSize: 100,
  819. methodDescriptorPromise: promise,
  820. skipStateMachineAssertions: true
  821. )
  822. try channel.pipeline.syncOperations.addHandler(handler)
  823. // Receive client's initial metadata
  824. let clientInitialMetadata: HPACKHeaders = [
  825. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  826. GRPCHTTP2Keys.scheme.rawValue: "http",
  827. GRPCHTTP2Keys.method.rawValue: "POST",
  828. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  829. GRPCHTTP2Keys.te.rawValue: "trailers",
  830. ]
  831. XCTAssertNoThrow(
  832. try channel.writeInbound(
  833. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  834. )
  835. )
  836. // Make sure we haven't sent back an error response, and that we read the initial metadata
  837. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  838. XCTAssertEqual(
  839. try channel.readInbound(as: RPCRequestPart.self),
  840. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  841. )
  842. // An error is fired down the pipeline
  843. let thrownError = ChannelError.connectTimeout(.milliseconds(100))
  844. channel.pipeline.fireErrorCaught(thrownError)
  845. // The server handler simply forwards the error.
  846. XCTAssertThrowsError(
  847. ofType: type(of: thrownError),
  848. try channel.throwIfErrorCaught()
  849. ) { error in
  850. XCTAssertEqual(error, thrownError)
  851. }
  852. // We should now be closed: check we can't write anymore.
  853. XCTAssertThrowsError(
  854. ofType: RPCError.self,
  855. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  856. ) { error in
  857. XCTAssertEqual(error.code, .internalError)
  858. XCTAssertEqual(error.message, "Invalid state")
  859. }
  860. }
  861. func testUnexpectedStreamClose_ChannelInactive() throws {
  862. let channel = EmbeddedChannel()
  863. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  864. let handler = GRPCServerStreamHandler(
  865. scheme: .http,
  866. acceptedEncodings: [],
  867. maximumPayloadSize: 100,
  868. methodDescriptorPromise: promise,
  869. skipStateMachineAssertions: true
  870. )
  871. try channel.pipeline.syncOperations.addHandler(handler)
  872. // Receive client's initial metadata
  873. let clientInitialMetadata: HPACKHeaders = [
  874. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  875. GRPCHTTP2Keys.scheme.rawValue: "http",
  876. GRPCHTTP2Keys.method.rawValue: "POST",
  877. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  878. GRPCHTTP2Keys.te.rawValue: "trailers",
  879. ]
  880. XCTAssertNoThrow(
  881. try channel.writeInbound(
  882. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  883. )
  884. )
  885. // Make sure we haven't sent back an error response, and that we read the initial metadata
  886. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  887. XCTAssertEqual(
  888. try channel.readInbound(as: RPCRequestPart.self),
  889. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  890. )
  891. // Channel becomes inactive
  892. channel.pipeline.fireChannelInactive()
  893. // The server handler fires an error
  894. XCTAssertThrowsError(
  895. ofType: RPCError.self,
  896. try channel.throwIfErrorCaught()
  897. ) { error in
  898. XCTAssertEqual(error.code, .unavailable)
  899. XCTAssertEqual(error.message, "Stream unexpectedly closed.")
  900. }
  901. // We should now be closed: check we can't write anymore.
  902. XCTAssertThrowsError(
  903. ofType: RPCError.self,
  904. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  905. ) { error in
  906. XCTAssertEqual(error.code, .internalError)
  907. XCTAssertEqual(error.message, "Invalid state")
  908. }
  909. }
  910. func testUnexpectedStreamClose_ResetStreamFrame() throws {
  911. let channel = EmbeddedChannel()
  912. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  913. let handler = GRPCServerStreamHandler(
  914. scheme: .http,
  915. acceptedEncodings: [],
  916. maximumPayloadSize: 100,
  917. methodDescriptorPromise: promise,
  918. skipStateMachineAssertions: true
  919. )
  920. try channel.pipeline.syncOperations.addHandler(handler)
  921. // Receive client's initial metadata
  922. let clientInitialMetadata: HPACKHeaders = [
  923. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  924. GRPCHTTP2Keys.scheme.rawValue: "http",
  925. GRPCHTTP2Keys.method.rawValue: "POST",
  926. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  927. GRPCHTTP2Keys.te.rawValue: "trailers",
  928. ]
  929. XCTAssertNoThrow(
  930. try channel.writeInbound(
  931. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  932. )
  933. )
  934. // Make sure we haven't sent back an error response, and that we read the initial metadata
  935. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  936. XCTAssertEqual(
  937. try channel.readInbound(as: RPCRequestPart.self),
  938. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  939. )
  940. // We receive RST_STREAM frame
  941. // Assert the server handler fires an error
  942. XCTAssertThrowsError(
  943. ofType: RPCError.self,
  944. try channel.writeInbound(
  945. HTTP2Frame.FramePayload.rstStream(.internalError)
  946. )
  947. ) { error in
  948. XCTAssertEqual(error.code, .unavailable)
  949. XCTAssertEqual(error.message, "Stream unexpectedly closed: a RST_STREAM frame was received.")
  950. }
  951. // We should now be closed: check we can't write anymore.
  952. XCTAssertThrowsError(
  953. ofType: RPCError.self,
  954. try channel.writeOutbound(RPCResponsePart.metadata(Metadata()))
  955. ) { error in
  956. XCTAssertEqual(error.code, .internalError)
  957. XCTAssertEqual(error.message, "Invalid state")
  958. }
  959. }
  960. }
  961. extension EmbeddedChannel {
  962. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  963. guard
  964. case .headers(let writtenHeaders) = try XCTUnwrap(
  965. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  966. )
  967. else {
  968. throw TestError.assertionFailure("Expected to write headers")
  969. }
  970. return writtenHeaders
  971. }
  972. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  973. guard
  974. case .data(let writtenMessage) = try XCTUnwrap(
  975. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  976. )
  977. else {
  978. throw TestError.assertionFailure("Expected to write data")
  979. }
  980. return writtenMessage
  981. }
  982. }
  983. private enum TestError: Error {
  984. case assertionFailure(String)
  985. }