AsyncClientTests.swift 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. /*
  2. * Copyright 2022, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. import EchoImplementation
  18. import EchoModel
  19. import GRPC
  20. import NIOCore
  21. import NIOPosix
  22. import XCTest
  23. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  24. final class AsyncClientCancellationTests: GRPCTestCase {
  25. private var server: Server!
  26. private var group: EventLoopGroup!
  27. private var pool: GRPCChannel!
  28. override func setUp() {
  29. super.setUp()
  30. self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  31. }
  32. override func tearDown() async throws {
  33. if self.pool != nil {
  34. try self.pool.close().wait()
  35. self.pool = nil
  36. }
  37. if self.server != nil {
  38. try self.server.close().wait()
  39. self.server = nil
  40. }
  41. try self.group.syncShutdownGracefully()
  42. self.group = nil
  43. try await super.tearDown()
  44. }
  45. private func startServer(service: CallHandlerProvider) throws {
  46. precondition(self.server == nil)
  47. self.server = try Server.insecure(group: self.group)
  48. .withServiceProviders([service])
  49. .withLogger(self.serverLogger)
  50. .bind(host: "127.0.0.1", port: 0)
  51. .wait()
  52. }
  53. private func startServerAndClient(service: CallHandlerProvider) throws -> Echo_EchoAsyncClient {
  54. try self.startServer(service: service)
  55. return try self.makeClient(port: self.server.channel.localAddress!.port!)
  56. }
  57. private func makeClient(port: Int) throws -> Echo_EchoAsyncClient {
  58. precondition(self.pool == nil)
  59. self.pool = try GRPCChannelPool.with(
  60. target: .host("127.0.0.1", port: port),
  61. transportSecurity: .plaintext,
  62. eventLoopGroup: self.group
  63. ) {
  64. $0.backgroundActivityLogger = self.clientLogger
  65. }
  66. return Echo_EchoAsyncClient(channel: self.pool)
  67. }
  68. func testCancelUnaryFailsResponse() async throws {
  69. // We don't want the RPC to complete before we cancel it so use the never resolving service.
  70. let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())
  71. let get = echo.makeGetCall(.with { $0.text = "foo bar baz" })
  72. try await get.cancel()
  73. await XCTAssertThrowsError(try await get.response)
  74. // Status should be 'cancelled'.
  75. let status = await get.status
  76. XCTAssertEqual(status.code, .cancelled)
  77. }
  78. func testCancelServerStreamingClosesResponseStream() async throws {
  79. // We don't want the RPC to complete before we cancel it so use the never resolving service.
  80. let echo = try self.startServerAndClient(service: NeverResolvingEchoProvider())
  81. let expand = echo.makeExpandCall(.with { $0.text = "foo bar baz" })
  82. try await expand.cancel()
  83. var responseStream = expand.responseStream.makeAsyncIterator()
  84. await XCTAssertThrowsError(try await responseStream.next())
  85. // Status should be 'cancelled'.
  86. let status = await expand.status
  87. XCTAssertEqual(status.code, .cancelled)
  88. }
  89. func testCancelClientStreamingClosesRequestStreamAndFailsResponse() async throws {
  90. let echo = try self.startServerAndClient(service: EchoProvider())
  91. let collect = echo.makeCollectCall()
  92. // Make sure the stream is up before we cancel it.
  93. try await collect.requestStream.send(.with { $0.text = "foo" })
  94. try await collect.cancel()
  95. // The next send should fail.
  96. await XCTAssertThrowsError(try await collect.requestStream.send(.with { $0.text = "foo" }))
  97. // There should be no response.
  98. await XCTAssertThrowsError(try await collect.response)
  99. // Status should be 'cancelled'.
  100. let status = await collect.status
  101. XCTAssertEqual(status.code, .cancelled)
  102. }
  103. func testClientStreamingClosesRequestStreamOnEnd() async throws {
  104. let echo = try self.startServerAndClient(service: EchoProvider())
  105. let collect = echo.makeCollectCall()
  106. // Send and close.
  107. try await collect.requestStream.send(.with { $0.text = "foo" })
  108. try await collect.requestStream.finish()
  109. // Await the response and status.
  110. _ = try await collect.response
  111. let status = await collect.status
  112. XCTAssert(status.isOk)
  113. // Sending should fail.
  114. await XCTAssertThrowsError(
  115. try await collect.requestStream.send(.with { $0.text = "should throw" })
  116. )
  117. }
  118. func testCancelBidiStreamingClosesRequestStreamAndResponseStream() async throws {
  119. let echo = try self.startServerAndClient(service: EchoProvider())
  120. let update = echo.makeUpdateCall()
  121. // Make sure the stream is up before we cancel it.
  122. try await update.requestStream.send(.with { $0.text = "foo" })
  123. // Wait for the response.
  124. var responseStream = update.responseStream.makeAsyncIterator()
  125. _ = try await responseStream.next()
  126. // Now cancel. The next send should fail and we shouldn't receive any more responses.
  127. try await update.cancel()
  128. await XCTAssertThrowsError(try await update.requestStream.send(.with { $0.text = "foo" }))
  129. await XCTAssertThrowsError(try await responseStream.next())
  130. // Status should be 'cancelled'.
  131. let status = await update.status
  132. XCTAssertEqual(status.code, .cancelled)
  133. }
  134. func testBidiStreamingClosesRequestStreamOnEnd() async throws {
  135. let echo = try self.startServerAndClient(service: EchoProvider())
  136. let update = echo.makeUpdateCall()
  137. // Send and close.
  138. try await update.requestStream.send(.with { $0.text = "foo" })
  139. try await update.requestStream.finish()
  140. // Await the response and status.
  141. let responseCount = try await update.responseStream.count()
  142. XCTAssertEqual(responseCount, 1)
  143. let status = await update.status
  144. XCTAssert(status.isOk)
  145. // Sending should fail.
  146. await XCTAssertThrowsError(
  147. try await update.requestStream.send(.with { $0.text = "should throw" })
  148. )
  149. }
  150. private enum RequestStreamingRPC {
  151. typealias Request = Echo_EchoRequest
  152. typealias Response = Echo_EchoResponse
  153. case clientStreaming(GRPCAsyncClientStreamingCall<Request, Response>)
  154. case bidirectionalStreaming(GRPCAsyncBidirectionalStreamingCall<Request, Response>)
  155. func sendRequest(_ text: String) async throws {
  156. switch self {
  157. case let .clientStreaming(call):
  158. try await call.requestStream.send(.with { $0.text = text })
  159. case let .bidirectionalStreaming(call):
  160. try await call.requestStream.send(.with { $0.text = text })
  161. }
  162. }
  163. func cancel() {
  164. switch self {
  165. case let .clientStreaming(call):
  166. // TODO: this should be async
  167. Task { try await call.cancel() }
  168. case let .bidirectionalStreaming(call):
  169. // TODO: this should be async
  170. Task { try await call.cancel() }
  171. }
  172. }
  173. }
  174. private func testSendingRequestsSuspendsWhileStreamIsNotReady(
  175. makeRPC: @escaping () -> RequestStreamingRPC
  176. ) async throws {
  177. // The strategy for this test is to race two different tasks. The first will attempt to send a
  178. // message on a request stream on a connection which will never establish. The second will sleep
  179. // for a little while. Each task returns a `SendOrTimedOut` event. If the message is sent then
  180. // the test definitely failed; it should not be possible to send a message on a stream which is
  181. // not open. If the time out happens first then it probably did not fail.
  182. enum SentOrTimedOut: Equatable, Sendable {
  183. case messageSent
  184. case timedOut
  185. }
  186. await withThrowingTaskGroup(of: SentOrTimedOut.self) { group in
  187. group.addTask {
  188. let rpc = makeRPC()
  189. return try await withTaskCancellationHandler {
  190. // This should suspend until we cancel it: we're never going to start a server so it
  191. // should never succeed.
  192. try await rpc.sendRequest("I should suspend")
  193. return .messageSent
  194. } onCancel: {
  195. rpc.cancel()
  196. }
  197. }
  198. group.addTask {
  199. // Wait for 100ms.
  200. try await Task.sleep(nanoseconds: 100_000_000)
  201. return .timedOut
  202. }
  203. do {
  204. let event = try await group.next()
  205. // If this isn't timed out then the message was sent before the stream was ready.
  206. XCTAssertEqual(event, .timedOut)
  207. } catch {
  208. XCTFail("Unexpected error \(error)")
  209. }
  210. // Cancel the other task.
  211. group.cancelAll()
  212. }
  213. }
  214. func testClientStreamingSuspendsWritesUntilStreamIsUp() async throws {
  215. // Make a client for a server which isn't up yet. It will continually fail to establish a
  216. // connection.
  217. let echo = try self.makeClient(port: 0)
  218. try await self.testSendingRequestsSuspendsWhileStreamIsNotReady {
  219. return .clientStreaming(echo.makeCollectCall())
  220. }
  221. }
  222. func testBidirectionalStreamingSuspendsWritesUntilStreamIsUp() async throws {
  223. // Make a client for a server which isn't up yet. It will continually fail to establish a
  224. // connection.
  225. let echo = try self.makeClient(port: 0)
  226. try await self.testSendingRequestsSuspendsWhileStreamIsNotReady {
  227. return .bidirectionalStreaming(echo.makeUpdateCall())
  228. }
  229. }
  230. }
  231. #endif // compiler(>=5.6)