CompressionTests.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOPosix
  23. import XCTest
  24. class MessageCompressionTests: GRPCTestCase {
  25. var group: EventLoopGroup!
  26. var server: Server!
  27. var client: ClientConnection!
  28. var defaultTimeout: TimeInterval = 1.0
  29. var echo: Echo_EchoNIOClient!
  30. override func setUp() {
  31. super.setUp()
  32. self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  33. }
  34. override func tearDown() {
  35. XCTAssertNoThrow(try self.client.close().wait())
  36. XCTAssertNoThrow(try self.server.close().wait())
  37. XCTAssertNoThrow(try self.group.syncShutdownGracefully())
  38. super.tearDown()
  39. }
  40. func setupServer(encoding: ServerMessageEncoding) throws {
  41. self.server = try Server.insecure(group: self.group)
  42. .withServiceProviders([EchoProvider()])
  43. .withMessageCompression(encoding)
  44. .withLogger(self.serverLogger)
  45. .bind(host: "localhost", port: 0)
  46. .wait()
  47. }
  48. func setupClient(encoding: ClientMessageEncoding) {
  49. self.client = ClientConnection.insecure(group: self.group)
  50. .withBackgroundActivityLogger(self.clientLogger)
  51. .connect(host: "localhost", port: self.server.channel.localAddress!.port!)
  52. self.echo = Echo_EchoNIOClient(
  53. channel: self.client,
  54. defaultCallOptions: CallOptions(messageEncoding: encoding, logger: self.clientLogger)
  55. )
  56. }
  57. func testCompressedRequestsUncompressedResponses() throws {
  58. // Enable compression, but don't advertise that it's enabled.
  59. // The spec says that servers should handle compression they support but don't advertise.
  60. try self
  61. .setupServer(encoding: .enabled(.init(enabledAlgorithms: [], decompressionLimit: .ratio(10))))
  62. self.setupClient(encoding: .enabled(.init(
  63. forRequests: .gzip,
  64. acceptableForResponses: [.deflate, .gzip],
  65. decompressionLimit: .ratio(10)
  66. )))
  67. let get = self.echo.get(.with { $0.text = "foo" })
  68. let initialMetadata = self.expectation(description: "received initial metadata")
  69. get.initialMetadata.map {
  70. $0.contains(name: "grpc-encoding")
  71. }.assertEqual(false, fulfill: initialMetadata)
  72. let status = self.expectation(description: "received status")
  73. get.status.map {
  74. $0.code
  75. }.assertEqual(.ok, fulfill: status)
  76. self.wait(for: [initialMetadata, status], timeout: self.defaultTimeout)
  77. }
  78. func testUncompressedRequestsCompressedResponses() throws {
  79. try self.setupServer(encoding: .enabled(.init(decompressionLimit: .ratio(10))))
  80. self.setupClient(encoding: .enabled(.init(
  81. forRequests: .none,
  82. acceptableForResponses: [.deflate, .gzip],
  83. decompressionLimit: .ratio(10)
  84. )))
  85. let get = self.echo.get(.with { $0.text = "foo" })
  86. let initialMetadata = self.expectation(description: "received initial metadata")
  87. get.initialMetadata.map {
  88. $0.first(name: "grpc-encoding")
  89. }.assertEqual("deflate", fulfill: initialMetadata)
  90. let status = self.expectation(description: "received status")
  91. get.status.map {
  92. $0.code
  93. }.assertEqual(.ok, fulfill: status)
  94. self.wait(for: [initialMetadata, status], timeout: self.defaultTimeout)
  95. }
  96. func testServerCanDecompressNonAdvertisedButSupportedCompression() throws {
  97. // Server should be able to decompress a format it supports but does not advertise. In doing
  98. // so it must also return a "grpc-accept-encoding" header which includes the value it did not
  99. // advertise.
  100. try self
  101. .setupServer(encoding: .enabled(.init(
  102. enabledAlgorithms: [.gzip],
  103. decompressionLimit: .ratio(10)
  104. )))
  105. self
  106. .setupClient(encoding: .enabled(.init(
  107. forRequests: .deflate,
  108. acceptableForResponses: [],
  109. decompressionLimit: .ratio(10)
  110. )))
  111. let get = self.echo.get(.with { $0.text = "foo" })
  112. let initialMetadata = self.expectation(description: "received initial metadata")
  113. get.initialMetadata.map {
  114. $0[canonicalForm: "grpc-accept-encoding"]
  115. }.assertEqual(["gzip", "deflate"], fulfill: initialMetadata)
  116. let status = self.expectation(description: "received status")
  117. get.status.map {
  118. $0.code
  119. }.assertEqual(.ok, fulfill: status)
  120. self.wait(for: [initialMetadata, status], timeout: self.defaultTimeout)
  121. }
  122. func testServerCompressesResponseWithDifferentAlgorithmToRequest() throws {
  123. // Server should be able to compress responses with a different method to the client, providing
  124. // the client supports it.
  125. try self
  126. .setupServer(encoding: .enabled(.init(
  127. enabledAlgorithms: [.gzip],
  128. decompressionLimit: .ratio(10)
  129. )))
  130. self.setupClient(encoding: .enabled(.init(
  131. forRequests: .deflate,
  132. acceptableForResponses: [.deflate, .gzip],
  133. decompressionLimit: .ratio(10)
  134. )))
  135. let get = self.echo.get(.with { $0.text = "foo" })
  136. let initialMetadata = self.expectation(description: "received initial metadata")
  137. get.initialMetadata.map {
  138. $0.first(name: "grpc-encoding")
  139. }.assertEqual("gzip", fulfill: initialMetadata)
  140. let status = self.expectation(description: "received status")
  141. get.status.map {
  142. $0.code
  143. }.assertEqual(.ok, fulfill: status)
  144. self.wait(for: [initialMetadata, status], timeout: self.defaultTimeout)
  145. }
  146. func testCompressedRequestWithCompressionNotSupportedOnServer() throws {
  147. try self
  148. .setupServer(encoding: .enabled(.init(
  149. enabledAlgorithms: [.gzip, .deflate],
  150. decompressionLimit: .ratio(10)
  151. )))
  152. // We can't specify a compression we don't support, so we'll specify no compression and then
  153. // send a 'grpc-encoding' with our initial metadata.
  154. self.setupClient(encoding: .enabled(.init(
  155. forRequests: .none,
  156. acceptableForResponses: [.deflate, .gzip],
  157. decompressionLimit: .ratio(10)
  158. )))
  159. let headers: HPACKHeaders = ["grpc-encoding": "you-don't-support-this"]
  160. let get = self.echo.get(
  161. .with { $0.text = "foo" },
  162. callOptions: CallOptions(customMetadata: headers)
  163. )
  164. let response = self.expectation(description: "received response")
  165. get.response.assertError(fulfill: response)
  166. let trailers = self.expectation(description: "received trailing metadata")
  167. get.trailingMetadata.map {
  168. $0[canonicalForm: "grpc-accept-encoding"]
  169. }.assertEqual(["gzip", "deflate"], fulfill: trailers)
  170. let status = self.expectation(description: "received status")
  171. get.status.map {
  172. $0.code
  173. }.assertEqual(.unimplemented, fulfill: status)
  174. self.wait(for: [response, trailers, status], timeout: self.defaultTimeout)
  175. }
  176. func testDecompressionLimitIsRespectedByServerForUnaryCall() throws {
  177. try self.setupServer(encoding: .enabled(.init(decompressionLimit: .absolute(1))))
  178. self
  179. .setupClient(encoding: .enabled(.init(
  180. forRequests: .gzip,
  181. decompressionLimit: .absolute(1024)
  182. )))
  183. let get = self.echo.get(.with { $0.text = "foo" })
  184. let status = self.expectation(description: "received status")
  185. get.status.map {
  186. $0.code
  187. }.assertEqual(.resourceExhausted, fulfill: status)
  188. self.wait(for: [status], timeout: self.defaultTimeout)
  189. }
  190. func testDecompressionLimitIsRespectedByServerForStreamingCall() throws {
  191. try self.setupServer(encoding: .enabled(.init(decompressionLimit: .absolute(1024))))
  192. self
  193. .setupClient(encoding: .enabled(.init(
  194. forRequests: .gzip,
  195. decompressionLimit: .absolute(2048)
  196. )))
  197. let collect = self.echo.collect()
  198. let status = self.expectation(description: "received status")
  199. // Smaller than limit.
  200. collect.sendMessage(.with { $0.text = "foo" }, promise: nil)
  201. // Should be just over the limit.
  202. collect.sendMessage(.with { $0.text = String(repeating: "x", count: 1024) }, promise: nil)
  203. collect.sendEnd(promise: nil)
  204. collect.status.map {
  205. $0.code
  206. }.assertEqual(.resourceExhausted, fulfill: status)
  207. self.wait(for: [status], timeout: self.defaultTimeout)
  208. }
  209. func testDecompressionLimitIsRespectedByClientForUnaryCall() throws {
  210. try self
  211. .setupServer(encoding: .enabled(.init(
  212. enabledAlgorithms: [.gzip],
  213. decompressionLimit: .absolute(1024)
  214. )))
  215. self.setupClient(encoding: .enabled(.responsesOnly(decompressionLimit: .absolute(1))))
  216. let get = self.echo.get(.with { $0.text = "foo" })
  217. let status = self.expectation(description: "received status")
  218. get.status.map {
  219. $0.code
  220. }.assertEqual(.resourceExhausted, fulfill: status)
  221. self.wait(for: [status], timeout: self.defaultTimeout)
  222. }
  223. func testDecompressionLimitIsRespectedByClientForStreamingCall() throws {
  224. try self.setupServer(encoding: .enabled(.init(decompressionLimit: .absolute(2048))))
  225. self.setupClient(
  226. encoding: .enabled(.init(forRequests: .gzip, decompressionLimit: .absolute(1024)))
  227. )
  228. let responsePromise = self.group.next().makePromise(of: Echo_EchoResponse.self)
  229. let lock = NIOLock()
  230. var responseCount = 0
  231. let update = self.echo.update {
  232. lock.withLock {
  233. responseCount += 1
  234. }
  235. responsePromise.succeed($0)
  236. }
  237. let status = self.expectation(description: "received status")
  238. // Smaller than limit.
  239. update.sendMessage(.with { $0.text = "foo" }, promise: nil)
  240. XCTAssertNoThrow(try responsePromise.futureResult.wait())
  241. // Should be just over the limit.
  242. update.sendMessage(.with { $0.text = String(repeating: "x", count: 1024) }, promise: nil)
  243. update.sendEnd(promise: nil)
  244. update.status.map {
  245. $0.code
  246. }.assertEqual(.resourceExhausted, fulfill: status)
  247. self.wait(for: [status], timeout: self.defaultTimeout)
  248. let receivedResponses = lock.withLock { responseCount }
  249. XCTAssertEqual(receivedResponses, 1)
  250. }
  251. func testIdentityCompressionIsntCompression() throws {
  252. // The client offers "identity" compression, the server doesn't support compression. We should
  253. // tolerate this, as "identity" is no compression at all.
  254. try self
  255. .setupServer(encoding: .disabled)
  256. // We can't specify a compression we don't support, like identity, so we'll specify no compression and then
  257. // send a 'grpc-encoding' with our initial metadata.
  258. self.setupClient(encoding: .disabled)
  259. let headers: HPACKHeaders = ["grpc-encoding": "identity"]
  260. let get = self.echo.get(
  261. .with { $0.text = "foo" },
  262. callOptions: CallOptions(customMetadata: headers)
  263. )
  264. let initialMetadata = self.expectation(description: "received initial metadata")
  265. get.initialMetadata.map {
  266. $0.contains(name: "grpc-encoding")
  267. }.assertEqual(false, fulfill: initialMetadata)
  268. let status = self.expectation(description: "received status")
  269. get.status.map {
  270. $0.code
  271. }.assertEqual(.ok, fulfill: status)
  272. self.wait(for: [initialMetadata, status], timeout: self.defaultTimeout)
  273. }
  274. func testCompressedRequestWithDisabledServerCompressionAndUnknownCompressionAlgorithm() throws {
  275. try self.setupServer(encoding: .disabled)
  276. // We can't specify a compression we don't support, so we'll specify no compression and then
  277. // send a 'grpc-encoding' with our initial metadata.
  278. self.setupClient(encoding: .enabled(.init(
  279. forRequests: .none,
  280. acceptableForResponses: [.deflate, .gzip],
  281. decompressionLimit: .ratio(10)
  282. )))
  283. let headers: HPACKHeaders = ["grpc-encoding": "you-don't-support-this"]
  284. let get = self.echo.get(
  285. .with { $0.text = "foo" },
  286. callOptions: CallOptions(customMetadata: headers)
  287. )
  288. let response = self.expectation(description: "received response")
  289. get.response.assertError(fulfill: response)
  290. let trailers = self.expectation(description: "received trailing metadata")
  291. get.trailingMetadata.map {
  292. $0.contains(name: "grpc-accept-encoding")
  293. }.assertEqual(false, fulfill: trailers)
  294. let status = self.expectation(description: "received status")
  295. get.status.map {
  296. $0.code
  297. }.assertEqual(.unimplemented, fulfill: status)
  298. self.wait(for: [response, trailers, status], timeout: self.defaultTimeout)
  299. }
  300. }