AsyncClientTests.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /*
  2. * Copyright 2022, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import NIOCore
  21. import NIOPosix
  22. import XCTest
  /// Tests for cancellation and request-stream behaviour of the async Echo client.
  ///
  /// An event loop group with a single thread is created in `setUp()`. Individual tests start a
  /// server and/or a pooled client on demand; `tearDown()` closes whatever was started and then
  /// shuts the group down.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  final class AsyncClientCancellationTests: GRPCTestCase {
    private var server: Server!
    private var group: EventLoopGroup!
    private var pool: GRPCChannel!

    // MARK: - Set up and tear down

    override func setUp() {
      super.setUp()
      self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    }

    override func tearDown() async throws {
      // This is an async context: await the futures rather than blocking the calling thread with
      // `wait()` / `syncShutdownGracefully()`.
      if let pool = self.pool {
        try await pool.close().get()
        self.pool = nil
      }

      if let server = self.server {
        try await server.close().get()
        self.server = nil
      }

      try await self.group.shutdownGracefully()
      self.group = nil

      try await super.tearDown()
    }

    // MARK: - Helpers

    /// Starts an insecure server offering `service`, bound to 127.0.0.1 on port 0, and stores it
    /// in `self.server`.
    ///
    /// - Precondition: No server has been started yet.
    /// - Throws: Any error raised while binding the server.
    private func startServer(service: CallHandlerProvider) throws {
      precondition(self.server == nil)

      self.server = try Server.insecure(group: self.group)
        .withServiceProviders([service])
        .withLogger(self.serverLogger)
        .bind(host: "127.0.0.1", port: 0)
        .wait()
    }

    /// Starts a server offering `service` and returns a client targeting the port it bound to.
    ///
    /// - Throws: Any error from starting the server or creating the client, or an `XCTUnwrap`
    ///   failure if the server's channel has no local address or port.
    private func startServerAndClient(service: CallHandlerProvider) throws -> Echo_EchoAsyncClient {
      try self.startServer(service: service)
      // Fail the test (rather than crash the process) if the bound port is unavailable.
      let port = try XCTUnwrap(self.server.channel.localAddress?.port)
      return try self.makeClient(port: port)
    }

    /// Creates a plaintext channel pool targeting 127.0.0.1 on `port`, stores it in `self.pool`,
    /// and returns an Echo client backed by it.
    ///
    /// - Precondition: No pool has been created yet.
    private func makeClient(port: Int) throws -> Echo_EchoAsyncClient {
      precondition(self.pool == nil)

      self.pool = try GRPCChannelPool.with(
        target: .host("127.0.0.1", port: port),
        transportSecurity: .plaintext,
        eventLoopGroup: self.group
      ) {
        $0.backgroundActivityLogger = self.clientLogger
      }

      return Echo_EchoAsyncClient(channel: self.pool)
    }

    // MARK: - Unary

    /// Cancelling a unary call should fail its response and set its status to `.cancelled`.
    func testCancelUnaryFailsResponse() async throws {
      // We don't want the RPC to complete before we cancel it so use the never resolving service.
      let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())

      let get = echo.makeGetCall(.with { $0.text = "foo bar baz" })
      get.cancel()

      do {
        _ = try await get.response
        XCTFail("Expected to throw a status with code .cancelled")
      } catch let status as GRPCStatus {
        XCTAssertEqual(status.code, .cancelled)
      } catch {
        XCTFail("Expected to throw a status with code .cancelled")
      }

      // Status should be 'cancelled'.
      let status = await get.status
      XCTAssertEqual(status.code, .cancelled)
    }

    /// Cancelling the task running a wrapped unary call should throw a `.cancelled` status.
    func testCancelFailsUnaryResponseForWrappedCall() async throws {
      // We don't want the RPC to complete before we cancel it so use the never resolving service.
      let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())

      let task = Task {
        try await echo.get(.with { $0.text = "I'll be cancelled" })
      }

      task.cancel()

      do {
        _ = try await task.value
        XCTFail("Expected to throw a status with code .cancelled")
      } catch let status as GRPCStatus {
        XCTAssertEqual(status.code, .cancelled)
      } catch {
        XCTFail("Expected to throw a status with code .cancelled")
      }
    }

    // MARK: - Server streaming

    /// Cancelling a server streaming call should make its response stream throw `.cancelled` and
    /// set the call's status to `.cancelled`.
    func testCancelServerStreamingClosesResponseStream() async throws {
      // We don't want the RPC to complete before we cancel it so use the never resolving service.
      let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())

      let expand = echo.makeExpandCall(.with { $0.text = "foo bar baz" })
      expand.cancel()

      var responseStream = expand.responseStream.makeAsyncIterator()

      do {
        _ = try await responseStream.next()
        XCTFail("Expected to throw a status with code .cancelled")
      } catch let status as GRPCStatus {
        XCTAssertEqual(status.code, .cancelled)
      } catch {
        XCTFail("Expected to throw a status with code .cancelled")
      }

      // Status should be 'cancelled'.
      let status = await expand.status
      XCTAssertEqual(status.code, .cancelled)
    }

    /// Cancelling the task iterating a wrapped server streaming call should make the response
    /// stream throw a `.cancelled` status.
    func testCancelServerStreamingClosesResponseStreamForWrappedCall() async throws {
      // We don't want the RPC to complete before we cancel it so use the never resolving service.
      let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())

      let task = Task {
        let responseStream = echo.expand(.with { $0.text = "foo bar baz" })
        var responseIterator = responseStream.makeAsyncIterator()
        do {
          _ = try await responseIterator.next()
          XCTFail("Expected to throw a status with code .cancelled")
        } catch let status as GRPCStatus {
          XCTAssertEqual(status.code, .cancelled)
        } catch {
          XCTFail("Expected to throw a status with code .cancelled")
        }
      }

      task.cancel()
      await task.value
    }

    // MARK: - Client streaming

    /// Cancelling a client streaming call should eventually fail sends on the request stream,
    /// fail the response, and set the call's status to `.cancelled`.
    func testCancelClientStreamingClosesRequestStreamAndFailsResponse() async throws {
      let echo = try self.startServerAndClient(service: EchoProvider())

      let collect = echo.makeCollectCall()
      // Make sure the stream is up before we cancel it.
      try await collect.requestStream.send(.with { $0.text = "foo" })
      collect.cancel()

      // Cancellation is async so loop until we error.
      while true {
        do {
          try await collect.requestStream.send(.with { $0.text = "foo" })
          try await Task.sleep(nanoseconds: 1000)
        } catch {
          break
        }
      }

      // There should be no response.
      await XCTAssertThrowsError(try await collect.response)

      // Status should be 'cancelled'.
      let status = await collect.status
      XCTAssertEqual(status.code, .cancelled)
    }

    /// Cancelling the task running a wrapped client streaming call should throw a `.cancelled`
    /// status.
    func testCancelClientStreamingClosesRequestStreamAndFailsResponseForWrappedCall() async throws {
      let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())

      let requests = (0 ..< 10).map { i in
        Echo_EchoRequest.with {
          $0.text = String(i)
        }
      }

      let task = Task {
        do {
          _ = try await echo.collect(requests)
          XCTFail("Expected to throw a status with code .cancelled")
        } catch let status as GRPCStatus {
          XCTAssertEqual(status.code, .cancelled)
        } catch {
          XCTFail("Expected to throw a status with code .cancelled")
        }
      }

      task.cancel()
      await task.value
    }

    /// After finishing the request stream of a client streaming call and receiving an OK status,
    /// further sends should throw.
    func testClientStreamingClosesRequestStreamOnEnd() async throws {
      let echo = try self.startServerAndClient(service: EchoProvider())

      let collect = echo.makeCollectCall()
      // Send and close.
      try await collect.requestStream.send(.with { $0.text = "foo" })
      try await collect.requestStream.finish()

      // Await the response and status.
      _ = try await collect.response
      let status = await collect.status
      XCTAssert(status.isOk)

      // Sending should fail.
      await XCTAssertThrowsError(
        try await collect.requestStream.send(.with { $0.text = "should throw" })
      )
    }

    // MARK: - Bidirectional streaming

    /// Cancelling a bidirectional streaming call should eventually fail sends on the request
    /// stream and set the call's status to `.cancelled`.
    func testCancelBidiStreamingClosesRequestStreamAndResponseStream() async throws {
      let echo = try self.startServerAndClient(service: EchoProvider())

      let update = echo.makeUpdateCall()
      // Make sure the stream is up before we cancel it.
      try await update.requestStream.send(.with { $0.text = "foo" })
      // Wait for the response.
      var responseStream = update.responseStream.makeAsyncIterator()
      _ = try await responseStream.next()

      update.cancel()

      // Cancellation is async so loop until we error.
      while true {
        do {
          try await update.requestStream.send(.with { $0.text = "foo" })
          try await Task.sleep(nanoseconds: 1000)
        } catch {
          break
        }
      }

      // Status should be 'cancelled'.
      let status = await update.status
      XCTAssertEqual(status.code, .cancelled)
    }

    /// Cancelling the task iterating a wrapped bidirectional streaming call should make the
    /// response stream throw a `.cancelled` status.
    func testCancelBidiStreamingClosesRequestStreamAndResponseStreamForWrappedCall() async throws {
      let echo = try self.startServerAndClient(service: EchoProvider())

      let requests = (0 ..< 10).map { i in
        Echo_EchoRequest.with {
          $0.text = String(i)
        }
      }

      let task = Task {
        let responseStream = echo.update(requests)
        var responseIterator = responseStream.makeAsyncIterator()
        do {
          _ = try await responseIterator.next()
          XCTFail("Expected to throw a status with code .cancelled")
        } catch let status as GRPCStatus {
          XCTAssertEqual(status.code, .cancelled)
        } catch {
          XCTFail("Expected to throw a status with code .cancelled")
        }
      }

      task.cancel()
      await task.value
    }

    /// After finishing the request stream of a bidirectional streaming call, exactly one response
    /// should be received, the status should be OK, and further sends should throw.
    func testBidiStreamingClosesRequestStreamOnEnd() async throws {
      let echo = try self.startServerAndClient(service: EchoProvider())

      let update = echo.makeUpdateCall()
      // Send and close.
      try await update.requestStream.send(.with { $0.text = "foo" })
      try await update.requestStream.finish()

      // Await the response and status.
      let responseCount = try await update.responseStream.count()
      XCTAssertEqual(responseCount, 1)

      let status = await update.status
      XCTAssert(status.isOk)

      // Sending should fail.
      await XCTAssertThrowsError(
        try await update.requestStream.send(.with { $0.text = "should throw" })
      )
    }

    // MARK: - Request stream back-pressure

    /// Wraps either kind of request-streaming call so the tests below can send on or cancel it
    /// without caring which kind it is.
    private enum RequestStreamingRPC {
      typealias Request = Echo_EchoRequest
      typealias Response = Echo_EchoResponse

      case clientStreaming(GRPCAsyncClientStreamingCall<Request, Response>)
      case bidirectionalStreaming(GRPCAsyncBidirectionalStreamingCall<Request, Response>)

      /// Sends a request carrying `text` on the wrapped call's request stream.
      func sendRequest(_ text: String) async throws {
        switch self {
        case let .clientStreaming(call):
          try await call.requestStream.send(.with { $0.text = text })
        case let .bidirectionalStreaming(call):
          try await call.requestStream.send(.with { $0.text = text })
        }
      }

      /// Cancels the wrapped call.
      func cancel() {
        switch self {
        case let .clientStreaming(call):
          call.cancel()
        case let .bidirectionalStreaming(call):
          call.cancel()
        }
      }
    }

    /// Asserts that sending on the request stream of the RPC produced by `makeRPC` does not
    /// complete within 100ms.
    ///
    /// - Parameter makeRPC: Creates the RPC to send on; invoked from within a child task.
    private func testSendingRequestsSuspendsWhileStreamIsNotReady(
      makeRPC: @escaping () -> RequestStreamingRPC
    ) async throws {
      // The strategy for this test is to race two different tasks. The first will attempt to send a
      // message on a request stream on a connection which will never establish. The second will sleep
      // for a little while. Each task returns a `SentOrTimedOut` event. If the message is sent then
      // the test definitely failed; it should not be possible to send a message on a stream which is
      // not open. If the time out happens first then it probably did not fail.
      enum SentOrTimedOut: Equatable, Sendable {
        case messageSent
        case timedOut
      }

      await withThrowingTaskGroup(of: SentOrTimedOut.self) { group in
        group.addTask {
          let rpc = makeRPC()

          return try await withTaskCancellationHandler {
            // This should suspend until we cancel it: we're never going to start a server so it
            // should never succeed.
            try await rpc.sendRequest("I should suspend")
            return .messageSent
          } onCancel: {
            rpc.cancel()
          }
        }

        group.addTask {
          // Wait for 100ms.
          try await Task.sleep(nanoseconds: 100_000_000)
          return .timedOut
        }

        do {
          let event = try await group.next()
          // If this isn't timed out then the message was sent before the stream was ready.
          XCTAssertEqual(event, .timedOut)
        } catch {
          XCTFail("Unexpected error \(error)")
        }

        // Cancel the other task.
        group.cancelAll()
      }
    }

    /// Sends on a client streaming call should suspend while no connection can be established.
    func testClientStreamingSuspendsWritesUntilStreamIsUp() async throws {
      // Make a client for a server which isn't up yet. It will continually fail to establish a
      // connection.
      let echo = try self.makeClient(port: 0)
      try await self.testSendingRequestsSuspendsWhileStreamIsNotReady {
        return .clientStreaming(echo.makeCollectCall())
      }
    }

    /// Sends on a bidirectional streaming call should suspend while no connection can be
    /// established.
    func testBidirectionalStreamingSuspendsWritesUntilStreamIsUp() async throws {
      // Make a client for a server which isn't up yet. It will continually fail to establish a
      // connection.
      let echo = try self.makeClient(port: 0)
      try await self.testSendingRequestsSuspendsWhileStreamIsNotReady {
        return .bidirectionalStreaming(echo.makeUpdateCall())
      }
    }
  }
  332. #endif // compiler(>=5.6)