GRPCServerStreamHandlerTests.swift 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCCore
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import NIOHTTP2
  21. import Testing
  22. import XCTest
  23. @testable import GRPCNIOTransportCore
  24. final class GRPCServerStreamHandlerTests: XCTestCase {
  /// Creates the `GRPCServerStreamHandler` under test, bound to `channel`'s event loop.
  ///
  /// The handler is backed by a `ServerConnectionManagementHandler` whose idle, age,
  /// grace and keepalive settings are all passed as `nil`.
  ///
  /// - Parameters:
  ///   - channel: Channel whose event loop is handed to both handlers.
  ///   - scheme: Scheme forwarded to the stream handler.
  ///   - acceptedEncodings: Compression algorithms forwarded to the stream handler.
  ///   - maxPayloadSize: Maximum payload size forwarded to the stream handler.
  ///   - descriptorPromise: Promise for the method descriptor; when `nil` a new promise
  ///     is made on the channel's event loop.
  ///   - disableAssertions: Forwarded as `skipStateMachineAssertions`.
  /// - Returns: The configured stream handler. It is not added to any pipeline here.
  private func makeServerStreamHandler(
    channel: any Channel,
    scheme: Scheme = .http,
    acceptedEncodings: CompressionAlgorithmSet = [],
    maxPayloadSize: Int = .max,
    descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
    disableAssertions: Bool = false
  ) -> GRPCServerStreamHandler {
    let eventLoop = channel.eventLoop

    let connectionManager = ServerConnectionManagementHandler(
      eventLoop: eventLoop,
      maxIdleTime: nil,
      maxAge: nil,
      maxGraceTime: nil,
      keepaliveTime: nil,
      keepaliveTimeout: nil,
      allowKeepaliveWithoutCalls: false,
      minPingIntervalWithoutCalls: .minutes(5),
      requireALPN: false
    )

    // Fall back to a fresh promise when the caller does not supply one.
    let methodDescriptorPromise: EventLoopPromise<MethodDescriptor> =
      descriptorPromise ?? eventLoop.makePromise()

    return GRPCServerStreamHandler(
      scheme: scheme,
      acceptedEncodings: acceptedEncodings,
      maxPayloadSize: maxPayloadSize,
      methodDescriptorPromise: methodDescriptorPromise,
      eventLoop: eventLoop,
      connectionManagementHandler: connectionManager.syncView,
      skipStateMachineAssertions: disableAssertions
    )
  }
  /// Writes a series of HTTP/2 frame payloads inbound and expects each write not to throw
  /// and to leave nothing readable inbound as an `HTTP2Frame.FramePayload`.
  func testH2FramesAreIgnored() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    let priority = HTTP2Frame.StreamPriorityData(
      exclusive: false,
      dependency: .rootStream,
      weight: 4
    )
    let ignoredPayloads: [HTTP2Frame.FramePayload] = [
      .ping(.init(), ack: false),
      .goAway(lastStreamID: .rootStream, errorCode: .cancel, opaqueData: nil),
      .priority(priority),
      .settings(.ack),
      .pushPromise(.init(pushedStreamID: .maxID, headers: [:])),
      .windowUpdate(windowSizeIncrement: 4),
      .alternativeService(origin: nil, field: nil),
      .origin([]),
    ]

    for payload in ignoredPayloads {
      // The write must succeed and must not surface the frame to the next handler.
      XCTAssertNoThrow(try channel.writeInbound(payload))
      XCTAssertNil(try channel.readInbound(as: HTTP2Frame.FramePayload.self))
    }
  }
  /// Sends request headers that lack `content-type` and expects a trailers-only
  /// response whose only header is `:status: 415`, with end-stream set.
  func testClientInitialMetadataWithoutContentTypeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit content-type.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler should have answered with a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let expectedHeaders: HPACKHeaders = [":status": "415"]
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers that lack `:method` and expects a trailers-only response
  /// carrying an `invalidArgument` gRPC status and the matching status message.
  func testClientInitialMetadataWithoutMethodResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit :method.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler should have answered with a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        ":method header is expected to be present and have a value of \"POST\".",
    ]
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers that lack `:scheme` and expects a trailers-only response
  /// carrying an `invalidArgument` gRPC status and the matching status message.
  func testClientInitialMetadataWithoutSchemeResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit :scheme.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler should have answered with a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        ":scheme header must be present and one of \"http\" or \"https\".",
    ]
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers that lack `:path` and expects a trailers-only response
  /// carrying an `invalidArgument` gRPC status and the matching status message.
  func testClientInitialMetadataWithoutPathResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Request headers deliberately omit :path.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler should have answered with a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.invalidArgument.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "No :path header has been set.",
    ]
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// Sends request headers asking for `deflate` encoding to a handler built with the
  /// default (empty) accepted-encodings set, and expects a trailers-only response with
  /// an `unimplemented` gRPC status that advertises `identity` as the accepted encoding.
  func testNotAcceptedEncodingResultsInRejectedRPC() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Otherwise valid request headers, plus an encoding the handler was not told to accept.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
      GRPCHTTP2Keys.encoding.rawValue: "deflate",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // The handler should have answered with a trailers-only response.
    let rejection = try channel.assertReadHeadersOutbound()
    let expectedHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.unimplemented.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue:
        "deflate compression is not supported; supported algorithms are listed in grpc-accept-encoding",
      GRPCHTTP2Keys.acceptEncoding.rawValue: "identity",
    ]
    XCTAssertEqual(rejection.headers, expectedHeaders)
    XCTAssertTrue(rejection.endStream)
  }
  /// With `maxPayloadSize` set to 1, delivers a 42-byte message and expects the inbound
  /// write to throw an `RPCError` with code `internalError` and message
  /// "Failed to decode message", with nothing written outbound or surfaced inbound.
  func testOverMaximumPayloadSize() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, maxPayloadSize: 1)
    )

    // Deliver the client's request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response was sent, and the request metadata was forwarded.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Send the server's initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The response headers should have been written out.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Deliver a length-prefixed message that is larger than the configured maximum.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // compression flag: not compressed
    framedMessage.writeInteger(UInt32(42))  // declared message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message bytes
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(dataFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Failed to decode message")
    }

    // Nothing was sent back and the message was not forwarded.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
  }
  /// Delivers request headers with end-stream set, then a DATA frame, and expects the
  /// DATA write to throw an `RPCError` with code `internalError` and message "Invalid state".
  ///
  /// The handler is built with `disableAssertions: true`.
  func testClientEndsStream() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, disableAssertions: true)
    )

    // Deliver the client's request headers, closing the client side of the stream.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(
      .init(headers: requestHeaders, endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response was sent, and the request metadata was forwarded.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Send the server's initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The response headers should have been written out.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // The client already ended the stream, so a further message must be rejected.
    var framedMessage = ByteBuffer()
    framedMessage.writeInteger(UInt8(0))  // compression flag: not compressed
    framedMessage.writeInteger(UInt32(42))  // declared message length
    framedMessage.writeRepeatingByte(0, count: 42)  // message bytes
    let dataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedMessage), endStream: true)
    )
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(dataFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Walks through a complete exchange: request headers in, response headers out,
  /// one request message in, one response message out, then status and trailers out.
  /// Finally checks that a second status write throws an `RPCError` with code
  /// `internalError` and message "Invalid state".
  ///
  /// The handler is built with `disableAssertions: true`.
  func testNormalFlow() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, disableAssertions: true)
    )

    // Deliver the client's request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response was sent, and the request metadata was forwarded.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Send the server's initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The response headers should have been written out.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Deliver a single length-prefixed request message.
    var framedRequest = ByteBuffer()
    framedRequest.writeInteger(UInt8(0))  // compression flag: not compressed
    framedRequest.writeInteger(UInt32(42))  // declared message length
    framedRequest.writeRepeatingByte(0, count: 42)  // message bytes
    let requestDataFrame = HTTP2Frame.FramePayload.data(
      HTTP2Frame.FramePayload.Data(data: .byteBuffer(framedRequest), endStream: true)
    )
    XCTAssertNoThrow(try channel.writeInbound(requestDataFrame))

    // No error response was sent, and the message was forwarded without its prefix.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMessage = RPCRequestPart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(repeating: 0, count: 42)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMessage
    )

    // Send a response message.
    let responseMessage = RPCResponsePart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(repeating: 1, count: 42)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMessage))

    // The outbound DATA frame should hold the length-prefixed response message.
    let responseData = try channel.assertReadDataOutbound()
    var expectedFramedResponse = ByteBuffer()
    expectedFramedResponse.writeInteger(UInt8(0))  // compression flag: not compressed
    expectedFramedResponse.writeInteger(UInt32(42))  // declared message length
    expectedFramedResponse.writeRepeatingByte(1, count: 42)  // message bytes
    XCTAssertEqual(responseData.data, .byteBuffer(expectedFramedResponse))

    // Finish the RPC with a status and trailing metadata.
    let statusAndTrailers = RPCResponsePart<GRPCNIOTransportBytes>.status(
      .init(code: .dataLoss, message: "Test data loss"),
      ["custom-header": "custom-value"]
    )
    XCTAssertNoThrow(try channel.writeOutbound(statusAndTrailers))

    // The status and trailers should have been written with end-stream set.
    let trailersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertTrue(trailersFrame.endStream)
    let expectedTrailers: HPACKHeaders = [
      GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.dataLoss.rawValue),
      GRPCHTTP2Keys.grpcStatusMessage.rawValue: "Test data loss",
      "custom-header": "custom-value",
    ]
    XCTAssertEqual(trailersFrame.headers, expectedTrailers)

    // Writes after the stream has been closed must be refused.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(statusAndTrailers)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Delivers one 30-byte message spread over five DATA frames (compression flag,
  /// length, then three 10-byte pieces). Expects nothing to be surfaced inbound after
  /// each of the first four frames, and the whole message to be surfaced after the fifth.
  func testReceiveMessageSplitAcrossMultipleBuffers() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // Writes one DATA frame (without end-stream) carrying `fragment` inbound.
    func receive(_ fragment: ByteBuffer) {
      XCTAssertNoThrow(
        try channel.writeInbound(HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(fragment))))
      )
    }

    // Checks that no request part is available to read inbound.
    func assertNothingSurfaced() {
      XCTAssertNil(try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self))
    }

    // Deliver the client's request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(headersFrame))

    // No error response was sent, and the request metadata was forwarded.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMetadata = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMetadata
    )

    // Send the server's initial metadata.
    let customHeaders: HPACKHeaders = ["some-custom-header": "some-custom-value"]
    let responseMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: customHeaders)
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadata))

    // The response headers should have been written out.
    let responseHeaders = try channel.assertReadHeadersOutbound()
    let expectedResponseHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.status.rawValue: "200",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      "some-custom-header": "some-custom-value",
    ]
    XCTAssertEqual(responseHeaders.headers, expectedResponseHeaders)

    // Frame 1: just the compression flag.
    var fragment = ByteBuffer()
    fragment.writeInteger(UInt8(0))  // not compressed
    receive(fragment)
    assertNothingSurfaced()

    // Frame 2: just the message length.
    fragment.clear()
    fragment.writeInteger(UInt32(30))
    receive(fragment)
    assertNothingSurfaced()

    // Frames 3 and 4: the first two thirds of the message body.
    for fill: UInt8 in [0, 1] {
      fragment.clear()
      fragment.writeRepeatingByte(fill, count: 10)
      receive(fragment)
      assertNothingSurfaced()
    }

    // Frame 5: the final third, which completes the message.
    fragment.clear()
    fragment.writeRepeatingByte(2, count: 10)
    receive(fragment)

    var expectedBody = ByteBuffer()
    for fill: UInt8 in [0, 1, 2] {
      expectedBody.writeRepeatingByte(fill, count: 10)
    }

    // No error response was sent, and the reassembled message was forwarded.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestMessage = RPCRequestPart<GRPCNIOTransportBytes>.message(
      GRPCNIOTransportBytes(expectedBody)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestMessage
    )
  }
  /// Delivers the same request headers twice. Expects the second delivery to throw an
  /// `RPCError` with code `unavailable` and message "Stream unexpectedly closed.", and
  /// an `RST_STREAM` with error code `protocolError` to be written outbound.
  func testReceiveMultipleHeaders() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let headersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))

    // The first delivery is accepted and produces no outbound frame.
    try channel.writeInbound(headersFrame)
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Delivering the headers a second time should be treated as a protocol violation.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(headersFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed.")
    }

    // The handler should have reset the stream.
    let outboundFrame = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    guard case .rstStream(let errorCode) = outboundFrame else {
      XCTFail("Expected RST_STREAM, got \(outboundFrame)")
      return
    }
    XCTAssertEqual(errorCode, .protocolError)
  }
  /// Writes two response messages without flushing and expects nothing to be written
  /// outbound until `flush()` is called, at which point both messages are expected in
  /// a single DATA frame, each with its own five-byte length prefix.
  func testSendMultipleMessagesInSingleBuffer() throws {
    let channel = EmbeddedChannel()
    let handler = self.makeServerStreamHandler(channel: channel)
    try channel.pipeline.syncOperations.addHandler(handler)

    // Receive client's initial metadata
    let clientInitialMetadata: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    XCTAssertNoThrow(
      try channel.writeInbound(
        HTTP2Frame.FramePayload.headers(.init(headers: clientInitialMetadata))
      )
    )

    // Make sure we haven't sent back an error response, and that we read the initial metadata
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      RPCRequestPart<GRPCNIOTransportBytes>.metadata(Metadata(headers: clientInitialMetadata))
    )

    // Write back server's initial metadata
    let headers: HPACKHeaders = [
      "some-custom-header": "some-custom-value"
    ]
    let serverInitialMetadata = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: headers)
    )
    XCTAssertNoThrow(try channel.writeOutbound(serverInitialMetadata))

    // Read out the metadata. Unwrap rather than discard: if no frame had been written
    // here, the "nothing written yet" checks below would not be testing what they claim.
    _ = try XCTUnwrap(channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // This is where this test actually begins. We want to write two messages
    // without flushing, and make sure that no messages are sent down the pipeline
    // until we flush. Once we flush, both messages should be sent in the same ByteBuffer.

    // Write back first message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Write back second message and make sure nothing's written in the channel.
    XCTAssertNoThrow(
      channel.write(RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 2, count: 4)))
    )
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Now flush and check we *do* write the data.
    channel.flush()
    let writtenMessage = try channel.assertReadDataOutbound()

    // Make sure both messages have been framed together in the ByteBuffer.
    XCTAssertEqual(
      writtenMessage.data,
      .byteBuffer(
        .init(bytes: [
          // First message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          1, 1, 1, 1,  // First message data
          // Second message
          0,  // Compression disabled
          0, 0, 0, 4,  // Message length
          2, 2, 2, 2,  // Second message data
        ])
      )
    )

    // Nothing else should follow the combined DATA frame.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that a buffered response message is written out before the status and
  /// trailers that were queued after it, when both are flushed together.
  func testMessageAndStatusAreNotReordered() throws {
    let channel = EmbeddedChannel()
    try channel.pipeline.syncOperations.addHandler(self.makeServerStreamHandler(channel: channel))

    // The client opens the RPC by sending its request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/test/test",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Send the server's initial metadata, then drain the frame it produced.
    let responseMetadataPart = RPCResponsePart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: [:])
    )
    XCTAssertNoThrow(try channel.writeOutbound(responseMetadataPart))
    _ = try channel.readOutbound(as: HTTP2Frame.FramePayload.self)

    // The actual scenario starts here: write a message, then status and trailers,
    // and flush only once both writes have been issued. Messages are buffered (and
    // may be bundled into a single ByteBuffer) by the GRPCMessageFramer, so the
    // status and trailers must not overtake them.

    // Unflushed message: nothing may reach the channel yet.
    let messagePart = RPCResponsePart.message(GRPCNIOTransportBytes(repeating: 1, count: 4))
    XCTAssertNoThrow(channel.write(messagePart))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // Unflushed status + trailers: still nothing may reach the channel.
    let statusPart = RPCResponsePart<GRPCNIOTransportBytes>.status(
      .init(code: .ok, message: ""),
      [:]
    )
    XCTAssertNoThrow(channel.write(statusPart))
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))

    // After the flush, the message has to come out first...
    channel.flush()
    let dataFrame = try channel.assertReadDataOutbound()
    let expectedFramedMessage: [UInt8] = [
      0,  // Compression disabled
      0, 0, 0, 4,  // Message length
      1, 1, 1, 1,  // Message data
    ]
    XCTAssertEqual(dataFrame.data, .byteBuffer(.init(bytes: expectedFramedMessage)))
    XCTAssertFalse(dataFrame.endStream)

    // ...followed by the trailers, which end the stream.
    let trailersFrame = try channel.assertReadHeadersOutbound()
    XCTAssertEqual(trailersFrame.headers, ["grpc-status": "0"])
    XCTAssertTrue(trailersFrame.endStream)

    // Nothing else should have been written.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
  }
  /// Checks that the method descriptor promise is succeeded with the service and
  /// method parsed from the request path once valid request headers are received.
  func testMethodDescriptorPromiseSucceeds() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, descriptorPromise: descriptorPromise)
    )

    // The client opens the RPC by sending its request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // The promise resolves to the descriptor matching the ":path" header.
    let expectedDescriptor = MethodDescriptor(
      fullyQualifiedService: "SomeService",
      method: "SomeMethod"
    )
    XCTAssertEqual(try descriptorPromise.futureResult.wait(), expectedDescriptor)
  }
  /// Checks that removing the handler before any metadata arrives fails the
  /// method descriptor promise with an `.unavailable` error.
  func testMethodDescriptorPromiseIsFailedWhenHandlerRemoved() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise
    )

    // Add the handler and take it straight back out, without receiving anything.
    try channel.pipeline.syncOperations.addHandler(streamHandler)
    try channel.pipeline.syncOperations.removeHandler(streamHandler).wait()

    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC stream was closed before we got any Metadata.")
    }
  }
  /// Checks that the method descriptor promise is failed with an `.unavailable`
  /// error when the incoming RPC is rejected.
  func testMethodDescriptorPromiseIsFailedIfRPCRejected() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    try channel.pipeline.syncOperations.addHandler(
      self.makeServerStreamHandler(channel: channel, descriptorPromise: descriptorPromise)
    )

    // The client sends request headers whose content type is not a gRPC one.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/not-valid-contenttype",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // The promise must be failed rather than left pending.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try descriptorPromise.futureResult.wait()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "RPC was rejected.")
    }
  }
  /// Checks that an error fired through the pipeline is forwarded unchanged by the
  /// handler, and that response parts can no longer be written afterwards.
  func testUnexpectedStreamClose_ErrorFired() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // The client opens the RPC by sending its request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Fire an error through the pipeline.
    let firedError = ChannelError.connectTimeout(.milliseconds(100))
    channel.pipeline.fireErrorCaught(firedError)

    // The handler is expected to pass the very same error along.
    XCTAssertThrowsError(
      ofType: ChannelError.self,
      try channel.throwIfErrorCaught()
    ) { caughtError in
      XCTAssertEqual(caughtError, firedError)
    }

    // The stream is now closed, so writing a response part must fail.
    let lateMetadataPart = RPCResponsePart<GRPCNIOTransportBytes>.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadataPart)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that the channel becoming inactive mid-RPC makes the handler fire an
  /// `.unavailable` error, and that response parts can no longer be written afterwards.
  func testUnexpectedStreamClose_ChannelInactive() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // The client opens the RPC by sending its request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Simulate the channel going inactive while the RPC is still open.
    channel.pipeline.fireChannelInactive()

    // The handler is expected to surface this as an error.
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.throwIfErrorCaught()
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(rpcError.message, "Stream unexpectedly closed.")
    }

    // The stream is now closed, so writing a response part must fail.
    let lateMetadataPart = RPCResponsePart<GRPCNIOTransportBytes>.metadata(Metadata())
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadataPart)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  /// Checks that receiving a RST_STREAM frame mid-RPC makes the handler fire an
  /// `.unavailable` error, and that response parts can no longer be written afterwards.
  func testUnexpectedStreamClose_ResetStreamFrame() throws {
    let channel = EmbeddedChannel()
    let descriptorPromise = channel.eventLoop.makePromise(of: MethodDescriptor.self)
    let streamHandler = self.makeServerStreamHandler(
      channel: channel,
      descriptorPromise: descriptorPromise,
      disableAssertions: true
    )
    try channel.pipeline.syncOperations.addHandler(streamHandler)

    // The client opens the RPC by sending its request headers.
    let requestHeaders: HPACKHeaders = [
      GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
      GRPCHTTP2Keys.scheme.rawValue: "http",
      GRPCHTTP2Keys.method.rawValue: "POST",
      GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
      GRPCHTTP2Keys.te.rawValue: "trailers",
    ]
    let requestHeadersFrame = HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders))
    XCTAssertNoThrow(try channel.writeInbound(requestHeadersFrame))

    // No error response must have been sent, and the metadata must be readable inbound.
    XCTAssertNil(try channel.readOutbound(as: HTTP2Frame.FramePayload.self))
    let expectedRequestPart = RPCRequestPart<GRPCNIOTransportBytes>.metadata(
      Metadata(headers: requestHeaders)
    )
    XCTAssertEqual(
      try channel.readInbound(as: RPCRequestPart<GRPCNIOTransportBytes>.self),
      expectedRequestPart
    )

    // Deliver a RST_STREAM frame; the handler is expected to fire an error for it.
    let resetFrame = HTTP2Frame.FramePayload.rstStream(.internalError)
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeInbound(resetFrame)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .unavailable)
      XCTAssertEqual(
        rpcError.message,
        "Stream unexpectedly closed: a RST_STREAM frame was received."
      )
    }

    // The stream is now closed, so writing a response part must fail.
    let lateMetadataPart = RPCResponsePart<GRPCNIOTransportBytes>.metadata([:])
    XCTAssertThrowsError(
      ofType: RPCError.self,
      try channel.writeOutbound(lateMetadataPart)
    ) { rpcError in
      XCTAssertEqual(rpcError.code, .internalError)
      XCTAssertEqual(rpcError.message, "Invalid state")
    }
  }
  885. }
  /// Swift Testing suite covering how `GRPCServerStreamHandler` reacts to quiescing
  /// events and RST_STREAM frames, and how it updates the connection's frame stats.
  struct ServerStreamHandlerTests {
    /// A stream handler together with the connection management handler whose
    /// `syncView` it was created with.
    struct ConnectionAndStreamHandlers {
      let streamHandler: GRPCServerStreamHandler
      let connectionHandler: ServerConnectionManagementHandler
    }

    /// Builds a stream handler wired to a freshly created connection management handler.
    ///
    /// - Parameters:
    ///   - channel: Channel whose event loop both handlers are created on.
    ///   - scheme: Scheme passed to the stream handler.
    ///   - acceptedEncodings: Compression algorithms passed to the stream handler.
    ///   - maxPayloadSize: Maximum payload size passed to the stream handler.
    ///   - descriptorPromise: Promise handed to the stream handler for the method
    ///     descriptor; a new promise is created when `nil`.
    ///   - disableAssertions: Forwarded as `skipStateMachineAssertions`.
    /// - Returns: Both handlers, so tests can inspect either of them.
    private func makeServerConnectionAndStreamHandlers(
      channel: any Channel,
      scheme: Scheme = .http,
      acceptedEncodings: CompressionAlgorithmSet = [],
      maxPayloadSize: Int = .max,
      descriptorPromise: EventLoopPromise<MethodDescriptor>? = nil,
      disableAssertions: Bool = false
    ) -> ConnectionAndStreamHandlers {
      let eventLoop = channel.eventLoop

      // All time-based connection management is switched off.
      let connectionHandler = ServerConnectionManagementHandler(
        eventLoop: eventLoop,
        maxIdleTime: nil,
        maxAge: nil,
        maxGraceTime: nil,
        keepaliveTime: nil,
        keepaliveTimeout: nil,
        allowKeepaliveWithoutCalls: false,
        minPingIntervalWithoutCalls: .minutes(5),
        requireALPN: false
      )

      let methodDescriptorPromise: EventLoopPromise<MethodDescriptor> =
        descriptorPromise ?? eventLoop.makePromise()

      let streamHandler = GRPCServerStreamHandler(
        scheme: scheme,
        acceptedEncodings: acceptedEncodings,
        maxPayloadSize: maxPayloadSize,
        methodDescriptorPromise: methodDescriptorPromise,
        eventLoop: eventLoop,
        connectionManagementHandler: connectionHandler.syncView,
        skipStateMachineAssertions: disableAssertions
      )

      return ConnectionAndStreamHandlers(
        streamHandler: streamHandler,
        connectionHandler: connectionHandler
      )
    }

    @Test("ChannelShouldQuiesceEvent is buffered and turns into RPC cancellation")
    func shouldQuiesceEventIsBufferedBeforeHandleIsSet() async throws {
      let channel = EmbeddedChannel()
      let streamHandler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
      try channel.pipeline.syncOperations.addHandler(streamHandler)

      // The quiesce event arrives before any cancellation handle has been set.
      channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())

      await withServerContextRPCCancellationHandle { cancellationHandle in
        // Setting the handle afterwards must report the cancellation right away.
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(cancellationHandle.isCancelled)
      }

      // The channel is torn down abruptly, so errors from `finish()` are expected and ignored.
      _ = try? channel.finish()
    }

    @Test("ChannelShouldQuiesceEvent turns into RPC cancellation")
    func shouldQuiesceEventTriggersCancellation() async throws {
      let channel = EmbeddedChannel()
      let streamHandler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
      try channel.pipeline.syncOperations.addHandler(streamHandler)

      await withServerContextRPCCancellationHandle { cancellationHandle in
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(!cancellationHandle.isCancelled)

        // The quiesce event must flip the handle to cancelled.
        channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
        #expect(cancellationHandle.isCancelled)
      }

      // The channel is torn down abruptly, so errors from `finish()` are expected and ignored.
      _ = try? channel.finish()
    }

    @Test("RST_STREAM turns into RPC cancellation")
    func rstStreamTriggersCancellation() async throws {
      let channel = EmbeddedChannel()
      let streamHandler = self.makeServerConnectionAndStreamHandlers(channel: channel).streamHandler
      try channel.pipeline.syncOperations.addHandler(streamHandler)

      await withServerContextRPCCancellationHandle { cancellationHandle in
        streamHandler.setCancellationHandle(cancellationHandle)
        #expect(!cancellationHandle.isCancelled)

        // Reading a RST_STREAM frame must flip the handle to cancelled.
        let resetFrame: HTTP2Frame.FramePayload = .rstStream(.cancel)
        channel.pipeline.fireChannelRead(resetFrame)
        #expect(cancellationHandle.isCancelled)
      }

      // The channel is torn down abruptly, so errors from `finish()` are expected and ignored.
      _ = try? channel.finish()
    }

    @Test("Connection FrameStats are updated when writing headers or data frames")
    func connectionFrameStatsAreUpdatedAccordingly() async throws {
      let channel = EmbeddedChannel()
      let handlers = self.makeServerConnectionAndStreamHandlers(channel: channel)
      let connectionHandler = handlers.connectionHandler
      try channel.pipeline.syncOperations.addHandler(handlers.streamHandler)

      // Nothing has been written so far, so the flag starts out false.
      #expect(!connectionHandler.frameStats.didWriteHeadersOrData)

      // Receiving a ping must leave the flag untouched.
      let pingFrame = HTTP2Frame.FramePayload.ping(.init(withInteger: 42), ack: false)
      channel.pipeline.fireChannelRead(pingFrame)
      #expect(!connectionHandler.frameStats.didWriteHeadersOrData)

      // Headers can only be written back once the client's request headers have arrived.
      let requestHeaders: HPACKHeaders = [
        GRPCHTTP2Keys.path.rawValue: "/SomeService/SomeMethod",
        GRPCHTTP2Keys.scheme.rawValue: "http",
        GRPCHTTP2Keys.method.rawValue: "POST",
        GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
        GRPCHTTP2Keys.te.rawValue: "trailers",
      ]
      try channel.writeInbound(HTTP2Frame.FramePayload.headers(.init(headers: requestHeaders)))

      // Writing the server's initial metadata must set the flag.
      let responseMetadataPart = RPCResponsePart<GRPCNIOTransportBytes>.metadata([:])
      try channel.writeOutbound(responseMetadataPart)
      #expect(connectionHandler.frameStats.didWriteHeadersOrData)

      // Reset by hand so that the data write below is observed on its own.
      connectionHandler.frameStats.reset()
      #expect(!connectionHandler.frameStats.didWriteHeadersOrData)

      // Writing a message must set the flag again.
      try channel.writeOutbound(RPCResponsePart.message(GRPCNIOTransportBytes([42])))
      #expect(connectionHandler.frameStats.didWriteHeadersOrData)

      // Clean up. The channel is torn down abruptly, so errors from `finish()`
      // are expected and ignored.
      _ = try? channel.finish()
    }
  }
  extension EmbeddedChannel {
    /// Reads the next outbound frame payload and returns it as a headers payload.
    ///
    /// - Returns: The headers carried by the next outbound frame payload.
    /// - Throws: The `XCTUnwrap` error when nothing was written, or
    ///   `TestError.assertionFailure` when the payload is not a headers payload.
    fileprivate func assertReadHeadersOutbound() throws -> HTTP2Frame.FramePayload.Headers {
      let payload = try XCTUnwrap(try self.readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .headers(let headers) = payload {
        return headers
      }
      throw TestError.assertionFailure("Expected to write headers")
    }

    /// Reads the next outbound frame payload and returns it as a data payload.
    ///
    /// - Returns: The data carried by the next outbound frame payload.
    /// - Throws: The `XCTUnwrap` error when nothing was written, or
    ///   `TestError.assertionFailure` when the payload is not a data payload.
    fileprivate func assertReadDataOutbound() throws -> HTTP2Frame.FramePayload.Data {
      let payload = try XCTUnwrap(try self.readOutbound(as: HTTP2Frame.FramePayload.self))
      if case .data(let data) = payload {
        return data
      }
      throw TestError.assertionFailure("Expected to write data")
    }
  }
  /// Error thrown by the test helpers in this file when an expectation about the
  /// channel's output does not hold.
  private enum TestError: Error {
    /// An assertion failed; the associated string describes what was expected.
    case assertionFailure(String)
  }