2
0

ConcurrencyTests.swift 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. //
  2. // ConcurrencyTests.swift
  3. //
  4. // Copyright (c) 2021 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. #if compiler(>=5.6.0) && canImport(_Concurrency)
  25. import Alamofire
  26. import XCTest
  /// Async/await coverage for `DataRequest`.
  ///
  /// Every test creates its own `Session` via `stored(_:)`, issues `request(.get)`, and awaits one of the
  /// `serializing…` task accessors (`value`, `result`, or `response`). The cancellation tests then check how
  /// cancelling the returned task, or a `Task` wrapped around it, is reflected on the underlying request.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  final class DataRequestConcurrencyTests: BaseTestCase {
      // MARK: - Serialization

      /// Awaiting `value` from `serializingResponse(using: .data)` must not throw.
      func testThatDataTaskSerializesResponseUsingSerializer() async throws {
          // Given
          let client = stored(Session())

          // When
          let dataTask = client.request(.get).serializingResponse(using: .data)
          let payload = try await dataTask.value

          // Then
          XCTAssertNotNil(payload)
      }

      /// Awaiting `value` from `serializingDecodable(TestResponse.self)` must not throw.
      func testThatDataTaskSerializesDecodable() async throws {
          // Given
          let client = stored(Session())

          // When
          let decodingTask = client.request(.get).serializingDecodable(TestResponse.self)
          let decoded = try await decodingTask.value

          // Then
          XCTAssertNotNil(decoded)
      }

      /// Awaiting `value` from `serializingString()` must not throw.
      func testThatDataTaskSerializesString() async throws {
          // Given
          let client = stored(Session())

          // When
          let stringTask = client.request(.get).serializingString()
          let text = try await stringTask.value

          // Then
          XCTAssertNotNil(text)
      }

      /// Awaiting `value` from `serializingData()` must not throw.
      func testThatDataTaskSerializesData() async throws {
          // Given
          let client = stored(Session())

          // When
          let rawTask = client.request(.get).serializingData()
          let bytes = try await rawTask.value

          // Then
          XCTAssertNotNil(bytes)
      }

      // MARK: - Accessors

      /// The non-throwing `result` accessor must carry a success value.
      func testThatDataTaskProducesResult() async {
          // Given
          let client = stored(Session())

          // When
          let decodingTask = client.request(.get).serializingDecodable(TestResponse.self)
          let outcome = await decodingTask.result

          // Then
          XCTAssertNotNil(outcome.success)
      }

      /// The throwing `value` accessor must return without throwing.
      func testThatDataTaskProducesValue() async throws {
          // Given
          let client = stored(Session())

          // When
          let decodingTask = client.request(.get).serializingDecodable(TestResponse.self)
          let decoded = try await decodingTask.value

          // Then
          XCTAssertNotNil(decoded)
      }

      /// Three requests started with `async let` on one session must all finish successfully.
      func testThatDataTaskProperlySupportsConcurrentRequests() async {
          // Given
          let client = stored(Session())

          // When
          async let one = client.request(.get).serializingDecodable(TestResponse.self).response
          async let two = client.request(.get).serializingDecodable(TestResponse.self).response
          async let three = client.request(.get).serializingDecodable(TestResponse.self).response

          // Then
          let gathered = await [one, two, three]
          XCTAssertEqual(gathered.count, 3)
          XCTAssertTrue(gathered.allSatisfy(\.result.isSuccess))
      }

      // MARK: - Cancellation

      /// Cancelling the serializing task before awaiting it must surface an explicit-cancellation error and
      /// leave the request itself marked as cancelled.
      func testThatDataTaskCancellationCancelsRequest() async {
          // Given
          let client = stored(Session())
          let dataRequest = client.request(.get)
          let decodingTask = dataRequest.serializingDecodable(TestResponse.self)

          // When
          decodingTask.cancel()
          let outcome = await decodingTask.response

          // Then
          XCTAssertTrue(outcome.error?.isExplicitlyCancelledError == true)
          XCTAssertTrue(dataRequest.isCancelled, "Underlying DataRequest should be cancelled.")
      }

      /// Cancelling an enclosing `Task` must cancel the request when the default options are used.
      func testThatDataTaskIsAutomaticallyCancelledInTask() async {
          // Given
          let client = stored(Session())
          let dataRequest = client.request(.get)

          // When
          let enclosing = Task {
              await dataRequest.serializingDecodable(TestResponse.self).result
          }
          enclosing.cancel()
          let outcome = await enclosing.value

          // Then
          XCTAssertTrue(outcome.failure?.isExplicitlyCancelledError == true)
          XCTAssertTrue(enclosing.isCancelled, "Task should be cancelled.")
          XCTAssertTrue(dataRequest.isCancelled, "Underlying DataRequest should be cancelled.")
      }

      /// With `automaticallyCancelling: false`, cancelling the enclosing `Task` must leave the request
      /// running to a successful result.
      func testThatDataTaskIsNotAutomaticallyCancelledInTaskWhenDisabled() async {
          // Given
          let client = stored(Session())
          let dataRequest = client.request(.get)

          // When
          let enclosing = Task {
              await dataRequest.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
          }
          enclosing.cancel()
          let outcome = await enclosing.value

          // Then
          XCTAssertTrue(enclosing.isCancelled, "Task should be cancelled.")
          XCTAssertFalse(dataRequest.isCancelled, "Underlying DataRequest should not be cancelled.")
          XCTAssertTrue(outcome.isSuccess, "DataRequest should succeed.")
      }

      /// Cancelling a `Task` that awaits the request inside a task group must cancel the request.
      func testThatDataTaskIsAutomaticallyCancelledInTaskGroup() async {
          typealias Outcome = Result<TestResponse, AFError>

          // Given
          let client = stored(Session())
          let dataRequest = client.request(.get)

          // When
          let enclosing = Task {
              await withTaskGroup(of: Outcome.self) { children -> Outcome in
                  children.addTask {
                      await dataRequest.serializingDecodable(TestResponse.self).result
                  }

                  // The group holds exactly one child, so its first element is that child's result.
                  return await children.first(where: { _ in true })!
              }
          }
          enclosing.cancel()
          let outcome = await enclosing.value

          // Then
          XCTAssertTrue(outcome.failure?.isExplicitlyCancelledError == true)
          XCTAssertTrue(enclosing.isCancelled, "Task should be cancelled.")
          XCTAssertTrue(dataRequest.isCancelled, "Underlying DataRequest should be cancelled.")
      }

      /// With `automaticallyCancelling: false`, cancelling a `Task` that awaits the request inside a task
      /// group must leave the request running to a successful result.
      func testThatDataTaskIsNotAutomaticallyCancelledInTaskGroupWhenDisabled() async {
          typealias Outcome = Result<TestResponse, AFError>

          // Given
          let client = stored(Session())
          let dataRequest = client.request(.get)

          // When
          let enclosing = Task {
              await withTaskGroup(of: Outcome.self) { children -> Outcome in
                  children.addTask {
                      await dataRequest.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
                  }

                  // The group holds exactly one child, so its first element is that child's result.
                  return await children.first(where: { _ in true })!
              }
          }
          enclosing.cancel()
          let outcome = await enclosing.value

          // Then
          XCTAssertTrue(enclosing.isCancelled, "Task should be cancelled.")
          XCTAssertFalse(dataRequest.isCancelled, "Underlying DataRequest should not be cancelled.")
          XCTAssertTrue(outcome.isSuccess, "DataRequest should succeed.")
      }
  }
  /// Async/await coverage for `DownloadRequest`.
  ///
  /// Every test creates its own `Session` via `stored(_:)`, issues `download(.get)`, and awaits one of the
  /// `serializing…` task accessors (`value`, `result`, or `response`). The cancellation tests then check how
  /// cancelling the returned task, or a `Task` wrapped around it, is reflected on the underlying request.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  final class DownloadConcurrencyTests: BaseTestCase {
      // MARK: - Serialization

      /// Awaiting `value` from `serializingDownload(using: .data)` must not throw.
      func testThatDownloadTaskSerializesResponseFromSerializer() async throws {
          // Given
          let session = stored(Session())

          // When
          let value = try await session.download(.get)
              .serializingDownload(using: .data)
              .value

          // Then
          XCTAssertNotNil(value)
      }

      /// Awaiting `value` from `serializingDecodable(TestResponse.self)` must not throw.
      func testThatDownloadTaskSerializesDecodable() async throws {
          // Given
          let session = stored(Session())

          // When
          let value = try await session.download(.get).serializingDecodable(TestResponse.self).value

          // Then
          XCTAssertNotNil(value)
      }

      /// Awaiting `value` from `serializingString()` must not throw.
      func testThatDownloadTaskSerializesString() async throws {
          // Given
          let session = stored(Session())

          // When
          let value = try await session.download(.get).serializingString().value

          // Then
          XCTAssertNotNil(value)
      }

      /// Awaiting `value` from `serializingData()` must not throw.
      func testThatDownloadTaskSerializesData() async throws {
          // Given
          let session = stored(Session())

          // When
          let value = try await session.download(.get).serializingData().value

          // Then
          XCTAssertNotNil(value)
      }

      /// Awaiting `value` from `serializingDownloadedFileURL()` must not throw.
      func testThatDownloadTaskSerializesURL() async throws {
          // Given
          let session = stored(Session())

          // When
          let value = try await session.download(.get).serializingDownloadedFileURL().value

          // Then
          XCTAssertNotNil(value)
      }

      // MARK: - Accessors

      /// The non-throwing `result` accessor must carry a success value.
      func testThatDownloadTaskProducesResult() async {
          // Given
          let session = stored(Session())

          // When
          let result = await session.download(.get).serializingDecodable(TestResponse.self).result

          // Then
          XCTAssertNotNil(result.success)
      }

      /// The throwing `value` accessor must return without throwing.
      func testThatDownloadTaskProducesValue() async throws {
          // Given
          let session = stored(Session())

          // When
          let value = try await session.download(.get).serializingDecodable(TestResponse.self).value

          // Then
          XCTAssertNotNil(value)
      }

      /// Three downloads started with `async let` on one session must all finish successfully.
      func testThatDownloadTaskProperlySupportsConcurrentRequests() async {
          // Given
          let session = stored(Session())

          // When
          async let first = session.download(.get).serializingDecodable(TestResponse.self).response
          async let second = session.download(.get).serializingDecodable(TestResponse.self).response
          async let third = session.download(.get).serializingDecodable(TestResponse.self).response

          // Then
          let responses = await [first, second, third]
          XCTAssertEqual(responses.count, 3)
          XCTAssertTrue(responses.allSatisfy(\.result.isSuccess))
      }

      // MARK: - Cancellation

      /// Cancelling the serializing task before awaiting it must surface an explicit-cancellation error and
      /// leave the request itself marked as cancelled.
      func testThatDownloadTaskCancelsRequest() async {
          // Given
          let session = stored(Session())
          let request = session.download(.get)
          let task = request.serializingDecodable(TestResponse.self)

          // When
          task.cancel()
          let response = await task.response

          // Then
          XCTAssertTrue(response.error?.isExplicitlyCancelledError == true)
          // Mirrors the equivalent DataRequest test: the error alone does not prove the request was cancelled.
          XCTAssertTrue(request.isCancelled, "Underlying DownloadRequest should be cancelled.")
      }

      /// Cancelling an enclosing `Task` must cancel the request when the default options are used.
      func testThatDownloadTaskIsAutomaticallyCancelledInTask() async {
          // Given
          let session = stored(Session())
          let request = session.download(.get)

          // When
          let task = Task {
              await request.serializingDecodable(TestResponse.self).result
          }
          task.cancel()
          let result = await task.value

          // Then
          XCTAssertTrue(result.failure?.isExplicitlyCancelledError == true)
          XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
          XCTAssertTrue(request.isCancelled, "Underlying DownloadRequest should be cancelled.")
      }

      /// With `automaticallyCancelling: false`, cancelling the enclosing `Task` must leave the request
      /// running to a successful result.
      func testThatDownloadTaskIsNotAutomaticallyCancelledInTaskWhenDisabled() async {
          // Given
          let session = stored(Session())
          let request = session.download(.get)

          // When
          let task = Task {
              await request.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
          }
          task.cancel()
          let result = await task.value

          // Then
          XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
          XCTAssertFalse(request.isCancelled, "Underlying DownloadRequest should not be cancelled.")
          XCTAssertTrue(result.isSuccess, "DownloadRequest should succeed.")
      }

      /// Cancelling a `Task` that awaits the request inside a task group must cancel the request.
      func testThatDownloadTaskIsAutomaticallyCancelledInTaskGroup() async {
          // Given
          let session = stored(Session())
          let request = session.download(.get)

          // When
          let task = Task {
              await withTaskGroup(of: Result<TestResponse, AFError>.self) { group -> Result<TestResponse, AFError> in
                  group.addTask {
                      await request.serializingDecodable(TestResponse.self).result
                  }

                  // The group holds exactly one child, so its first element is that child's result.
                  return await group.first(where: { _ in true })!
              }
          }
          task.cancel()
          let result = await task.value

          // Then
          XCTAssertTrue(result.failure?.isExplicitlyCancelledError == true)
          XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
          XCTAssertTrue(request.isCancelled, "Underlying DownloadRequest should be cancelled.")
      }

      /// With `automaticallyCancelling: false`, cancelling a `Task` that awaits the request inside a task
      /// group must leave the request running to a successful result.
      func testThatDownloadTaskIsNotAutomaticallyCancelledInTaskGroupWhenDisabled() async {
          // Given
          let session = stored(Session())
          let request = session.download(.get)

          // When
          let task = Task {
              await withTaskGroup(of: Result<TestResponse, AFError>.self) { group -> Result<TestResponse, AFError> in
                  group.addTask {
                      await request.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
                  }

                  // The group holds exactly one child, so its first element is that child's result.
                  return await group.first(where: { _ in true })!
              }
          }
          task.cancel()
          let result = await task.value

          // Then
          XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
          XCTAssertFalse(request.isCancelled, "Underlying DownloadRequest should not be cancelled.")
          XCTAssertTrue(result.isSuccess, "DownloadRequest should succeed.")
      }
  }
  /// Async/await coverage for `DataStreamRequest`.
  ///
  /// Tests iterate the async sequences vended by `streamTask()` (`streamingData()`, `streamingStrings()`,
  /// `streamingResponses(serializedUsing:)`) over a `.payloads(n)` stream request, and check the ways a stream
  /// can be cancelled: directly on the task, by leaving the iteration, or by cancelling an enclosing `Task`.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  final class DataStreamConcurrencyTests: BaseTestCase {
      // MARK: - Streaming

      /// Iterating `streamingData()` over `.payloads(2)` must yield two values.
      func testThatDataStreamTaskCanStreamData() async {
          // Given
          let session = stored(Session())

          // When
          let task = session.streamRequest(.payloads(2)).streamTask()
          var datas: [Data] = []

          for await data in task.streamingData().compactMap(\.value) {
              datas.append(data)
          }

          // Then
          XCTAssertEqual(datas.count, 2)
      }

      // The `onHTTPResponse` tests are compiled only for Swift 5.8+ on platforms where Darwin can be imported.
      #if swift(>=5.8) && canImport(Darwin)
      /// An async `onHTTPResponse` handler that returns nothing must be invoked, and the stream must still
      /// deliver both payloads.
      func testThatDataStreamHasAsyncOnHTTPResponse() async {
          // Given
          let session = stored(Session())
          let functionCalled = expectation(description: "doNothing called")
          @Sendable @MainActor func fulfill() async {
              functionCalled.fulfill()
          }

          // When
          let task = session.streamRequest(.payloads(2))
              .onHTTPResponse { _ in
                  await fulfill()
              }
              .streamTask()
          var datas: [Data] = []

          for await data in task.streamingData().compactMap(\.value) {
              datas.append(data)
          }

          await fulfillment(of: [functionCalled], timeout: timeout)

          // Then
          XCTAssertEqual(datas.count, 2)
      }

      /// An `onHTTPResponse` handler returning `.allow` must be invoked, and the stream must still deliver
      /// both payloads.
      func testThatDataOnHTTPResponseCanAllow() async {
          // Given
          let session = stored(Session())
          let functionCalled = expectation(description: "doNothing called")
          @Sendable @MainActor func fulfill() async {
              functionCalled.fulfill()
          }

          // When
          let task = session.streamRequest(.payloads(2))
              .onHTTPResponse { _ in
                  await fulfill()
                  return .allow
              }
              .streamTask()
          var datas: [Data] = []

          for await data in task.streamingData().compactMap(\.value) {
              datas.append(data)
          }

          await fulfillment(of: [functionCalled], timeout: timeout)

          // Then
          XCTAssertEqual(datas.count, 2)
      }

      /// An `onHTTPResponse` handler returning `.cancel` must stop the stream before any data event, while
      /// the completion still carries the 200 response and the request reports explicit cancellation.
      func testThatDataOnHTTPResponseCanCancel() async {
          // Given
          let session = stored(Session())
          var receivedCompletion: DataStreamRequest.Completion?
          let functionCalled = expectation(description: "doNothing called")
          @Sendable @MainActor func fulfill() async {
              functionCalled.fulfill()
          }

          // When
          let request = session.streamRequest(.payloads(2))
              .onHTTPResponse { _ in
                  await fulfill()
                  return .cancel
              }
          let task = request.streamTask()

          for await stream in task.streamingResponses(serializedUsing: .passthrough) {
              switch stream.event {
              case .stream:
                  XCTFail("cancelled stream should receive no data")
              case let .complete(completion):
                  receivedCompletion = completion
              }
          }

          await fulfillment(of: [functionCalled], timeout: timeout)

          // Then
          XCTAssertEqual(receivedCompletion?.response?.statusCode, 200)
          XCTAssertTrue(request.isCancelled, "onHTTPResponse cancelled request isCancelled should be true")
          XCTAssertTrue(request.error?.isExplicitlyCancelledError == true, "onHTTPResponse cancelled request error should be explicitly cancelled")
      }
      #endif

      /// Iterating `streamingStrings()` over `.payloads(2)` must yield two values.
      func testThatDataStreamTaskCanStreamStrings() async {
          // Given
          let session = stored(Session())

          // When
          let task = session.streamRequest(.payloads(2)).streamTask()
          var strings: [String] = []

          for await string in task.streamingStrings().compactMap(\.value) {
              strings.append(string)
          }

          // Then
          XCTAssertEqual(strings.count, 2)
      }

      /// Iterating `streamingResponses(serializedUsing:)` with a `DecodableStreamSerializer<TestResponse>`
      /// over `.payloads(2)` must yield two decoded values.
      func testThatDataStreamTaskCanStreamDecodable() async {
          // Given
          let session = stored(Session())

          // When
          let task = session.streamRequest(.payloads(2)).streamTask()
          let stream = task.streamingResponses(serializedUsing: DecodableStreamSerializer<TestResponse>())
          var responses: [TestResponse] = []

          for await response in stream.compactMap(\.value) {
              responses.append(response)
          }

          // Then
          XCTAssertEqual(responses.count, 2)
      }

      // MARK: - Cancellation

      /// Calling `cancel()` on the stream task after the first payload must cancel the request and end the
      /// iteration with exactly one payload received.
      func testThatDataStreamTaskCanBeDirectlyCancelled() async {
          // Given
          let session = stored(Session())

          // When
          let expectedPayloads = 10
          let request = session.streamRequest(.payloads(expectedPayloads))
          let task = request.streamTask()
          var datas: [Data] = []

          for await data in task.streamingData().compactMap(\.value) {
              datas.append(data)
              if datas.count == 1 {
                  task.cancel()
              }
          }

          // Then
          XCTAssertTrue(request.isCancelled)
          // XCTAssertEqual reports the actual count on failure, unlike XCTAssertTrue(count == 1).
          XCTAssertEqual(datas.count, 1)
      }

      /// Breaking out of the iteration after the first payload must cancel the request.
      func testThatDataStreamTaskIsCancelledByCancellingIteration() async {
          // Given
          let session = stored(Session())

          // When
          let expectedPayloads = 10
          let request = session.streamRequest(.payloads(expectedPayloads))
          let task = request.streamTask()
          var datas: [Data] = []

          for await data in task.streamingData().compactMap(\.value) {
              datas.append(data)
              if datas.count == 1 {
                  break
              }
          }

          // Then
          XCTAssertTrue(request.isCancelled)
          XCTAssertEqual(datas.count, 1)
      }

      /// Cancelling the enclosing `Task` right after creating it must cancel the request with no payloads
      /// collected.
      func testThatDataStreamTaskCanBeImplicitlyCancelled() async {
          // Given
          let session = stored(Session())

          // When
          let expectedPayloads = 10
          let request = session.streamRequest(.payloads(expectedPayloads))
          let task = Task<[Data], Never> {
              var datas: [Data] = []

              for await data in request.streamTask().streamingData().compactMap(\.value) {
                  datas.append(data)
              }

              return datas
          }
          task.cancel()
          let datas: [Data] = await task.value

          // Then
          XCTAssertTrue(request.isCancelled)
          XCTAssertTrue(datas.isEmpty)
      }

      /// Leaving a stream created with `automaticallyCancelling: false` must keep the request alive, so a
      /// second, default stream from the same task can take one more payload before its exit cancels the
      /// request.
      func testThatDataStreamTaskCanBeCancelledAfterStreamTurnsOffAutomaticCancellation() async {
          // Given
          let session = stored(Session())

          // When
          let expectedPayloads = 10
          let request = session.streamRequest(.payloads(expectedPayloads))
          let task = Task<[Data], Never> {
              var datas: [Data] = []
              let streamTask = request.streamTask()

              // First pass: take one payload and leave without cancelling the request.
              for await data in streamTask.streamingData(automaticallyCancelling: false).compactMap(\.value) {
                  datas.append(data)
                  break
              }

              // Second pass: default options, so leaving this loop is expected to cancel the request.
              for await data in streamTask.streamingData().compactMap(\.value) {
                  datas.append(data)
                  break
              }

              return datas
          }
          let datas: [Data] = await task.value

          // Then
          XCTAssertTrue(request.isCancelled)
          XCTAssertEqual(datas.count, 2)
      }
  }
  521. // Avoid when using swift-corelibs-foundation.
  522. // Only Xcode 14.3+ has async fulfillment.
  523. #if !canImport(FoundationNetworking) && swift(>=5.8)
  /// Async coverage for stream-based uploads.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  final class UploadConcurrencyTests: BaseTestCase {
      /// Feeds an upload through a bound stream pair in `count` separate writes with a short sleep between
      /// them, then checks that progress was reported more than once, never decreased, and that the decoded
      /// response reports the total number of bytes written.
      func testThatDelayedUploadStreamResultsInMultipleProgressValues() async throws {
          // Given
          let count = 75
          let baseString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. "
          let baseData = Data(baseString.utf8)

          // The body length is declared up front: `count` writes of `baseData.count` bytes each.
          var request = Endpoint.upload.urlRequest
          request.headers.add(name: "Content-Length", value: "\(baseData.count * count)")

          let uploadCompleted = expectation(description: "Bytes upload progress should be reported: \(request.url!)")

          var uploadProgressValues: [Double] = []
          var downloadProgressValues: [Double] = []
          var response: DataResponse<UploadResponse, AFError>?

          // Bound pair: bytes written to `outputStream` become readable from `inputStream`, which is the
          // upload body. The buffer holds one chunk.
          var inputStream: InputStream!
          var outputStream: OutputStream!
          Stream.getBoundStreams(withBufferSize: baseData.count, inputStream: &inputStream, outputStream: &outputStream)
          CFWriteStreamSetDispatchQueue(outputStream, .main)
          outputStream.open()

          // When
          AF.upload(inputStream, with: request)
              .uploadProgress { uploadProgressValues.append($0.fractionCompleted) }
              .downloadProgress { downloadProgressValues.append($0.fractionCompleted) }
              .responseDecodable(of: UploadResponse.self) { decoded in
                  response = decoded
                  uploadCompleted.fulfill()
                  inputStream.close()
              }

          /// Writes one copy of `baseData` to the output stream and fails the test on any short or failed write.
          func writeChunk() {
              baseData.withUnsafeBytes { (buffer: UnsafeRawBufferPointer) in
                  let bytesStreamed = outputStream.write(buffer.baseAddress!, maxLength: baseData.count)

                  // A full-length write is the success case; nothing more to do.
                  guard bytesStreamed != baseData.count else { return }

                  switch bytesStreamed {
                  case 0:
                      XCTFail("outputStream somehow reached end")
                  case -1:
                      if let streamError = outputStream.streamError {
                          XCTFail("outputStream.write failed with error: \(streamError)")
                      } else {
                          XCTFail("outputStream.write failed with unknown error")
                      }
                  default:
                      XCTFail("outputStream failed to send \(baseData.count) bytes, sent \(bytesStreamed) instead.")
                  }
              }
          }

          for _ in 0..<count {
              writeChunk()
              try await Task.sleep(nanoseconds: 3 * 1_000_000) // milliseconds
          }

          outputStream.close()

          await fulfillment(of: [uploadCompleted], timeout: timeout)

          // Then
          XCTAssertNotNil(response?.request)
          XCTAssertNotNil(response?.response)
          XCTAssertNotNil(response?.data)
          XCTAssertNil(response?.error)

          // Upload progress must never go backwards and must have been reported more than once.
          for (earlier, later) in zip(uploadProgressValues, uploadProgressValues.dropFirst()) {
              XCTAssertGreaterThanOrEqual(later, earlier)
          }

          XCTAssertGreaterThan(uploadProgressValues.count, 1, "there should more than 1 uploadProgressValues")

          // Download progress must never go backwards and must end at 1.0.
          for (earlier, later) in zip(downloadProgressValues, downloadProgressValues.dropFirst()) {
              XCTAssertGreaterThanOrEqual(later, earlier)
          }

          XCTAssertEqual(downloadProgressValues.last, 1.0, "last item in downloadProgressValues should equal 1.0")
          XCTAssertEqual(response?.value?.bytes, baseData.count * count)
      }
  }
  597. #endif
  /// Coverage for the async-sequence views of a request's closure-based event APIs.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  final class ClosureAPIConcurrencyTests: BaseTestCase {
      /// Collects every event stream of a single `request(.get)` concurrently with its decoded response and
      /// checks what each stream produced.
      ///
      /// - Throws: Only on compilers where awaiting the collected streams is marked with `try`
      ///   (Swift 5.10 and later). An error there fails this test instead of crashing the test process.
      func testThatDownloadProgressStreamReturnsProgress() async throws {
          // Given
          let session = stored(Session())

          // When
          let request = session.request(.get)
          async let httpResponses = request.httpResponses().collect()
          async let uploadProgress = request.uploadProgress().collect()
          async let downloadProgress = request.downloadProgress().collect()
          async let requests = request.urlRequests().collect()
          async let tasks = request.urlSessionTasks().collect()
          async let descriptions = request.cURLDescriptions().collect()
          async let response = request.serializingDecodable(TestResponse.self).response

          let values: (httpResponses: [HTTPURLResponse],
                       uploadProgresses: [Progress],
                       downloadProgresses: [Progress],
                       requests: [URLRequest],
                       tasks: [URLSessionTask],
                       descriptions: [String],
                       response: AFDataResponse<TestResponse>)
          // The 5.10+ branch marks the await with `try`, the older branch does not.
          // NOTE(review): presumably newer compilers treat the `rethrows` of `collect()` as throwing — confirm.
          #if swift(>=5.10)
          values = try await (httpResponses, uploadProgress, downloadProgress, requests, tasks, descriptions, response)
          #else
          values = await (httpResponses, uploadProgress, downloadProgress, requests, tasks, descriptions, response)
          #endif

          // Then
          XCTAssertEqual(values.httpResponses.count, 1, "httpResponses should have one response")
          XCTAssertTrue(values.uploadProgresses.isEmpty, "uploadProgresses should be empty")
          XCTAssertNotNil(values.downloadProgresses.last, "downloadProgresses should not be empty")
          XCTAssertTrue(values.downloadProgresses.last?.isFinished == true, "last download progression should be finished")
          XCTAssertNotNil(values.requests.last, "requests should not be empty")
          XCTAssertNotNil(values.tasks.last, "tasks should not be empty")
          XCTAssertNotNil(values.descriptions.last, "descriptions should not be empty")
          XCTAssertTrue(values.response.result.isSuccess, "request should succeed")
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension AsyncSequence {
      /// Drains the sequence and returns all of its elements, in the order they were produced.
      ///
      /// - Returns: Every element of the sequence; an empty array when the sequence finishes without
      ///   producing anything.
      /// - Throws: Rethrows any error thrown while iterating the sequence.
      func collect() async rethrows -> [Element] {
          try await reduce(into: [Element]()) { gathered, next in
              gathered.append(next)
          }
      }
  }
  645. #endif