GRPCClientStreamHandlerTests.swift 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import XCTest
  23. @testable import GRPCHTTP2Core
  24. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  25. final class GRPCClientStreamHandlerTests: XCTestCase {
  /// Writes a series of HTTP/2 frame payloads (PING, GOAWAY, SETTINGS, PUSH_PROMISE,
  /// WINDOW_UPDATE, ALTSVC and ORIGIN) into the handler and checks that each write
  /// succeeds and that nothing becomes readable on the inbound side afterwards.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1
      )
    )

    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      // TODO: uncomment when it's possible to build a `StreamPriorityData`.
      // .priority(
      //   HTTP2Frame.StreamPriorityData(exclusive: false, dependency: .rootStream, weight: 4)
      // ),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    // Each payload must be accepted without error and must not be forwarded inbound.
    ignoredPayloads.forEach { payload in
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Sends the client's initial metadata, then feeds the handler server headers that
  /// contain a content-type but no `:status`, and expects an `.unknown` status part
  /// carrying the received headers as metadata.
  func testServerInitialMetadataMissingHTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with headers that have no :status field.
    let headersWithoutStatus: HPACKHeaders = [
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: headersWithoutStatus))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler must surface a terminal status instead of metadata.
    let expectedPart = RPCResponsePart.status(
      .init(code: .unknown, message: "HTTP Status Code is missing."),
      Metadata(headers: headersWithoutStatus)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Sends the client's initial metadata, then feeds the handler server headers with a
  /// 1xx `:status` ("104") and checks that no response part is readable afterwards.
  func testServerInitialMetadata1xxHTTPStatusCodeResultsInNothingRead() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with an informational (1xx) :status.
    let informationalHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "104",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: informationalHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // Nothing should have been forwarded up the pipeline.
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  }
  /// Sends the client's initial metadata, then feeds the handler server headers whose
  /// `:status` is neither 200 nor 1xx (429, too many requests) and expects an
  /// `.unavailable` status part carrying the received headers as metadata.
  func testServerInitialMetadataOtherNon200HTTPStatusCodeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with a non-200, non-1xx :status.
    let tooManyRequestsCode = String(HTTPResponseStatus.tooManyRequests.code)
    let rejectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: tooManyRequestsCode,
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: rejectedHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler must surface a terminal status instead of metadata.
    let expectedPart = RPCResponsePart.status(
      .init(code: .unavailable, message: "Unexpected non-200 HTTP Status Code."),
      Metadata(headers: rejectedHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Sends the client's initial metadata, then feeds the handler server headers with
  /// `:status` 200 but no content-type and expects an `.internalError` status part
  /// carrying the received headers as metadata.
  func testServerInitialMetadataMissingContentTypeResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // The server answers with headers that have no content-type field.
    let headersWithoutContentType: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200"
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: headersWithoutContentType)
    )
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler must surface a terminal status instead of metadata.
    let expectedPart = RPCResponsePart.status(
      .init(code: .internalError, message: "Missing content-type header"),
      Metadata(headers: headersWithoutContentType)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Configures the handler with deflate as both the outbound and the only accepted
  /// encoding, verifies the request headers that get written, then has the server reply
  /// with `gzip` as its encoding and expects an `.internalError` status part.
  func testNotAcceptedEncodingResultsInFinishedRPC() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .deflate,
        acceptedEncodings: [.deflate],
        maximumPayloadSize: 1
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // The written HEADERS frame must advertise the configured encodings.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server replies using an encoding the client did not list as accepted.
    let gzipResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
      GRPCHTTP2Keys.encoding.rawValue: "gzip",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: gzipResponseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler must surface a terminal status instead of metadata.
    let expectedPart = RPCResponsePart.status(
      .init(
        code: .internalError,
        message:
          "The server picked a compression algorithm ('gzip') the client does not know about."
      ),
      Metadata(headers: gzipResponseHeaders)
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Configures the handler with a maximum payload size of 1 byte, completes the
  /// metadata exchange, then has the server send a 42-byte message. Writing that DATA
  /// frame inbound is expected to throw an `RPCError` with code `.resourceExhausted`,
  /// and no response part may be readable afterwards.
  func testOverMaximumPayloadSize() throws {
    let handler = GRPCClientStreamHandler(
      methodDescriptor: .init(service: "test", method: "test"),
      scheme: .http,
      outboundEncoding: .none,
      acceptedEncodings: [],
      maximumPayloadSize: 1,
      skipStateMachineAssertions: true
    )

    let channel = EmbeddedChannel(handler: handler)

    // Send client's initial metadata
    XCTAssertNoThrow(
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    )

    // Make sure we have sent right metadata.
    let writtenMetadata = try channel.assertReadHeadersOutbound()

    XCTAssertEqual(
      writtenMetadata.headers,
      [
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.path.rawValue: "/test/test",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
      ]
    )

    // Server sends initial metadata
    let serverInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
      )
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .metadata(Metadata(headers: serverInitialMetadata))
    )

    // Server sends message over payload limit
    var buffer = ByteBuffer()
    buffer.writeInteger(UInt8(0))  // not compressed
    buffer.writeInteger(UInt32(42))  // message length
    buffer.writeRepeatingByte(0, count: 42)  // message
    // This payload originates from the server, so name it accordingly (matching the
    // other tests in this file).
    let serverDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(buffer),
      endStream: false
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload))
    ) { error in
      XCTAssertEqual(error.code, .resourceExhausted)
      XCTAssertEqual(
        error.message,
        "Message has exceeded the configured maximum payload size (max: 1, actual: 42)"
      )
    }

    // Make sure we didn't read the received message. The other tests in this file read
    // this handler's inbound output as `RPCResponsePart`, so that is the type to check
    // for here.
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
  }
  /// Completes the metadata exchange, then has the server send a DATA frame with the
  /// end-stream flag set. The write itself must not throw; instead an `.internalError`
  /// status part with empty metadata is expected on the inbound side.
  func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // Check the HEADERS frame that was written for the request.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // The server sends valid initial metadata, which is forwarded as a metadata part.
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .metadata(Metadata(headers: responseHeaders))
    )

    // The server sends a length-prefixed message with EOS set on the DATA frame.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let serverDataPayload = HTTP2Frame.FramePayload.Data(
      data: .byteBuffer(framedMessage),
      endStream: true
    )
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))

    // We should get a status (with empty metadata) describing the protocol violation.
    let expectedPart = RPCResponsePart.status(
      Status(
        code: .internalError,
        message:
          "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
      ),
      [:]
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)
  }
  /// Has the server answer the client's initial metadata with a HEADERS frame that
  /// carries `grpc-status` 0 and the end-stream flag. An `.ok` status part is expected,
  /// and any DATA frame the server sends afterwards must make the inbound write throw.
  func testServerEndsStream() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // The client opens the RPC by sending its initial metadata.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // Check the HEADERS frame that was written for the request.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // Receive server's initial metadata with end stream set
    let trailersOnlyHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.grpcStatus.rawValue: "0",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    let trailersOnlyFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: trailersOnlyHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(trailersOnlyFrame))

    // The status part carries the remaining headers; grpc-status is not among them.
    let expectedStatusMetadata: Metadata = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
    ]
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .status(.init(code: .ok, message: ""), expectedStatusMetadata)
    )

    // We should throw if the server sends another message, since it's closed the stream already.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // not compressed
    framedMessage.writeInteger(UInt32(42))  // message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message
    let lateDataFrame = HTTP2Frame.FramePayload.data(
      .init(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(lateDataFrame)
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Cannot have received anything from a closed server.")
    }
  }
  /// Walks through a complete RPC against the handler: client metadata out, server
  /// metadata in, one client message out, client half-close (empty end-stream DATA
  /// frame), a rejected write after the half-close, one server message in, and finally
  /// server trailers that are surfaced as a status part.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // Send client's initial metadata
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // Make sure we have sent the corresponding frame, and that nothing has been written back.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // Receive server's initial metadata
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // Send a message
    let requestMessage = [UInt8](repeating: 1, count: 42)
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.message(requestMessage)))

    // Assert we wrote it successfully into the channel: a 5-byte prefix (compression
    // flag 0, then the big-endian UInt32 length 42) followed by the message bytes.
    let sentData = try channel.assertReadDataOutbound()
    let framedRequest = ByteBuffer(bytes: [0, 0, 0, 0, 42] + requestMessage)
    XCTAssertEqual(sentData.data, .byteBuffer(framedRequest))

    // Half-close the outbound end: this would be triggered by finishing the client's writer.
    XCTAssertNoThrow(channel.close(mode: .output, promise: nil))

    // Flush to make sure the EOS is written.
    channel.flush()

    // Make sure the EOS frame was sent
    let endOfStreamFrame = try channel.assertReadDataOutbound()
    XCTAssertEqual(endOfStreamFrame.data, .byteBuffer(.init()))
    XCTAssertTrue(endOfStreamFrame.endStream)

    // Make sure we cannot write anymore because client's closed.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart.message(requestMessage))
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Client is closed, cannot send a message.")
    }

    // This is needed to clear the EmbeddedChannel's stored error, otherwise
    // it will be thrown when writing inbound.
    try? channel.throwIfErrorCaught()

    // Server sends back response message
    let responseMessage = [UInt8](repeating: 0, count: 42)
    let framedResponse = ByteBuffer(bytes: [0, 0, 0, 0, 42] + responseMessage)
    let serverDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedResponse))
    XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(serverDataPayload)))

    // Make sure we read the message properly
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.message(responseMessage)
    )

    // Server sends status to end RPC
    let trailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: trailers)))
    )

    // Only the custom header is expected in the status metadata.
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      .status(.init(code: .dataLoss, message: "Test data loss"), ["custom-header": "custom-value"])
    )
  }
  /// After the metadata exchange and one outbound message, delivers a single 30-byte
  /// server message in five separate DATA frames (compression flag, length, and three
  /// 10-byte slices). No response part may be readable until the last slice arrives,
  /// at which point the reassembled message is expected.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // Send client's initial metadata
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // Make sure we have sent the corresponding frame, and that nothing has been written back.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // Receive server's initial metadata
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // Send a message
    let requestMessage = [UInt8](repeating: 1, count: 42)
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.message(requestMessage)))

    // Assert we wrote it successfully into the channel: a 5-byte prefix (compression
    // flag 0, then the big-endian UInt32 length 42) followed by the message bytes.
    let sentData = try channel.assertReadDataOutbound()
    let framedRequest = ByteBuffer(bytes: [0, 0, 0, 0, 42] + requestMessage)
    XCTAssertEqual(sentData.data, .byteBuffer(framedRequest))

    // Receive server's first message, one fragment per DATA frame.
    let fragments: [[UInt8]] = [
      [0],  // not compressed
      [0, 0, 0, 30],  // message length (big-endian UInt32)
      [UInt8](repeating: 0, count: 10),  // first part of the message
      [UInt8](repeating: 1, count: 10),  // second part of the message
      [UInt8](repeating: 2, count: 10),  // third part of the message
    ]
    let lastFragmentIndex = fragments.count - 1
    for (index, fragment) in fragments.enumerated() {
      let dataFrame = HTTP2Frame.FramePayload.data(
        .init(data: .byteBuffer(ByteBuffer(bytes: fragment)))
      )
      XCTAssertNoThrow(try channel.writeInbound(dataFrame))

      // Until the final fragment has arrived there must be nothing to read.
      if index < lastFragmentIndex {
        XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))
      }
    }

    // Make sure we read the message properly
    let expectedMessage = Array(fragments[2...].joined())
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.message(expectedMessage)
    )
  }
  /// After the metadata exchange, writes two 4-byte messages without flushing and
  /// checks that nothing is readable outbound after either write. Once the channel is
  /// flushed, exactly one DATA frame containing both length-prefixed messages is expected.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 100,
        skipStateMachineAssertions: true
      )
    )

    // Send client's initial metadata
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata([:])))

    // Make sure we have sent the corresponding frame, and that nothing has been written back.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)
    XCTAssertNil(try channel.readInbound(as: RPCResponsePart.self))

    // Receive server's initial metadata
    let responseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    let responseHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
    XCTAssertNoThrow(try channel.writeInbound(responseHeadersFrame))
    XCTAssertEqual(
      try channel.readInbound(as: RPCResponsePart.self),
      RPCResponsePart.metadata(Metadata(headers: responseHeaders))
    )

    // This is where this test actually begins. We want to write two messages
    // without flushing, and make sure that no messages are sent down the pipeline
    // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.
    let firstMessage = [UInt8](repeating: 1, count: 4)
    let secondMessage = [UInt8](repeating: 2, count: 4)

    // Write the first message and make sure nothing's written in the channel.
    XCTAssertNoThrow(channel.write(RPCRequestPart.message(firstMessage)))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Write the second message and make sure nothing's written in the channel.
    XCTAssertNoThrow(channel.write(RPCRequestPart.message(secondMessage)))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Now flush and check we *do* write the data.
    channel.flush()
    let sentData = try channel.assertReadDataOutbound()

    // Make sure both messages have been framed together in the ByteBuffer. Each one is
    // preceded by a compression flag (0 = disabled) and a big-endian UInt32 length.
    let firstFramed: [UInt8] = [0, 0, 0, 0, 4] + firstMessage
    let secondFramed: [UInt8] = [0, 0, 0, 0, 4] + secondMessage
    XCTAssertEqual(
      sentData.data,
      .byteBuffer(ByteBuffer(bytes: firstFramed + secondFramed))
    )

    // There must be no further outbound frames.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// After the client's initial metadata has been written, fires an error through the
  /// pipeline and expects an `.unavailable` status part with empty metadata. A later
  /// attempt to write metadata must throw an `.internalError` `RPCError`.
  func testUnexpectedStreamClose_ErrorFired() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Write client's initial metadata
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    // Check the HEADERS frame that was written for the request.
    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let sentHeaders = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(sentHeaders.headers, expectedRequestHeaders)

    // An error is fired down the pipeline
    channel.pipeline.fireErrorCaught(ChannelError.connectTimeout(.milliseconds(100)))

    // The client receives a status explaining the stream was closed because of the thrown error.
    let expectedPart = RPCResponsePart.status(
      .init(code: .unavailable, message: "Stream unexpectedly closed with error."),
      [:]
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedPart)

    // We should now be closed: check we can't write anymore.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    ) { error in
      XCTAssertEqual(error.code, .internalError)
      XCTAssertEqual(error.message, "Client is closed: can't send metadata.")
    }
  }
  /// Checks that the channel becoming inactive after the client's initial metadata has been
  /// written is delivered inbound as an `.unavailable` status, and that any further attempt
  /// to write metadata is rejected because the client is closed.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Send the client's initial metadata and verify the request headers it produces.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeadersFrame.headers, expectedRequestHeaders)

    // Signal that the channel has gone inactive.
    channel.pipeline.fireChannelInactive()

    // The client must be told, via a status, that the stream closed unexpectedly.
    let expectedStatusPart = RPCResponsePart.status(
      .init(code: .unavailable, message: "Stream unexpectedly closed."),
      [:]
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedStatusPart)

    // The client is closed at this point, so another metadata write has to fail.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Client is closed: can't send metadata.")
    }
  }
  /// Checks that receiving a RST_STREAM frame after the client's initial metadata has been
  /// written is delivered inbound as an `.unavailable` status, and that any further attempt
  /// to write metadata is rejected because the client is closed.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    let channel = EmbeddedChannel(
      handler: GRPCClientStreamHandler(
        methodDescriptor: .init(service: "test", method: "test"),
        scheme: .http,
        outboundEncoding: .none,
        acceptedEncodings: [],
        maximumPayloadSize: 1,
        skipStateMachineAssertions: true
      )
    )

    // Send the client's initial metadata and verify the request headers it produces.
    XCTAssertNoThrow(try channel.writeOutbound(RPCRequestPart.metadata(Metadata())))

    let expectedRequestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(requestHeadersFrame.headers, expectedRequestHeaders)

    // Deliver an inbound RST_STREAM frame.
    let resetFrame = HTTP2Frame.FramePayload.rstStream(.internalError)
    XCTAssertNoThrow(try channel.writeInbound(resetFrame))

    // The client must be told, via a status, that a RST_STREAM frame closed the stream.
    let expectedStatusPart = RPCResponsePart.status(
      .init(
        code: .unavailable,
        message: "Stream unexpectedly closed: a RST_STREAM frame was received."
      ),
      [:]
    )
    XCTAssertEqual(try channel.readInbound(as: RPCResponsePart.self), expectedStatusPart)

    // The client is closed at this point, so another metadata write has to fail.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Client is closed: can't send metadata.")
    }
  }
  795. }
  extension EmbeddedChannel {
    /// Reads the next outbound `HTTP2Frame.FramePayload` and returns the value carried by
    /// its `.headers` case.
    ///
    /// - Returns: The headers payload that was read.
    /// - Throws: Any error raised while reading or unwrapping the outbound payload, or
    ///   `TestError.assertionFailure` if the payload is not `.headers`.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      if case .headers(let headers) = payload {
        return headers
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound `HTTP2Frame.FramePayload` and returns the value carried by
    /// its `.data` case.
    ///
    /// - Returns: The data payload that was read.
    /// - Throws: Any error raised while reading or unwrapping the outbound payload, or
    ///   `TestError.assertionFailure` if the payload is not `.data`.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(
        try self.readOutbound(as: HTTP2Frame.FramePayload.self)
      )
      if case .data(let data) = payload {
        return data
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Error type thrown by the test helpers in this file.
  private enum TestError: Error {
    /// A helper's expectation was not met; the associated string describes what was expected.
    case assertionFailure(String)
  }