GRPCServerStreamHandlerTests.swift 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import XCTest
  22. @testable import GRPCHTTP2Core
  23. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  25. func testH2FramesAreIgnored() throws {
  26. let channel = EmbeddedChannel()
  27. let handler = GRPCServerStreamHandler(
  28. scheme: .http,
  29. acceptedEncodings: [],
  30. maximumPayloadSize: 1,
  31. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  32. )
  33. try channel.pipeline.syncOperations.addHandler(handler)
  34. let framesToBeIgnored: [HTTP2Frame.FramePayload] = [
  35. .ping(.init(), ack: false),
  36. .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
  37. // TODO: add .priority(StreamPriorityData) - right now, StreamPriorityData's
  38. // initialiser is internal, so I can't create one of these frames.
  39. .rstStream(.cancel),
  40. .settings(.ack),
  41. .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
  42. .windowUpdate(windowSizeIncrement: 4),
  43. .alternativeService(origin: nil, field: nil),
  44. .origin([]),
  45. ]
  46. for toBeIgnored in framesToBeIgnored {
  47. XCTAssertNoThrow(try channel.writeInbound(toBeIgnored))
  48. XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
  49. }
  50. }
  51. func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
  52. let channel = EmbeddedChannel()
  53. let handler = GRPCServerStreamHandler(
  54. scheme: .http,
  55. acceptedEncodings: [],
  56. maximumPayloadSize: 1,
  57. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  58. )
  59. try channel.pipeline.syncOperations.addHandler(handler)
  60. // Receive client's initial metadata without content-type
  61. let clientInitialMetadata: HPACKHeaders = [
  62. GRPCHTTP2Keys.path.rawValue: "/test/test",
  63. GRPCHTTP2Keys.scheme.rawValue: "http",
  64. GRPCHTTP2Keys.method.rawValue: "POST",
  65. GRPCHTTP2Keys.te.rawValue: "trailers",
  66. ]
  67. XCTAssertNoThrow(
  68. try channel.writeInbound(
  69. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  70. )
  71. )
  72. // Make sure we have sent a trailers-only response
  73. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  74. XCTAssertEqual(writtenTrailersOnlyResponse.headers, [":status": "415"])
  75. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  76. }
  77. func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
  78. let channel = EmbeddedChannel()
  79. let handler = GRPCServerStreamHandler(
  80. scheme: .http,
  81. acceptedEncodings: [],
  82. maximumPayloadSize: 1,
  83. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  84. )
  85. try channel.pipeline.syncOperations.addHandler(handler)
  86. // Receive client's initial metadata without :method
  87. let clientInitialMetadata: HPACKHeaders = [
  88. GRPCHTTP2Keys.path.rawValue: "/test/test",
  89. GRPCHTTP2Keys.scheme.rawValue: "http",
  90. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  91. GRPCHTTP2Keys.te.rawValue: "trailers",
  92. ]
  93. XCTAssertNoThrow(
  94. try channel.writeInbound(
  95. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  96. )
  97. )
  98. // Make sure we have sent a trailers-only response
  99. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  100. XCTAssertEqual(
  101. writtenTrailersOnlyResponse.headers,
  102. [
  103. GRPCHTTP2Keys.status.rawValue: "200",
  104. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  105. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  106. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  107. ":method header is expected to be present and have a value of \"POST\".",
  108. ]
  109. )
  110. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  111. }
  112. func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
  113. let channel = EmbeddedChannel()
  114. let handler = GRPCServerStreamHandler(
  115. scheme: .http,
  116. acceptedEncodings: [],
  117. maximumPayloadSize: 1,
  118. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  119. )
  120. try channel.pipeline.syncOperations.addHandler(handler)
  121. // Receive client's initial metadata without :scheme
  122. let clientInitialMetadata: HPACKHeaders = [
  123. GRPCHTTP2Keys.path.rawValue: "/test/test",
  124. GRPCHTTP2Keys.method.rawValue: "POST",
  125. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  126. GRPCHTTP2Keys.te.rawValue: "trailers",
  127. ]
  128. XCTAssertNoThrow(
  129. try channel.writeInbound(
  130. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  131. )
  132. )
  133. // Make sure we have sent a trailers-only response
  134. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  135. XCTAssertEqual(
  136. writtenTrailersOnlyResponse.headers,
  137. [
  138. GRPCHTTP2Keys.status.rawValue: "200",
  139. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  140. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  141. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  142. ":scheme header must be present and one of \"http\" or \"https\".",
  143. ]
  144. )
  145. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  146. }
  147. func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
  148. let channel = EmbeddedChannel()
  149. let handler = GRPCServerStreamHandler(
  150. scheme: .http,
  151. acceptedEncodings: [],
  152. maximumPayloadSize: 1,
  153. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  154. )
  155. try channel.pipeline.syncOperations.addHandler(handler)
  156. // Receive client's initial metadata without :path
  157. let clientInitialMetadata: HPACKHeaders = [
  158. GRPCHTTP2Keys.scheme.rawValue: "http",
  159. GRPCHTTP2Keys.method.rawValue: "POST",
  160. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  161. GRPCHTTP2Keys.te.rawValue: "trailers",
  162. ]
  163. XCTAssertNoThrow(
  164. try channel.writeInbound(
  165. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  166. )
  167. )
  168. // Make sure we have sent a trailers-only response
  169. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  170. XCTAssertEqual(
  171. writtenTrailersOnlyResponse.headers,
  172. [
  173. GRPCHTTP2Keys.status.rawValue: "200",
  174. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  175. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  176. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
  177. ]
  178. )
  179. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  180. }
  181. func testClientInitialMetadataWithoutTEResultsInRejectedRPC() throws {
  182. let channel = EmbeddedChannel()
  183. let handler = GRPCServerStreamHandler(
  184. scheme: .http,
  185. acceptedEncodings: [],
  186. maximumPayloadSize: 1,
  187. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  188. )
  189. try channel.pipeline.syncOperations.addHandler(handler)
  190. // Receive client's initial metadata without TE
  191. let clientInitialMetadata: HPACKHeaders = [
  192. GRPCHTTP2Keys.path.rawValue: "/test/test",
  193. GRPCHTTP2Keys.scheme.rawValue: "http",
  194. GRPCHTTP2Keys.method.rawValue: "POST",
  195. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  196. ]
  197. XCTAssertNoThrow(
  198. try channel.writeInbound(
  199. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  200. )
  201. )
  202. // Make sure we have sent a trailers-only response
  203. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  204. XCTAssertEqual(
  205. writtenTrailersOnlyResponse.headers,
  206. [
  207. GRPCHTTP2Keys.status.rawValue: "200",
  208. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  209. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
  210. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  211. "\"te\" header is expected to be present and have a value of \"trailers\".",
  212. ]
  213. )
  214. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  215. }
  216. func testNotAcceptedEncodingResultsInRejectedRPC() throws {
  217. let channel = EmbeddedChannel()
  218. let handler = GRPCServerStreamHandler(
  219. scheme: .http,
  220. acceptedEncodings: [],
  221. maximumPayloadSize: 100,
  222. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  223. )
  224. try channel.pipeline.syncOperations.addHandler(handler)
  225. // Receive client's initial metadata
  226. let clientInitialMetadata: HPACKHeaders = [
  227. GRPCHTTP2Keys.path.rawValue: "/test/test",
  228. GRPCHTTP2Keys.scheme.rawValue: "http",
  229. GRPCHTTP2Keys.method.rawValue: "POST",
  230. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  231. GRPCHTTP2Keys.te.rawValue: "trailers",
  232. GRPCHTTP2Keys.encoding.rawValue: "deflate",
  233. ]
  234. XCTAssertNoThrow(
  235. try channel.writeInbound(
  236. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  237. )
  238. )
  239. // Make sure we have sent a trailers-only response
  240. let writtenTrailersOnlyResponse = try channel.assertReadHeadersOutbound()
  241. XCTAssertEqual(
  242. writtenTrailersOnlyResponse.headers,
  243. [
  244. GRPCHTTP2Keys.status.rawValue: "200",
  245. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  246. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
  247. GRPCHTTP2Keys.grpcStatusMessage.rawValue:
  248. "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
  249. GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
  250. ]
  251. )
  252. XCTAssertTrue(writtenTrailersOnlyResponse.endStream)
  253. }
  254. func testOverMaximumPayloadSize() throws {
  255. let channel = EmbeddedChannel()
  256. let handler = GRPCServerStreamHandler(
  257. scheme: .http,
  258. acceptedEncodings: [],
  259. maximumPayloadSize: 1,
  260. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  261. )
  262. try channel.pipeline.syncOperations.addHandler(handler)
  263. // Receive client's initial metadata
  264. let clientInitialMetadata: HPACKHeaders = [
  265. GRPCHTTP2Keys.path.rawValue: "/test/test",
  266. GRPCHTTP2Keys.scheme.rawValue: "http",
  267. GRPCHTTP2Keys.method.rawValue: "POST",
  268. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  269. GRPCHTTP2Keys.te.rawValue: "trailers",
  270. ]
  271. XCTAssertNoThrow(
  272. try channel.writeInbound(
  273. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  274. )
  275. )
  276. // Make sure we haven't sent back an error response, and that we read the initial metadata
  277. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  278. XCTAssertEqual(
  279. try channel.readInbound(as: RPCRequestPart.self),
  280. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  281. )
  282. // Write back server's initial metadata
  283. let headers: HPACKHeaders = [
  284. "some-custom-header": "some-custom-value"
  285. ]
  286. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  287. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  288. // Make sure we wrote back the initial metadata
  289. let writtenHeaders = try channel.assertReadHeadersOutbound()
  290. XCTAssertEqual(
  291. writtenHeaders.headers,
  292. [
  293. GRPCHTTP2Keys.status.rawValue: "200",
  294. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  295. "some-custom-header": "some-custom-value",
  296. ]
  297. )
  298. // Receive client's message
  299. var buffer = ByteBuffer()
  300. buffer.writeInteger(UInt8(0)) // not compressed
  301. buffer.writeInteger(UInt32(42)) // message length
  302. buffer.writeRepeatingByte(0, count: 42) // message
  303. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  304. XCTAssertThrowsError(
  305. ofType: RPCError.self,
  306. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  307. ) { error in
  308. XCTAssertEqual(error.code, .resourceExhausted)
  309. XCTAssertEqual(
  310. error.message,
  311. "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
  312. )
  313. }
  314. // Make sure we haven't sent a response back and that we didn't read the received message
  315. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  316. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  317. }
  318. func testClientEndsStream() throws {
  319. let channel = EmbeddedChannel()
  320. let handler = GRPCServerStreamHandler(
  321. scheme: .http,
  322. acceptedEncodings: [],
  323. maximumPayloadSize: 1,
  324. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  325. skipStateMachineAssertions: true
  326. )
  327. try channel.pipeline.syncOperations.addHandler(handler)
  328. // Receive client's initial metadata with end stream set
  329. let clientInitialMetadata: HPACKHeaders = [
  330. GRPCHTTP2Keys.path.rawValue: "/test/test",
  331. GRPCHTTP2Keys.scheme.rawValue: "http",
  332. GRPCHTTP2Keys.method.rawValue: "POST",
  333. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  334. GRPCHTTP2Keys.te.rawValue: "trailers",
  335. ]
  336. XCTAssertNoThrow(
  337. try channel.writeInbound(
  338. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata, endStream: true))
  339. )
  340. )
  341. // Make sure we haven't sent back an error response, and that we read the initial metadata
  342. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  343. XCTAssertEqual(
  344. try channel.readInbound(as: RPCRequestPart.self),
  345. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  346. )
  347. // Write back server's initial metadata
  348. let headers: HPACKHeaders = [
  349. "some-custom-header": "some-custom-value"
  350. ]
  351. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  352. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  353. // Make sure we wrote back the initial metadata
  354. let writtenHeaders = try channel.assertReadHeadersOutbound()
  355. XCTAssertEqual(
  356. writtenHeaders.headers,
  357. [
  358. GRPCHTTP2Keys.status.rawValue: "200",
  359. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  360. "some-custom-header": "some-custom-value",
  361. ]
  362. )
  363. // We should throw if the client sends another message, since it's closed the stream already.
  364. var buffer = ByteBuffer()
  365. buffer.writeInteger(UInt8(0)) // not compressed
  366. buffer.writeInteger(UInt32(42)) // message length
  367. buffer.writeRepeatingByte(0, count: 42) // message
  368. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  369. XCTAssertThrowsError(
  370. ofType: RPCError.self,
  371. try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
  372. ) { error in
  373. XCTAssertEqual(error.code, .internalError)
  374. XCTAssertEqual(error.message, "Client can't send a message if closed.")
  375. }
  376. }
  377. func testNormalFlow() throws {
  378. let channel = EmbeddedChannel()
  379. let handler = GRPCServerStreamHandler(
  380. scheme: .http,
  381. acceptedEncodings: [],
  382. maximumPayloadSize: 42,
  383. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self),
  384. skipStateMachineAssertions: true
  385. )
  386. try channel.pipeline.syncOperations.addHandler(handler)
  387. // Receive client's initial metadata
  388. let clientInitialMetadata: HPACKHeaders = [
  389. GRPCHTTP2Keys.path.rawValue: "/test/test",
  390. GRPCHTTP2Keys.scheme.rawValue: "http",
  391. GRPCHTTP2Keys.method.rawValue: "POST",
  392. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  393. GRPCHTTP2Keys.te.rawValue: "trailers",
  394. ]
  395. XCTAssertNoThrow(
  396. try channel.writeInbound(
  397. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  398. )
  399. )
  400. // Make sure we haven't sent back an error response, and that we read the initial metadata
  401. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  402. XCTAssertEqual(
  403. try channel.readInbound(as: RPCRequestPart.self),
  404. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  405. )
  406. // Write back server's initial metadata
  407. let headers: HPACKHeaders = [
  408. "some-custom-header": "some-custom-value"
  409. ]
  410. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  411. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  412. // Make sure we wrote back the initial metadata
  413. let writtenHeaders = try channel.assertReadHeadersOutbound()
  414. XCTAssertEqual(
  415. writtenHeaders.headers,
  416. [
  417. GRPCHTTP2Keys.status.rawValue: "200",
  418. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  419. "some-custom-header": "some-custom-value",
  420. ]
  421. )
  422. // Receive client's message
  423. var buffer = ByteBuffer()
  424. buffer.writeInteger(UInt8(0)) // not compressed
  425. buffer.writeInteger(UInt32(42)) // message length
  426. buffer.writeRepeatingByte(0, count: 42) // message
  427. let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
  428. XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))
  429. // Make sure we haven't sent back an error response, and that we read the message properly
  430. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  431. XCTAssertEqual(
  432. try channel.readInbound(as: RPCRequestPart.self),
  433. RPCRequestPart.message([UInt8](repeating: 0, count: 42))
  434. )
  435. // Write back response
  436. let serverDataPayload = RPCResponsePart.message([UInt8](repeating: 1, count: 42))
  437. XCTAssertNoThrow(try channel.writeOutbound(serverDataPayload))
  438. // Make sure we wrote back the right message
  439. let writtenMessage = try channel.assertReadDataOutbound()
  440. var expectedBuffer = ByteBuffer()
  441. expectedBuffer.writeInteger(UInt8(0)) // not compressed
  442. expectedBuffer.writeInteger(UInt32(42)) // message length
  443. expectedBuffer.writeRepeatingByte(1, count: 42) // message
  444. XCTAssertEqual(writtenMessage.data, .byteBuffer(expectedBuffer))
  445. // Send back status to end RPC
  446. let trailers = RPCResponsePart.status(
  447. .init(code: .dataLoss, message: "Test data loss"),
  448. ["custom-header": "custom-value"]
  449. )
  450. XCTAssertNoThrow(try channel.writeOutbound(trailers))
  451. // Make sure we wrote back the status and trailers
  452. let writtenStatus = try channel.assertReadHeadersOutbound()
  453. XCTAssertTrue(writtenStatus.endStream)
  454. XCTAssertEqual(
  455. writtenStatus.headers,
  456. [
  457. GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
  458. GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
  459. "custom-header": "custom-value",
  460. ]
  461. )
  462. // Try writing and assert it throws to make sure we don't allow writes
  463. // after closing.
  464. XCTAssertThrowsError(
  465. ofType: RPCError.self,
  466. try channel.writeOutbound(trailers)
  467. ) { error in
  468. XCTAssertEqual(error.code, .internalError)
  469. XCTAssertEqual(error.message, "Server can't send anything if closed.")
  470. }
  471. }
  472. func testReceiveMessageSplitAcrossMultipleBuffers() throws {
  473. let channel = EmbeddedChannel()
  474. let handler = GRPCServerStreamHandler(
  475. scheme: .http,
  476. acceptedEncodings: [],
  477. maximumPayloadSize: 100,
  478. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  479. )
  480. try channel.pipeline.syncOperations.addHandler(handler)
  481. // Receive client's initial metadata
  482. let clientInitialMetadata: HPACKHeaders = [
  483. GRPCHTTP2Keys.path.rawValue: "/test/test",
  484. GRPCHTTP2Keys.scheme.rawValue: "http",
  485. GRPCHTTP2Keys.method.rawValue: "POST",
  486. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  487. GRPCHTTP2Keys.te.rawValue: "trailers",
  488. ]
  489. XCTAssertNoThrow(
  490. try channel.writeInbound(
  491. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  492. )
  493. )
  494. // Make sure we haven't sent back an error response, and that we read the initial metadata
  495. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  496. XCTAssertEqual(
  497. try channel.readInbound(as: RPCRequestPart.self),
  498. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  499. )
  500. // Write back server's initial metadata
  501. let headers: HPACKHeaders = [
  502. "some-custom-header": "some-custom-value"
  503. ]
  504. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  505. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  506. // Make sure we wrote back the initial metadata
  507. let writtenHeaders = try channel.assertReadHeadersOutbound()
  508. XCTAssertEqual(
  509. writtenHeaders.headers,
  510. [
  511. GRPCHTTP2Keys.status.rawValue: "200",
  512. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  513. "some-custom-header": "some-custom-value",
  514. ]
  515. )
  516. // Receive client's first message
  517. var buffer = ByteBuffer()
  518. buffer.writeInteger(UInt8(0)) // not compressed
  519. XCTAssertNoThrow(
  520. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  521. )
  522. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  523. buffer.clear()
  524. buffer.writeInteger(UInt32(30)) // message length
  525. XCTAssertNoThrow(
  526. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  527. )
  528. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  529. buffer.clear()
  530. buffer.writeRepeatingByte(0, count: 10) // first part of the message
  531. XCTAssertNoThrow(
  532. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  533. )
  534. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  535. buffer.clear()
  536. buffer.writeRepeatingByte(1, count: 10) // second part of the message
  537. XCTAssertNoThrow(
  538. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  539. )
  540. XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
  541. buffer.clear()
  542. buffer.writeRepeatingByte(2, count: 10) // third part of the message
  543. XCTAssertNoThrow(
  544. try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))))
  545. )
  546. // Make sure we haven't sent back an error response, and that we read the message properly
  547. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  548. XCTAssertEqual(
  549. try channel.readInbound(as: RPCRequestPart.self),
  550. RPCRequestPart.message(
  551. [UInt8](repeating: 0, count: 10) + [UInt8](repeating: 1, count: 10)
  552. + [UInt8](repeating: 2, count: 10)
  553. )
  554. )
  555. }
  556. func testSendMultipleMessagesInSingleBuffer() throws {
  557. let channel = EmbeddedChannel()
  558. let handler = GRPCServerStreamHandler(
  559. scheme: .http,
  560. acceptedEncodings: [],
  561. maximumPayloadSize: 100,
  562. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  563. )
  564. try channel.pipeline.syncOperations.addHandler(handler)
  565. // Receive client's initial metadata
  566. let clientInitialMetadata: HPACKHeaders = [
  567. GRPCHTTP2Keys.path.rawValue: "/test/test",
  568. GRPCHTTP2Keys.scheme.rawValue: "http",
  569. GRPCHTTP2Keys.method.rawValue: "POST",
  570. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  571. GRPCHTTP2Keys.te.rawValue: "trailers",
  572. ]
  573. XCTAssertNoThrow(
  574. try channel.writeInbound(
  575. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  576. )
  577. )
  578. // Make sure we haven't sent back an error response, and that we read the initial metadata
  579. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  580. XCTAssertEqual(
  581. try channel.readInbound(as: RPCRequestPart.self),
  582. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  583. )
  584. // Write back server's initial metadata
  585. let headers: HPACKHeaders = [
  586. "some-custom-header": "some-custom-value"
  587. ]
  588. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: headers))
  589. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  590. // Read out the metadata
  591. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  592. // This is where this test actually begins. We want to write two messages
  593. // without flushing, and make sure that no messages are sent down the pipeline
  594. // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
  595. // Write back first message and make sure nothing's written in the channel.
  596. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  597. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  598. // Write back second message and make sure nothing's written in the channel.
  599. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 2, count: 4))))
  600. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  601. // Now flush and check we *do* write the data.
  602. channel.flush()
  603. let writtenMessage = try channel.assertReadDataOutbound()
  604. // Make sure both messages have been framed together in the ByteBuffer.
  605. XCTAssertEqual(
  606. writtenMessage.data,
  607. .byteBuffer(
  608. .init(bytes: [
  609. // First message
  610. 0, // Compression disabled
  611. 0, 0, 0, 4, // Message length
  612. 1, 1, 1, 1, // First message data
  613. // Second message
  614. 0, // Compression disabled
  615. 0, 0, 0, 4, // Message length
  616. 2, 2, 2, 2, // Second message data
  617. ])
  618. )
  619. )
  620. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  621. }
  622. func testMessageAndStatusAreNotReordered() throws {
  623. let channel = EmbeddedChannel()
  624. let handler = GRPCServerStreamHandler(
  625. scheme: .http,
  626. acceptedEncodings: [],
  627. maximumPayloadSize: 100,
  628. methodDescriptorPromise: channel.eventLoop.makePromise(of: MethodDescriptor.self)
  629. )
  630. try channel.pipeline.syncOperations.addHandler(handler)
  631. // Receive client's initial metadata
  632. let clientInitialMetadata: HPACKHeaders = [
  633. GRPCHTTP2Keys.path.rawValue: "/test/test",
  634. GRPCHTTP2Keys.scheme.rawValue: "http",
  635. GRPCHTTP2Keys.method.rawValue: "POST",
  636. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  637. GRPCHTTP2Keys.te.rawValue: "trailers",
  638. ]
  639. XCTAssertNoThrow(
  640. try channel.writeInbound(
  641. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  642. )
  643. )
  644. // Make sure we haven't sent back an error response, and that we read the initial metadata
  645. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  646. XCTAssertEqual(
  647. try channel.readInbound(as: RPCRequestPart.self),
  648. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  649. )
  650. // Write back server's initial metadata
  651. let serverInitialMetadata = RPCResponsePart.metadata(Metadata(headers: [:]))
  652. XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))
  653. // Read out the metadata
  654. _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)
  655. // This is where this test actually begins. We want to write a message followed
  656. // by status and trailers, and only flush after both writes.
  657. // Because messages are buffered and potentially bundled together in a single
  658. // ByteBuffer by the GPRCMessageFramer, we want to make sure that the status
  659. // and trailers won't be written before the messages.
  660. // Write back message and make sure nothing's written in the channel.
  661. XCTAssertNoThrow(channel.write(RPCResponsePart.message([UInt8](repeating: 1, count: 4))))
  662. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  663. // Write status + metadata and make sure nothing's written.
  664. XCTAssertNoThrow(channel.write(RPCResponsePart.status(.init(code: .ok, message: ""), [:])))
  665. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  666. // Now flush and check we *do* write the data in the right order: message first,
  667. // trailers second.
  668. channel.flush()
  669. let writtenMessage = try channel.assertReadDataOutbound()
  670. // Make sure we first get message.
  671. XCTAssertEqual(
  672. writtenMessage.data,
  673. .byteBuffer(
  674. .init(bytes: [
  675. // First message
  676. 0, // Compression disabled
  677. 0, 0, 0, 4, // Message length
  678. 1, 1, 1, 1, // First message data
  679. ])
  680. )
  681. )
  682. XCTAssertFalse(writtenMessage.endStream)
  683. // Make sure we get trailers.
  684. let writtenTrailers = try channel.assertReadHeadersOutbound()
  685. XCTAssertEqual(writtenTrailers.headers, ["grpc-status": "0"])
  686. XCTAssertTrue(writtenTrailers.endStream)
  687. // Make sure we get nothing else.
  688. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  689. }
  690. func testMethodDescriptorPromiseSucceeds() throws {
  691. let channel = EmbeddedChannel()
  692. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  693. let handler = GRPCServerStreamHandler(
  694. scheme: .http,
  695. acceptedEncodings: [],
  696. maximumPayloadSize: 100,
  697. methodDescriptorPromise: promise,
  698. skipStateMachineAssertions: true
  699. )
  700. try channel.pipeline.syncOperations.addHandler(handler)
  701. // Receive client's initial metadata
  702. let clientInitialMetadata: HPACKHeaders = [
  703. GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
  704. GRPCHTTP2Keys.scheme.rawValue: "http",
  705. GRPCHTTP2Keys.method.rawValue: "POST",
  706. GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
  707. GRPCHTTP2Keys.te.rawValue: "trailers",
  708. ]
  709. XCTAssertNoThrow(
  710. try channel.writeInbound(
  711. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  712. )
  713. )
  714. // Make sure we haven't sent back an error response, and that we read the initial metadata
  715. XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  716. XCTAssertEqual(
  717. try channel.readInbound(as: RPCRequestPart.self),
  718. RPCRequestPart.metadata(Metadata(headers: clientInitialMetadata))
  719. )
  720. XCTAssertEqual(
  721. try promise.futureResult.wait(),
  722. MethodDescriptor(service: "SomeService", method: "SomeMethod")
  723. )
  724. }
  725. func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
  726. let channel = EmbeddedChannel()
  727. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  728. let handler = GRPCServerStreamHandler(
  729. scheme: .http,
  730. acceptedEncodings: [],
  731. maximumPayloadSize: 100,
  732. methodDescriptorPromise: promise,
  733. skipStateMachineAssertions: true
  734. )
  735. try channel.pipeline.syncOperations.addHandler(handler)
  736. try channel.pipeline.syncOperations.removeHandler(handler).wait()
  737. XCTAssertThrowsError(
  738. ofType: RPCError.self,
  739. try promise.futureResult.wait()
  740. ) { error in
  741. XCTAssertEqual(error.code, .unavailable)
  742. XCTAssertEqual(error.message, "RPC stream was closed before we got any Metadata.")
  743. }
  744. }
  745. func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
  746. let channel = EmbeddedChannel()
  747. let promise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
  748. let handler = GRPCServerStreamHandler(
  749. scheme: .http,
  750. acceptedEncodings: [],
  751. maximumPayloadSize: 100,
  752. methodDescriptorPromise: promise,
  753. skipStateMachineAssertions: true
  754. )
  755. try channel.pipeline.syncOperations.addHandler(handler)
  756. // Receive client's initial metadata
  757. let clientInitialMetadata: HPACKHeaders = [
  758. GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
  759. GRPCHTTP2Keys.scheme.rawValue: "http",
  760. GRPCHTTP2Keys.method.rawValue: "POST",
  761. GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
  762. GRPCHTTP2Keys.te.rawValue: "trailers",
  763. ]
  764. XCTAssertNoThrow(
  765. try channel.writeInbound(
  766. HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
  767. )
  768. )
  769. XCTAssertThrowsError(
  770. ofType: RPCError.self,
  771. try promise.futureResult.wait()
  772. ) { error in
  773. XCTAssertEqual(error.code, .unavailable)
  774. XCTAssertEqual(error.message, "RPC was rejected.")
  775. }
  776. }
  777. }
  778. extension EmbeddedChannel {
  779. fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
  780. guard
  781. case .headers(let writtenHeaders) = try XCTUnwrap(
  782. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  783. )
  784. else {
  785. throw TestError.assertionFailure("Expected to write headers")
  786. }
  787. return writtenHeaders
  788. }
  789. fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
  790. guard
  791. case .data(let writtenMessage) = try XCTUnwrap(
  792. try self.readOutbound(as: HTTP2Frame.FramePayload.self)
  793. )
  794. else {
  795. throw TestError.assertionFailure("Expected to write data")
  796. }
  797. return writtenMessage
  798. }
  799. }
  800. private enum TestError: Error {
  801. case assertionFailure(String)
  802. }