AsyncClientTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. /*
  2. * Copyright 2022, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import NIOCore
  21. import NIOPosix
  22. import XCTest
  23. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  24. final class AsyncClientCancellationTests: GRPCTestCase {
  25. private var server: Server!
  26. private var group: EventLoopGroup!
  27. private var pool: GRPCChannel!
  /// Creates the event loop group (a single thread) shared by the server and client of a test.
  override func setUp() {
    super.setUp()

    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    self.group = eventLoopGroup
  }
  /// Closes the client channel pool and the server (when the test created them), shuts down the
  /// event loop group and finally runs the superclass tear down.
  ///
  /// The awaitable `get()` and `shutdownGracefully()` variants are used instead of the blocking
  /// `wait()` and `syncShutdownGracefully()` calls: this method is `async`, so blocking here would
  /// stall the thread which is running it.
  ///
  /// - Throws: Any error raised while closing the pool or server, shutting down the group, or by
  ///   the superclass tear down.
  override func tearDown() async throws {
    if let pool = self.pool {
      try await pool.close().get()
      self.pool = nil
    }

    if let server = self.server {
      try await server.close().get()
      self.server = nil
    }

    try await self.group.shutdownGracefully()
    self.group = nil

    try await super.tearDown()
  }
  /// Starts an insecure server offering `service`, bound to `127.0.0.1` with port `0`, and stores
  /// it in `self.server`.
  ///
  /// - Precondition: No server has been started yet.
  /// - Parameter service: The service provider the server should offer.
  /// - Throws: Any error raised while binding the server.
  private func startServer(service: CallHandlerProvider) throws {
    precondition(self.server == nil)

    let boundServer = Server.insecure(group: self.group)
      .withServiceProviders([service])
      .withLogger(self.serverLogger)
      .bind(host: "127.0.0.1", port: 0)

    self.server = try boundServer.wait()
  }
  /// Starts a server offering `service` and returns a client connected to the port it bound.
  ///
  /// - Parameter service: The service provider the server should offer.
  /// - Returns: An Echo client whose channel pool targets the started server.
  /// - Throws: Any error raised while starting the server or creating the client, or an
  ///   `XCTUnwrap` failure if the server channel reports no local port.
  private func startServerAndClient(service: CallHandlerProvider) throws -> Echo_EchoAsyncClient {
    try self.startServer(service: service)

    // Fail the test with a useful message rather than crashing the whole test process if the
    // bound address (or its port) is unexpectedly missing.
    let port = try XCTUnwrap(self.server.channel.localAddress?.port)
    return try self.makeClient(port: port)
  }
  /// Creates a plaintext channel pool targeting `127.0.0.1:port`, stores it in `self.pool` and
  /// returns an Echo client backed by it.
  ///
  /// - Precondition: No pool has been created yet.
  /// - Parameters:
  ///   - port: The port the pool should connect to.
  ///   - configure: Extra configuration applied after the background activity logger is set.
  /// - Returns: An Echo client using the new pool.
  /// - Throws: Any error raised while creating the pool.
  private func makeClient(
    port: Int,
    configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in }
  ) throws -> Echo_EchoAsyncClient {
    precondition(self.pool == nil)

    let channel = try GRPCChannelPool.with(
      target: .host("127.0.0.1", port: port),
      transportSecurity: .plaintext,
      eventLoopGroup: self.group
    ) { configuration in
      configuration.backgroundActivityLogger = self.clientLogger
      configure(&configuration)
    }
    self.pool = channel

    return Echo_EchoAsyncClient(channel: channel)
  }
  /// Cancelling a unary call must fail its response and leave the call with a `.cancelled` status.
  func testCancelUnaryFailsResponse() async throws {
    // The never resolving service is used so the RPC cannot complete before it is cancelled.
    let client = try self.startServerAndClient(service: NeverResolvingEchoProvider())

    let call = client.makeGetCall(.with { $0.text = "foo bar baz" })
    call.cancel()

    do {
      _ = try await call.response
      XCTFail("Expected to throw a status with code .cancelled")
    } catch {
      if let status = error as? GRPCStatus {
        XCTAssertEqual(status.code, .cancelled)
      } else {
        XCTFail("Expected to throw a status with code .cancelled")
      }
    }

    // The call's final status must also be 'cancelled'.
    let finalStatus = await call.status
    XCTAssertEqual(finalStatus.code, .cancelled)
  }
  /// Cancelling the task which runs a wrapped unary call must fail the call with a `.cancelled`
  /// status.
  func testCancelFailsUnaryResponseForWrappedCall() async throws {
    // The never resolving service is used so the RPC cannot complete before it is cancelled.
    let client = try self.startServerAndClient(service: NeverResolvingEchoProvider())

    let pendingResponse = Task {
      try await client.get(.with { $0.text = "I'll be cancelled" })
    }
    pendingResponse.cancel()

    switch await pendingResponse.result {
    case .success:
      XCTFail("Expected to throw a status with code .cancelled")
    case let .failure(error):
      if let status = error as? GRPCStatus {
        XCTAssertEqual(status.code, .cancelled)
      } else {
        XCTFail("Expected to throw a status with code .cancelled")
      }
    }
  }
  /// Cancelling a server streaming call must fail iteration of its response stream and leave the
  /// call with a `.cancelled` status.
  func testCancelServerStreamingClosesResponseStream() async throws {
    // The never resolving service is used so the RPC cannot complete before it is cancelled.
    let client = try self.startServerAndClient(service: NeverResolvingEchoProvider())

    let call = client.makeExpandCall(.with { $0.text = "foo bar baz" })
    call.cancel()

    var responses = call.responseStream.makeAsyncIterator()
    do {
      _ = try await responses.next()
      XCTFail("Expected to throw a status with code .cancelled")
    } catch {
      if let status = error as? GRPCStatus {
        XCTAssertEqual(status.code, .cancelled)
      } else {
        XCTFail("Expected to throw a status with code .cancelled")
      }
    }

    // The call's final status must also be 'cancelled'.
    let finalStatus = await call.status
    XCTAssertEqual(finalStatus.code, .cancelled)
  }
  /// Cancelling the task which iterates a wrapped server streaming call must fail the iteration
  /// with a `.cancelled` status.
  func testCancelServerStreamingClosesResponseStreamForWrappedCall() async throws {
    // The never resolving service is used so the RPC cannot complete before it is cancelled.
    let client = try self.startServerAndClient(service: NeverResolvingEchoProvider())

    let iteration = Task {
      let responses = client.expand(.with { $0.text = "foo bar baz" })
      var iterator = responses.makeAsyncIterator()

      do {
        _ = try await iterator.next()
        XCTFail("Expected to throw a status with code .cancelled")
      } catch {
        if let status = error as? GRPCStatus {
          XCTAssertEqual(status.code, .cancelled)
        } else {
          XCTFail("Expected to throw a status with code .cancelled")
        }
      }
    }

    iteration.cancel()
    await iteration.value
  }
  /// Cancelling a client streaming call must make further sends fail, fail the response and leave
  /// the call with a `.cancelled` status.
  func testCancelClientStreamingClosesRequestStreamAndFailsResponse() async throws {
    let client = try self.startServerAndClient(service: EchoProvider())
    let call = client.makeCollectCall()

    // Send one message first so the stream is known to be up before cancelling.
    try await call.requestStream.send(.with { $0.text = "foo" })
    call.cancel()

    // Cancellation is asynchronous: keep sending until a send (or the sleep) throws.
    var sendStillSucceeds = true
    while sendStillSucceeds {
      do {
        try await call.requestStream.send(.with { $0.text = "foo" })
        try await Task.sleep(nanoseconds: 1000)
      } catch {
        sendStillSucceeds = false
      }
    }

    // No response should be delivered.
    await XCTAssertThrowsError(try await call.response)

    // The call's final status must be 'cancelled'.
    let finalStatus = await call.status
    XCTAssertEqual(finalStatus.code, .cancelled)
  }
  /// Cancelling the task which runs a wrapped client streaming call must fail the call with a
  /// `.cancelled` status.
  func testCancelClientStreamingClosesRequestStreamAndFailsResponseForWrappedCall() async throws {
    let client = try self.startServerAndClient(service: NeverResolvingEchoProvider())

    // Ten requests whose text is "0" through "9".
    let requests = (0 ..< 10).map { index in
      Echo_EchoRequest.with { request in
        request.text = String(index)
      }
    }

    let collection = Task {
      do {
        _ = try await client.collect(requests)
        XCTFail("Expected to throw a status with code .cancelled")
      } catch {
        if let status = error as? GRPCStatus {
          XCTAssertEqual(status.code, .cancelled)
        } else {
          XCTFail("Expected to throw a status with code .cancelled")
        }
      }
    }

    collection.cancel()
    await collection.value
  }
  /// Once a client streaming call has completed successfully, sending on its request stream must
  /// throw.
  func testClientStreamingClosesRequestStreamOnEnd() async throws {
    let client = try self.startServerAndClient(service: EchoProvider())
    let call = client.makeCollectCall()

    // Send a single message and then close the request stream.
    try await call.requestStream.send(.with { $0.text = "foo" })
    try await call.requestStream.finish()

    // The RPC should complete with a response and an OK status.
    _ = try await call.response
    let finalStatus = await call.status
    XCTAssert(finalStatus.isOk)

    // Any further send must fail.
    await XCTAssertThrowsError(
      try await call.requestStream.send(.with { $0.text = "should throw" })
    )
  }
  /// Cancelling a bidirectional streaming call must make further sends fail and leave the call
  /// with a `.cancelled` status.
  func testCancelBidiStreamingClosesRequestStreamAndResponseStream() async throws {
    let client = try self.startServerAndClient(service: EchoProvider())
    let call = client.makeUpdateCall()

    // Send one message first so the stream is known to be up before cancelling.
    try await call.requestStream.send(.with { $0.text = "foo" })

    // Wait for the corresponding response.
    var responses = call.responseStream.makeAsyncIterator()
    _ = try await responses.next()

    call.cancel()

    // Cancellation is asynchronous: keep sending until a send (or the sleep) throws.
    var sendStillSucceeds = true
    while sendStillSucceeds {
      do {
        try await call.requestStream.send(.with { $0.text = "foo" })
        try await Task.sleep(nanoseconds: 1000)
      } catch {
        sendStillSucceeds = false
      }
    }

    // The call's final status must be 'cancelled'.
    let finalStatus = await call.status
    XCTAssertEqual(finalStatus.code, .cancelled)
  }
  /// Cancelling the task which iterates a wrapped bidirectional streaming call must fail the
  /// iteration with a `.cancelled` status.
  func testCancelBidiStreamingClosesRequestStreamAndResponseStreamForWrappedCall() async throws {
    let client = try self.startServerAndClient(service: EchoProvider())

    // Ten requests whose text is "0" through "9".
    let requests = (0 ..< 10).map { index in
      Echo_EchoRequest.with { request in
        request.text = String(index)
      }
    }

    let iteration = Task {
      let responses = client.update(requests)
      var iterator = responses.makeAsyncIterator()

      do {
        _ = try await iterator.next()
        XCTFail("Expected to throw a status with code .cancelled")
      } catch {
        if let status = error as? GRPCStatus {
          XCTAssertEqual(status.code, .cancelled)
        } else {
          XCTFail("Expected to throw a status with code .cancelled")
        }
      }
    }

    iteration.cancel()
    await iteration.value
  }
  /// Once a bidirectional streaming call has completed successfully, sending on its request
  /// stream must throw.
  func testBidiStreamingClosesRequestStreamOnEnd() async throws {
    let client = try self.startServerAndClient(service: EchoProvider())
    let call = client.makeUpdateCall()

    // Send a single message and then close the request stream.
    try await call.requestStream.send(.with { $0.text = "foo" })
    try await call.requestStream.finish()

    // Exactly one response is expected, followed by an OK status.
    let numberOfResponses = try await call.responseStream.count()
    XCTAssertEqual(numberOfResponses, 1)

    let finalStatus = await call.status
    XCTAssert(finalStatus.isOk)

    // Any further send must fail.
    await XCTAssertThrowsError(
      try await call.requestStream.send(.with { $0.text = "should throw" })
    )
  }
  /// Wraps an Echo RPC which has a request stream (client streaming or bidirectional streaming)
  /// so that a helper can send on it or cancel it without caring which kind of call it holds.
  private enum RequestStreamingRPC {
    typealias Request = Echo_EchoRequest
    typealias Response = Echo_EchoResponse

    case clientStreaming(GRPCAsyncClientStreamingCall<Request, Response>)
    case bidirectionalStreaming(GRPCAsyncBidirectionalStreamingCall<Request, Response>)

    /// Sends a request carrying `text` on the wrapped call's request stream.
    ///
    /// - Throws: Any error thrown by the request stream's `send`.
    func sendRequest(_ text: String) async throws {
      switch self {
      case .clientStreaming(let collect):
        try await collect.requestStream.send(.with { $0.text = text })
      case .bidirectionalStreaming(let update):
        try await update.requestStream.send(.with { $0.text = text })
      }
    }

    /// Cancels the wrapped call.
    func cancel() {
      switch self {
      case .clientStreaming(let collect):
        collect.cancel()
      case .bidirectionalStreaming(let update):
        update.cancel()
      }
    }
  }
  /// Checks that sending on the request stream of the RPC created by `makeRPC` suspends rather
  /// than completing while the stream is not ready.
  ///
  /// Two child tasks are raced. One tries to send a message on a request stream whose connection
  /// will never be established; the other sleeps for 100ms. Each yields a `SentOrTimedOut` event.
  /// If the message is sent the test has definitely failed, since it should not be possible to
  /// send on a stream which is not open. If the time out wins the test has probably not failed.
  ///
  /// - Parameter makeRPC: Creates the RPC whose request stream is written to.
  private func testSendingRequestsSuspendsWhileStreamIsNotReady(
    makeRPC: @escaping () -> RequestStreamingRPC
  ) async throws {
    enum SentOrTimedOut: Equatable, Sendable {
      case messageSent
      case timedOut
    }

    await withThrowingTaskGroup(of: SentOrTimedOut.self) { race in
      // Contender 1: the send. No server is ever started so this should stay suspended until the
      // task is cancelled, at which point the RPC itself is cancelled.
      race.addTask {
        let call = makeRPC()
        return try await withTaskCancellationHandler {
          try await call.sendRequest("I should suspend")
          return SentOrTimedOut.messageSent
        } onCancel: {
          call.cancel()
        }
      }

      // Contender 2: a 100ms timer.
      race.addTask {
        try await Task.sleep(nanoseconds: 100_000_000)
        return SentOrTimedOut.timedOut
      }

      do {
        // Anything other than a time out means the message was sent before the stream was ready.
        let winner = try await race.next()
        XCTAssertEqual(winner, .timedOut)
      } catch {
        XCTFail("Unexpected error \(error)")
      }

      // Stop whichever task is still running.
      race.cancelAll()
    }
  }
  /// Writes on a client streaming call must suspend while no connection can be established.
  func testClientStreamingSuspendsWritesUntilStreamIsUp() async throws {
    // No server is started for this client, so it will keep failing to establish a connection.
    let client = try self.makeClient(port: 0)

    try await self.testSendingRequestsSuspendsWhileStreamIsNotReady(
      makeRPC: { .clientStreaming(client.makeCollectCall()) }
    )
  }
  /// Writes on a bidirectional streaming call must suspend while no connection can be
  /// established.
  func testBidirectionalStreamingSuspendsWritesUntilStreamIsUp() async throws {
    // No server is started for this client, so it will keep failing to establish a connection.
    let client = try self.makeClient(port: 0)

    try await self.testSendingRequestsSuspendsWhileStreamIsNotReady(
      makeRPC: { .bidirectionalStreaming(client.makeUpdateCall()) }
    )
  }
  /// When no connection can be made, sending on a request stream must throw an error which is
  /// not a `CancellationError`, for both bidirectional and client streaming calls.
  func testConnectionFailureCancelsRequestStreamWithError() async throws {
    let client = try self.makeClient(port: 0) { configuration in
      // No server will be started, so use a short wait time to fail quickly.
      configuration.connectionPool.maxWaitTime = .milliseconds(10)
    }

    // Bidirectional streaming.
    let updateCall = client.makeUpdateCall()
    await XCTAssertThrowsError(try await updateCall.requestStream.send(.init())) { failure in
      XCTAssertFalse(failure is CancellationError)
    }

    // Client streaming.
    let collectCall = client.makeCollectCall()
    await XCTAssertThrowsError(try await collectCall.requestStream.send(.init())) { failure in
      XCTAssertFalse(failure is CancellationError)
    }
  }
  349. }
  350. #endif // compiler(>=5.6)