GRPCServerStreamHandlerTests.swift 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let channel = EmbeddedChannel()
  27. let handler = GRPCServerStreamHandler(
  28. scheme: .http,
  29. acceptedEncodings: [],
  30. maximumPayloadSize: 1,
  31. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  32. )
  33. try channel.pipeline.syncOperations.addHandler(handler)
  34. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  35. .ping(.init(), ack: false),
  36. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  37. // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
  38. // initialiser is internal, so I can't create one of these frames.
  39. .rstStream(.cancel),
  40. .settings(.ack),
  41. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  42. .windowUpdate(windowSizeIncrement: 4),
  43. .alternativeService(origin: nil, field: nil),
  44. .origin([]),
  45. ]
  46. for toBeIgnored in framesToBeIgnored {
  47. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  48. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  49. }
  50. }
  51. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  52. let channel = EmbeddedChannel()
  53. let handler = GRPCServerStreamHandler(
  54. scheme: .http,
  55. acceptedEncodings: [],
  56. maximumPayloadSize: 1,
  57. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  58. )
  59. try channel.pipeline.syncOperations.addHandler(handler)
  60. // Receive client's initial metadata without content-type
  61. let clientInitialMetadata: HPACKHeaders = [
  62. GRPCHTTP2Keys.path.rawValue: "/test/test",
  63. GRPCHTTP2Keys.scheme.rawValue: "http",
  64. GRPCHTTP2Keys.method.rawValue: "POST",
  65. GRPCHTTP2Keys.te.rawValue: "trailers",
  66. ]
  67. XCTAssertNoThrow(
  68. try channel.writeInbound(
  69. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  70. )
  71. )
  72. // Make sure we have sent a trailers-only response
  73. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  74. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  75. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  76. }
  77. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  78. let channel = EmbeddedChannel()
  79. let handler = GRPCServerStreamHandler(
  80. scheme: .http,
  81. acceptedEncodings: [],
  82. maximumPayloadSize: 1,
  83. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  84. )
  85. try channel.pipeline.syncOperations.addHandler(handler)
  86. // Receive client's initial metadata without :method
  87. let clientInitialMetadata: HPACKHeaders = [
  88. GRPCHTTP2Keys.path.rawValue: "/test/test",
  89. GRPCHTTP2Keys.scheme.rawValue: "http",
  90. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  91. GRPCHTTP2Keys.te.rawValue: "trailers",
  92. ]
  93. XCTAssertNoThrow(
  94. try channel.writeInbound(
  95. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  96. )
  97. )
  98. // Make sure we have sent a trailers-only response
  99. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  100. XCTAssertEqual(
  101. writtenTrailersOnlyResponse.headers,
  102. [
  103. GRPCHTTP2Keys.status.rawValue: "200",
  104. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  105. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  106. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  107. ":method header is expected to be present and have a value of \"POST\".",
  108. ]
  109. )
  110. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  111. }
  112. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  113. let channel = EmbeddedChannel()
  114. let handler = GRPCServerStreamHandler(
  115. scheme: .http,
  116. acceptedEncodings: [],
  117. maximumPayloadSize: 1,
  118. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  119. )
  120. try channel.pipeline.syncOperations.addHandler(handler)
  121. // Receive client's initial metadata without :scheme
  122. let clientInitialMetadata: HPACKHeaders = [
  123. GRPCHTTP2Keys.path.rawValue: "/test/test",
  124. GRPCHTTP2Keys.method.rawValue: "POST",
  125. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  126. GRPCHTTP2Keys.te.rawValue: "trailers",
  127. ]
  128. XCTAssertNoThrow(
  129. try channel.writeInbound(
  130. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  131. )
  132. )
  133. // Make sure we have sent a trailers-only response
  134. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  135. XCTAssertEqual(
  136. writtenTrailersOnlyResponse.headers,
  137. [
  138. GRPCHTTP2Keys.status.rawValue: "200",
  139. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  140. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  141. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  142. ":scheme header must be present and one of \"http\" or \"https\".",
  143. ]
  144. )
  145. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  146. }
  147. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  148. let channel = EmbeddedChannel()
  149. let handler = GRPCServerStreamHandler(
  150. scheme: .http,
  151. acceptedEncodings: [],
  152. maximumPayloadSize: 1,
  153. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  154. )
  155. try channel.pipeline.syncOperations.addHandler(handler)
  156. // Receive client's initial metadata without :path
  157. let clientInitialMetadata: HPACKHeaders = [
  158. GRPCHTTP2Keys.scheme.rawValue: "http",
  159. GRPCHTTP2Keys.method.rawValue: "POST",
  160. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  161. GRPCHTTP2Keys.te.rawValue: "trailers",
  162. ]
  163. XCTAssertNoThrow(
  164. try channel.writeInbound(
  165. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  166. )
  167. )
  168. // Make sure we have sent a trailers-only response
  169. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  170. XCTAssertEqual(
  171. writtenTrailersOnlyResponse.headers,
  172. [
  173. GRPCHTTP2Keys.status.rawValue: "200",
  174. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  175. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  176. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  177. ]
  178. )
  179. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  180. }
  181. func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
  182. let channel = EmbeddedChannel()
  183. let handler = GRPCServerStreamHandler(
  184. scheme: .http,
  185. acceptedEncodings: [],
  186. maximumPayloadSize: 1,
  187. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  188. )
  189. try channel.pipeline.syncOperations.addHandler(handler)
  190. // Receive client's initial metadata without TE
  191. let clientInitialMetadata: HPACKHeaders = [
  192. GRPCHTTP2Keys.path.rawValue: "/test/test",
  193. GRPCHTTP2Keys.scheme.rawValue: "http",
  194. GRPCHTTP2Keys.method.rawValue: "POST",
  195. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  196. ]
  197. XCTAssertNoThrow(
  198. try channel.writeInbound(
  199. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  200. )
  201. )
  202. // Make sure we have sent a trailers-only response
  203. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  204. XCTAssertEqual(
  205. writtenTrailersOnlyResponse.headers,
  206. [
  207. GRPCHTTP2Keys.status.rawValue: "200",
  208. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  209. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  210. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  211. "\"te\" header is expected to be present and have a value of \"trailers\".",
  212. ]
  213. )
  214. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  215. }
  216. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  217. let channel = EmbeddedChannel()
  218. let handler = GRPCServerStreamHandler(
  219. scheme: .http,
  220. acceptedEncodings: [],
  221. maximumPayloadSize: 100,
  222. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  223. )
  224. try channel.pipeline.syncOperations.addHandler(handler)
  225. // Receive client's initial metadata
  226. let clientInitialMetadata: HPACKHeaders = [
  227. GRPCHTTP2Keys.path.rawValue: "/test/test",
  228. GRPCHTTP2Keys.scheme.rawValue: "http",
  229. GRPCHTTP2Keys.method.rawValue: "POST",
  230. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  231. GRPCHTTP2Keys.te.rawValue: "trailers",
  232. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  233. ]
  234. XCTAssertNoThrow(
  235. try channel.writeInbound(
  236. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  237. )
  238. )
  239. // Make sure we have sent a trailers-only response
  240. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  241. XCTAssertEqual(
  242. writtenTrailersOnlyResponse.headers,
  243. [
  244. GRPCHTTP2Keys.status.rawValue: "200",
  245. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  246. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  247. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  248. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  249. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  250. ]
  251. )
  252. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  253. }
  254. func testOverMaximumPayloadSize() throws {
  255. let channel = EmbeddedChannel()
  256. let handler = GRPCServerStreamHandler(
  257. scheme: .http,
  258. acceptedEncodings: [],
  259. maximumPayloadSize: 1,
  260. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  261. )
  262. try channel.pipeline.syncOperations.addHandler(handler)
  263. // Receive client's initial metadata
  264. let clientInitialMetadata: HPACKHeaders = [
  265. GRPCHTTP2Keys.path.rawValue: "/test/test",
  266. GRPCHTTP2Keys.scheme.rawValue: "http",
  267. GRPCHTTP2Keys.method.rawValue: "POST",
  268. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  269. GRPCHTTP2Keys.te.rawValue: "trailers",
  270. ]
  271. XCTAssertNoThrow(
  272. try channel.writeInbound(
  273. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  274. )
  275. )
  276. // Make sure we haven't sent back an error response, and that we read the initial metadata
  277. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  278. XCTAssertEqual(
  279. try channel.readInbound(as: RPCRequestPart.self),
  280. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  281. )
  282. // Write back server's initial metadata
  283. let headers: HPACKHeaders = [
  284. "some-custom-header": "some-custom-value"
  285. ]
  286. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  287. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  288. // Make sure we wrote back the initial metadata
  289. let writtenHeaders = try channel.assertReadHeadersOutbound()
  290. XCTAssertEqual(
  291. writtenHeaders.headers,
  292. [
  293. GRPCHTTP2Keys.status.rawValue: "200",
  294. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  295. "some-custom-header": "some-custom-value",
  296. ]
  297. )
  298. // Receive client's message
  299. var buffer = ByteBuffer()
  300. buffer.writeInteger(UInt8(0)) // not compressed
  301. buffer.writeInteger(UInt32(42)) // message length
  302. buffer.writeRepeatingByte(0, count: 42) // message
  303. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  304. XCTAssertThrowsError(
  305. ofType: RPCError.self,
  306. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  307. ) { error in
  308. XCTAssertEqual(error.code, .resourceExhausted)
  309. XCTAssertEqual(
  310. error.message,
  311. "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
  312. )
  313. }
  314. // Make sure we haven't sent a response back and that we didn't read the received message
  315. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  316. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  317. }
  318. func testClientEndsStream() throws {
  319. let channel = EmbeddedChannel()
  320. let handler = GRPCServerStreamHandler(
  321. scheme: .http,
  322. acceptedEncodings: [],
  323. maximumPayloadSize: 1,
  324. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  325. skipStateMachineAssertions: true
  326. )
  327. try channel.pipeline.syncOperations.addHandler(handler)
  328. // Receive client's initial metadata with end stream set
  329. let clientInitialMetadata: HPACKHeaders = [
  330. GRPCHTTP2Keys.path.rawValue: "/test/test",
  331. GRPCHTTP2Keys.scheme.rawValue: "http",
  332. GRPCHTTP2Keys.method.rawValue: "POST",
  333. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  334. GRPCHTTP2Keys.te.rawValue: "trailers",
  335. ]
  336. XCTAssertNoThrow(
  337. try channel.writeInbound(
  338. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  339. )
  340. )
  341. // Make sure we haven't sent back an error response, and that we read the initial metadata
  342. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  343. XCTAssertEqual(
  344. try channel.readInbound(as: RPCRequestPart.self),
  345. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  346. )
  347. // Write back server's initial metadata
  348. let headers: HPACKHeaders = [
  349. "some-custom-header": "some-custom-value"
  350. ]
  351. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  352. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  353. // Make sure we wrote back the initial metadata
  354. let writtenHeaders = try channel.assertReadHeadersOutbound()
  355. XCTAssertEqual(
  356. writtenHeaders.headers,
  357. [
  358. GRPCHTTP2Keys.status.rawValue: "200",
  359. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  360. "some-custom-header": "some-custom-value",
  361. ]
  362. )
  363. // We should throw if the client sends another message, since it's closed the stream already.
  364. var buffer = ByteBuffer()
  365. buffer.writeInteger(UInt8(0)) // not compressed
  366. buffer.writeInteger(UInt32(42)) // message length
  367. buffer.writeRepeatingByte(0, count: 42) // message
  368. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  369. XCTAssertThrowsError(
  370. ofType: RPCError.self,
  371. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  372. ) { error in
  373. XCTAssertEqual(error.code, .internalError)
  374. XCTAssertEqual(error.message, "Client can't send a message if closed.")
  375. }
  376. }
  377. func testNormalFlow() throws {
  378. let channel = EmbeddedChannel()
  379. let handler = GRPCServerStreamHandler(
  380. scheme: .http,
  381. acceptedEncodings: [],
  382. maximumPayloadSize: 42,
  383. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  384. skipStateMachineAssertions: true
  385. )
  386. try channel.pipeline.syncOperations.addHandler(handler)
  387. // Receive client's initial metadata
  388. let clientInitialMetadata: HPACKHeaders = [
  389. GRPCHTTP2Keys.path.rawValue: "/test/test",
  390. GRPCHTTP2Keys.scheme.rawValue: "http",
  391. GRPCHTTP2Keys.method.rawValue: "POST",
  392. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  393. GRPCHTTP2Keys.te.rawValue: "trailers",
  394. ]
  395. XCTAssertNoThrow(
  396. try channel.writeInbound(
  397. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  398. )
  399. )
  400. // Make sure we haven't sent back an error response, and that we read the initial metadata
  401. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  402. XCTAssertEqual(
  403. try channel.readInbound(as: RPCRequestPart.self),
  404. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  405. )
  406. // Write back server's initial metadata
  407. let headers: HPACKHeaders = [
  408. "some-custom-header": "some-custom-value"
  409. ]
  410. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  411. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  412. // Make sure we wrote back the initial metadata
  413. let writtenHeaders = try channel.assertReadHeadersOutbound()
  414. XCTAssertEqual(
  415. writtenHeaders.headers,
  416. [
  417. GRPCHTTP2Keys.status.rawValue: "200",
  418. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  419. "some-custom-header": "some-custom-value",
  420. ]
  421. )
  422. // Receive client's message
  423. var buffer = ByteBuffer()
  424. buffer.writeInteger(UInt8(0)) // not compressed
  425. buffer.writeInteger(UInt32(42)) // message length
  426. buffer.writeRepeatingByte(0, count: 42) // message
  427. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  428. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  429. // Make sure we haven't sent back an error response, and that we read the message properly
  430. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  431. XCTAssertEqual(
  432. try channel.readInbound(as: RPCRequestPart.self),
  433. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  434. )
  435. // Write back response
  436. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  437. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  438. // Make sure we wrote back the right message
  439. let writtenMessage = try channel.assertReadDataOutbound()
  440. var expectedBuffer = ByteBuffer()
  441. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  442. expectedBuffer.writeInteger(UInt32(42)) // message length
  443. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  444. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  445. // Send back status to end RPC
  446. let trailers = RPCResponsePart.status(
  447. .init(code: .dataLoss, message: "Test data loss"),
  448. ["custom-header": "custom-value"]
  449. )
  450. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  451. // Make sure we wrote back the status and trailers
  452. let writtenStatus = try channel.assertReadHeadersOutbound()
  453. XCTAssertTrue(writtenStatus.endStream)
  454. XCTAssertEqual(
  455. writtenStatus.headers,
  456. [
  457. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  458. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  459. "custom-header": "custom-value",
  460. ]
  461. )
  462. // Try writing and assert it throws to make sure we don't allow writes
  463. // after closing.
  464. XCTAssertThrowsError(
  465. ofType: RPCError.self,
  466. try channel.writeOutbound(trailers)
  467. ) { error in
  468. XCTAssertEqual(error.code, .internalError)
  469. XCTAssertEqual(error.message, "Server can't send anything if closed.")
  470. }
  471. }
  472. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  473. let channel = EmbeddedChannel()
  474. let handler = GRPCServerStreamHandler(
  475. scheme: .http,
  476. acceptedEncodings: [],
  477. maximumPayloadSize: 100,
  478. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  479. )
  480. try channel.pipeline.syncOperations.addHandler(handler)
  481. // Receive client's initial metadata
  482. let clientInitialMetadata: HPACKHeaders = [
  483. GRPCHTTP2Keys.path.rawValue: "/test/test",
  484. GRPCHTTP2Keys.scheme.rawValue: "http",
  485. GRPCHTTP2Keys.method.rawValue: "POST",
  486. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  487. GRPCHTTP2Keys.te.rawValue: "trailers",
  488. ]
  489. XCTAssertNoThrow(
  490. try channel.writeInbound(
  491. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  492. )
  493. )
  494. // Make sure we haven't sent back an error response, and that we read the initial metadata
  495. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  496. XCTAssertEqual(
  497. try channel.readInbound(as: RPCRequestPart.self),
  498. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  499. )
  500. // Write back server's initial metadata
  501. let headers: HPACKHeaders = [
  502. "some-custom-header": "some-custom-value"
  503. ]
  504. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  505. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  506. // Make sure we wrote back the initial metadata
  507. let writtenHeaders = try channel.assertReadHeadersOutbound()
  508. XCTAssertEqual(
  509. writtenHeaders.headers,
  510. [
  511. GRPCHTTP2Keys.status.rawValue: "200",
  512. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  513. "some-custom-header": "some-custom-value",
  514. ]
  515. )
  516. // Receive client's first message
  517. var buffer = ByteBuffer()
  518. buffer.writeInteger(UInt8(0)) // not compressed
  519. XCTAssertNoThrow(
  520. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  521. )
  522. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  523. buffer.clear()
  524. buffer.writeInteger(UInt32(30)) // message length
  525. XCTAssertNoThrow(
  526. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  527. )
  528. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  529. buffer.clear()
  530. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  531. XCTAssertNoThrow(
  532. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  533. )
  534. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  535. buffer.clear()
  536. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  537. XCTAssertNoThrow(
  538. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  539. )
  540. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  541. buffer.clear()
  542. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  543. XCTAssertNoThrow(
  544. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  545. )
  546. // Make sure we haven't sent back an error response, and that we read the message properly
  547. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  548. XCTAssertEqual(
  549. try channel.readInbound(as: RPCRequestPart.self),
  550. RPCRequestPart.message(
  551. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  552. + [UInt8](repeating: 2, count: 10)
  553. )
  554. )
  555. }
  556. func testReceiveMultipleHeaders() throws {
  557. let channel = EmbeddedChannel()
  558. let handler = GRPCServerStreamHandler(
  559. scheme: .http,
  560. acceptedEncodings: [],
  561. maximumPayloadSize: 100,
  562. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  563. )
  564. try channel.pipeline.syncOperations.addHandler(handler)
  565. // Receive client's initial metadata
  566. let clientInitialMetadata: HPACKHeaders = [
  567. GRPCHTTP2Keys.path.rawValue: "/test/test",
  568. GRPCHTTP2Keys.scheme.rawValue: "http",
  569. GRPCHTTP2Keys.method.rawValue: "POST",
  570. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  571. GRPCHTTP2Keys.te.rawValue: "trailers",
  572. ]
  573. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  574. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  575. // Receive them again. Should be a protocol violation.
  576. try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata)))
  577. let payload = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  578. switch payload {
  579. case .rstStream(let errorCode):
  580. XCTAssertEqual(errorCode, .protocolError)
  581. default:
  582. XCTFail("Expected RST_STREAM, got \(payload)")
  583. }
  584. }
  585. func testSendMultipleMessagesInSingleBuffer() throws {
  586. let channel = EmbeddedChannel()
  587. let handler = GRPCServerStreamHandler(
  588. scheme: .http,
  589. acceptedEncodings: [],
  590. maximumPayloadSize: 100,
  591. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  592. )
  593. try channel.pipeline.syncOperations.addHandler(handler)
  594. // Receive client's initial metadata
  595. let clientInitialMetadata: HPACKHeaders = [
  596. GRPCHTTP2Keys.path.rawValue: "/test/test",
  597. GRPCHTTP2Keys.scheme.rawValue: "http",
  598. GRPCHTTP2Keys.method.rawValue: "POST",
  599. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  600. GRPCHTTP2Keys.te.rawValue: "trailers",
  601. ]
  602. XCTAssertNoThrow(
  603. try channel.writeInbound(
  604. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  605. )
  606. )
  607. // Make sure we haven't sent back an error response, and that we read the initial metadata
  608. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  609. XCTAssertEqual(
  610. try channel.readInbound(as: RPCRequestPart.self),
  611. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  612. )
  613. // Write back server's initial metadata
  614. let headers: HPACKHeaders = [
  615. "some-custom-header": "some-custom-value"
  616. ]
  617. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  618. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  619. // Read out the metadata
  620. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  621. // This is where this test actually begins. We want to write two messages
  622. // without flushing, and make sure that no messages are sent down the pipeline
  623. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  624. // Write back first message and make sure nothing's written in the channel.
  625. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  626. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  627. // Write back second message and make sure nothing's written in the channel.
  628. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  629. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  630. // Now flush and check we *do* write the data.
  631. channel.flush()
  632. let writtenMessage = try channel.assertReadDataOutbound()
  633. // Make sure both messages have been framed together in the ByteBuffer.
  634. XCTAssertEqual(
  635. writtenMessage.data,
  636. .byteBuffer(
  637. .init(bytes: [
  638. // First message
  639. 0, // Compression disabled
  640. 0, 0, 0, 4, // Message length
  641. 1, 1, 1, 1, // First message data
  642. // Second message
  643. 0, // Compression disabled
  644. 0, 0, 0, 4, // Message length
  645. 2, 2, 2, 2, // Second message data
  646. ])
  647. )
  648. )
  649. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  650. }
  651. func testMessageAndStatusAreNotReordered() throws {
  652. let channel = EmbeddedChannel()
  653. let handler = GRPCServerStreamHandler(
  654. scheme: .http,
  655. acceptedEncodings: [],
  656. maximumPayloadSize: 100,
  657. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  658. )
  659. try channel.pipeline.syncOperations.addHandler(handler)
  660. // Receive client's initial metadata
  661. let clientInitialMetadata: HPACKHeaders = [
  662. GRPCHTTP2Keys.path.rawValue: "/test/test",
  663. GRPCHTTP2Keys.scheme.rawValue: "http",
  664. GRPCHTTP2Keys.method.rawValue: "POST",
  665. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  666. GRPCHTTP2Keys.te.rawValue: "trailers",
  667. ]
  668. XCTAssertNoThrow(
  669. try channel.writeInbound(
  670. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  671. )
  672. )
  673. // Make sure we haven't sent back an error response, and that we read the initial metadata
  674. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  675. XCTAssertEqual(
  676. try channel.readInbound(as: RPCRequestPart.self),
  677. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  678. )
  679. // Write back server's initial metadata
  680. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  681. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  682. // Read out the metadata
  683. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  684. // This is where this test actually begins. We want to write a message followed
  685. // by status and trailers, and only flush after both writes.
  686. // Because messages are buffered and potentially bundled together in a single
  687. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  688. // and trailers won't be written before the messages.
  689. // Write back message and make sure nothing's written in the channel.
  690. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  691. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  692. // Write status + metadata and make sure nothing's written.
  693. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  694. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  695. // Now flush and check we *do* write the data in the right order: message first,
  696. // trailers second.
  697. channel.flush()
  698. let writtenMessage = try channel.assertReadDataOutbound()
  699. // Make sure we first get message.
  700. XCTAssertEqual(
  701. writtenMessage.data,
  702. .byteBuffer(
  703. .init(bytes: [
  704. // First message
  705. 0, // Compression disabled
  706. 0, 0, 0, 4, // Message length
  707. 1, 1, 1, 1, // First message data
  708. ])
  709. )
  710. )
  711. XCTAssertFalse(writtenMessage.endStream)
  712. // Make sure we get trailers.
  713. let writtenTrailers = try channel.assertReadHeadersOutbound()
  714. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  715. XCTAssertTrue(writtenTrailers.endStream)
  716. // Make sure we get nothing else.
  717. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  718. }
  719. func testMethodDescriptorPromiseSucceeds() throws {
  720. let channel = EmbeddedChannel()
  721. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  722. let handler = GRPCServerStreamHandler(
  723. scheme: .http,
  724. acceptedEncodings: [],
  725. maximumPayloadSize: 100,
  726. methodDescriptorPromise: promise,
  727. skipStateMachineAssertions: true
  728. )
  729. try channel.pipeline.syncOperations.addHandler(handler)
  730. // Receive client's initial metadata
  731. let clientInitialMetadata: HPACKHeaders = [
  732. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  733. GRPCHTTP2Keys.scheme.rawValue: "http",
  734. GRPCHTTP2Keys.method.rawValue: "POST",
  735. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  736. GRPCHTTP2Keys.te.rawValue: "trailers",
  737. ]
  738. XCTAssertNoThrow(
  739. try channel.writeInbound(
  740. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  741. )
  742. )
  743. // Make sure we haven't sent back an error response, and that we read the initial metadata
  744. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  745. XCTAssertEqual(
  746. try channel.readInbound(as: RPCRequestPart.self),
  747. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  748. )
  749. XCTAssertEqual(
  750. try promise.futureResult.wait(),
  751. MethodDescriptor(service: "SomeService", method: "SomeMethod")
  752. )
  753. }
  754. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  755. let channel = EmbeddedChannel()
  756. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  757. let handler = GRPCServerStreamHandler(
  758. scheme: .http,
  759. acceptedEncodings: [],
  760. maximumPayloadSize: 100,
  761. methodDescriptorPromise: promise,
  762. skipStateMachineAssertions: true
  763. )
  764. try channel.pipeline.syncOperations.addHandler(handler)
  765. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  766. XCTAssertThrowsError(
  767. ofType: RPCError.self,
  768. try promise.futureResult.wait()
  769. ) { error in
  770. XCTAssertEqual(error.code, .unavailable)
  771. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  772. }
  773. }
  774. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  775. let channel = EmbeddedChannel()
  776. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  777. let handler = GRPCServerStreamHandler(
  778. scheme: .http,
  779. acceptedEncodings: [],
  780. maximumPayloadSize: 100,
  781. methodDescriptorPromise: promise,
  782. skipStateMachineAssertions: true
  783. )
  784. try channel.pipeline.syncOperations.addHandler(handler)
  785. // Receive client's initial metadata
  786. let clientInitialMetadata: HPACKHeaders = [
  787. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  788. GRPCHTTP2Keys.scheme.rawValue: "http",
  789. GRPCHTTP2Keys.method.rawValue: "POST",
  790. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  791. GRPCHTTP2Keys.te.rawValue: "trailers",
  792. ]
  793. XCTAssertNoThrow(
  794. try channel.writeInbound(
  795. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  796. )
  797. )
  798. XCTAssertThrowsError(
  799. ofType: RPCError.self,
  800. try promise.futureResult.wait()
  801. ) { error in
  802. XCTAssertEqual(error.code, .unavailable)
  803. XCTAssertEqual(error.message, "RPC was rejected.")
  804. }
  805. }
  806. }
  807. extension EmbeddedChannel {
  808. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  809. guard
  810. case .headers(let writtenHeaders) = try XCTUnwrap(
  811. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  812. )
  813. else {
  814. throw TestError.assertionFailure("Expected to write headers")
  815. }
  816. return writtenHeaders
  817. }
  818. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  819. guard
  820. case .data(let writtenMessage) = try XCTUnwrap(
  821. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  822. )
  823. else {
  824. throw TestError.assertionFailure("Expected to write data")
  825. }
  826. return writtenMessage
  827. }
  828. }
  829. private enum TestError: Error {
  830. case assertionFailure(String)
  831. }