ConcurrencyTests.swift 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875
  1. //
  2. // ConcurrencyTests.swift
  3. //
  4. // Copyright (c) 2021 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. #if canImport(_Concurrency)
  25. import Alamofire
  26. import XCTest
  27. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  28. final class DataRequestConcurrencyTests: BaseTestCase {
  29. func testThatDataTaskSerializesResponseUsingSerializer() async throws {
  30. // Given
  31. let session = stored(Session())
  32. // When
  33. let value = try await session.request(.get)
  34. .serializingResponse(using: .data)
  35. .value
  36. // Then
  37. XCTAssertNotNil(value)
  38. }
  39. func testThatDataTaskCanBeCancelledConcurrently() async {
  40. // Tests fix for https://github.com/Alamofire/Alamofire/issues/3978
  41. // Given
  42. let session = Session()
  43. // When: a single request has multiple DataTasks attached.
  44. for _ in 0..<100 {
  45. let request = session.request(.get)
  46. let first = Task {
  47. await request
  48. .serializingDecodable(TestResponse.self)
  49. .response
  50. }
  51. let second = Task {
  52. await request
  53. .serializingDecodable(TestResponse.self)
  54. .response
  55. }
  56. async let firstResponse = first.value
  57. async let secondResponse = second.value
  58. // When: both tasks are cancelled concurrently.
  59. async let firstCancel: Void = Task { @Sendable in first.cancel() }.value
  60. async let secondCancel: Void = Task { @Sendable in second.cancel() }.value
  61. // Then: all awaits parts should complete without continuation misuse.
  62. _ = await (firstResponse, secondResponse, firstCancel, secondCancel)
  63. }
  64. }
  65. func testThat500ResponseCanBeRetried() async throws {
  66. // Given
  67. let session = stored(Session())
  68. // When
  69. let value = try await session.request(.endpoints(.status(500), .method(.get)), interceptor: .retryPolicy)
  70. .serializingResponse(using: .data)
  71. .value
  72. // Then
  73. XCTAssertNotNil(value)
  74. }
  75. func testThatDataTaskSerializesDecodable() async throws {
  76. // Given
  77. let session = stored(Session())
  78. // When
  79. let value = try await session.request(.get).serializingDecodable(TestResponse.self).value
  80. // Then
  81. XCTAssertNotNil(value)
  82. }
  83. func testThatDataTaskSerializesString() async throws {
  84. // Given
  85. let session = stored(Session())
  86. // When
  87. let value = try await session.request(.get).serializingString().value
  88. // Then
  89. XCTAssertNotNil(value)
  90. }
  91. func testThatDataTaskSerializesData() async throws {
  92. // Given
  93. let session = stored(Session())
  94. // When
  95. let value = try await session.request(.get).serializingData().value
  96. // Then
  97. XCTAssertNotNil(value)
  98. }
  99. func testThatDataTaskProducesResult() async {
  100. // Given
  101. let session = stored(Session())
  102. // When
  103. let result = await session.request(.get).serializingDecodable(TestResponse.self).result
  104. // Then
  105. XCTAssertNotNil(result.success)
  106. }
  107. func testThatDataTaskProducesValue() async throws {
  108. // Given
  109. let session = stored(Session())
  110. // When
  111. let value = try await session.request(.get).serializingDecodable(TestResponse.self).value
  112. // Then
  113. XCTAssertNotNil(value)
  114. }
  115. func testThatDataTaskProperlySupportsConcurrentRequests() async {
  116. // Given
  117. let session = stored(Session())
  118. // When
  119. async let first = session.request(.get).serializingDecodable(TestResponse.self).response
  120. async let second = session.request(.get).serializingDecodable(TestResponse.self).response
  121. async let third = session.request(.get).serializingDecodable(TestResponse.self).response
  122. // Then
  123. let responses = await [first, second, third]
  124. XCTAssertEqual(responses.count, 3)
  125. XCTAssertTrue(responses.allSatisfy(\.result.isSuccess))
  126. }
  127. func testThatDataTaskCancellationCancelsRequest() async {
  128. // Given
  129. let session = stored(Session())
  130. let request = session.request(.get)
  131. let task = request.serializingDecodable(TestResponse.self)
  132. // When
  133. task.cancel()
  134. let response = await task.response
  135. // Then
  136. XCTAssertTrue(response.error?.isExplicitlyCancelledError == true)
  137. XCTAssertTrue(request.isCancelled, "Underlying DataRequest should be cancelled.")
  138. }
  139. func testThatDataTaskIsAutomaticallyCancelledInTask() async {
  140. // Given
  141. let session = stored(Session())
  142. let request = session.request(.get)
  143. // When
  144. let task = Task {
  145. await request.serializingDecodable(TestResponse.self).result
  146. }
  147. task.cancel()
  148. let result = await task.value
  149. // Then
  150. XCTAssertTrue(result.failure?.isExplicitlyCancelledError == true)
  151. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  152. XCTAssertTrue(request.isCancelled, "Underlying DataRequest should be cancelled.")
  153. }
  154. func testThatDataTaskIsNotAutomaticallyCancelledInTaskWhenDisabled() async {
  155. // Given
  156. let session = stored(Session())
  157. let request = session.request(.get)
  158. // When
  159. let task = Task {
  160. await request.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
  161. }
  162. task.cancel()
  163. let result = await task.value
  164. // Then
  165. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  166. XCTAssertFalse(request.isCancelled, "Underlying DataRequest should not be cancelled.")
  167. XCTAssertTrue(result.isSuccess, "DataRequest should succeed.")
  168. }
  169. func testThatDataTaskIsAutomaticallyCancelledInTaskGroup() async {
  170. // Given
  171. let session = stored(Session())
  172. let request = session.request(.get)
  173. // When
  174. let task = Task {
  175. await withTaskGroup(of: Result<TestResponse, AFError>.self) { group -> Result<TestResponse, AFError> in
  176. group.addTask {
  177. await request.serializingDecodable(TestResponse.self).result
  178. }
  179. return await group.first(where: { _ in true })!
  180. }
  181. }
  182. task.cancel()
  183. let result = await task.value
  184. // Then
  185. XCTAssertTrue(result.failure?.isExplicitlyCancelledError == true)
  186. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  187. XCTAssertTrue(request.isCancelled, "Underlying DataRequest should be cancelled.")
  188. }
  189. func testThatDataTaskIsNotAutomaticallyCancelledInTaskGroupWhenDisabled() async {
  190. // Given
  191. let session = stored(Session())
  192. let request = session.request(.get)
  193. // When
  194. let task = Task {
  195. await withTaskGroup(of: Result<TestResponse, AFError>.self) { group -> Result<TestResponse, AFError> in
  196. group.addTask {
  197. await request.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
  198. }
  199. return await group.first(where: { _ in true })!
  200. }
  201. }
  202. task.cancel()
  203. let result = await task.value
  204. // Then
  205. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  206. XCTAssertFalse(request.isCancelled, "Underlying DataRequest should not be cancelled.")
  207. XCTAssertTrue(result.isSuccess, "DataRequest should succeed.")
  208. }
  209. }
  210. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  211. final class DownloadConcurrencyTests: BaseTestCase {
  212. func testThatDownloadTaskSerializesResponseFromSerializer() async throws {
  213. // Given
  214. let session = stored(Session())
  215. // When
  216. let value = try await session.download(.get)
  217. .serializingDownload(using: .data)
  218. .value
  219. // Then
  220. XCTAssertNotNil(value)
  221. }
  222. func testThatDownloadTaskSerializesDecodable() async throws {
  223. // Given
  224. let session = stored(Session())
  225. // When
  226. let value = try await session.download(.get).serializingDecodable(TestResponse.self).value
  227. // Then
  228. XCTAssertNotNil(value)
  229. }
  230. func testThatDownloadTaskSerializesString() async throws {
  231. // Given
  232. let session = stored(Session())
  233. // When
  234. let value = try await session.download(.get).serializingString().value
  235. // Then
  236. XCTAssertNotNil(value)
  237. }
  238. func testThatDownloadTaskSerializesData() async throws {
  239. // Given
  240. let session = stored(Session())
  241. // When
  242. let value = try await session.download(.get).serializingData().value
  243. // Then
  244. XCTAssertNotNil(value)
  245. }
  246. func testThatDownloadTaskSerializesURL() async throws {
  247. // Given
  248. let session = stored(Session())
  249. // When
  250. let value = try await session.download(.get).serializingDownloadedFileURL().value
  251. // Then
  252. XCTAssertNotNil(value)
  253. }
  254. func testThatDownloadTaskProducesResult() async {
  255. // Given
  256. let session = stored(Session())
  257. // When
  258. let result = await session.download(.get).serializingDecodable(TestResponse.self).result
  259. // Then
  260. XCTAssertNotNil(result.success)
  261. }
  262. func testThatDownloadTaskProducesValue() async throws {
  263. // Given
  264. let session = stored(Session())
  265. // When
  266. let value = try await session.download(.get).serializingDecodable(TestResponse.self).value
  267. // Then
  268. XCTAssertNotNil(value)
  269. }
  270. func testThatDownloadTaskProperlySupportsConcurrentRequests() async {
  271. // Given
  272. let session = stored(Session())
  273. // When
  274. async let first = session.download(.get).serializingDecodable(TestResponse.self).response
  275. async let second = session.download(.get).serializingDecodable(TestResponse.self).response
  276. async let third = session.download(.get).serializingDecodable(TestResponse.self).response
  277. // Then
  278. let responses = await [first, second, third]
  279. XCTAssertEqual(responses.count, 3)
  280. XCTAssertTrue(responses.allSatisfy(\.result.isSuccess))
  281. }
  282. func testThatDownloadTaskCancelsRequest() async {
  283. // Given
  284. let session = stored(Session())
  285. let request = session.download(.get)
  286. let task = request.serializingDecodable(TestResponse.self)
  287. // When
  288. task.cancel()
  289. let response = await task.response
  290. // Then
  291. XCTAssertTrue(response.error?.isExplicitlyCancelledError == true)
  292. }
  293. func testThatDownloadTaskIsAutomaticallyCancelledInTask() async {
  294. // Given
  295. let session = stored(Session())
  296. let request = session.download(.get)
  297. // When
  298. let task = Task {
  299. await request.serializingDecodable(TestResponse.self).result
  300. }
  301. task.cancel()
  302. let result = await task.value
  303. // Then
  304. XCTAssertTrue(result.failure?.isExplicitlyCancelledError == true)
  305. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  306. XCTAssertTrue(request.isCancelled, "Underlying DownloadRequest should be cancelled.")
  307. }
  308. func testThatDownloadTaskIsNotAutomaticallyCancelledInTaskWhenDisabled() async {
  309. // Given
  310. let session = stored(Session())
  311. let request = session.download(.get)
  312. // When
  313. let task = Task {
  314. await request.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
  315. }
  316. task.cancel()
  317. let result = await task.value
  318. // Then
  319. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  320. XCTAssertFalse(request.isCancelled, "Underlying DownloadRequest should not be cancelled.")
  321. XCTAssertTrue(result.isSuccess, "DownloadRequest should succeed.")
  322. }
  323. func testThatDownloadTaskIsAutomaticallyCancelledInTaskGroup() async {
  324. // Given
  325. let session = stored(Session())
  326. let request = session.download(.get)
  327. // When
  328. let task = Task {
  329. await withTaskGroup(of: Result<TestResponse, AFError>.self) { group -> Result<TestResponse, AFError> in
  330. group.addTask {
  331. await request.serializingDecodable(TestResponse.self).result
  332. }
  333. return await group.first(where: { _ in true })!
  334. }
  335. }
  336. task.cancel()
  337. let result = await task.value
  338. // Then
  339. XCTAssertTrue(result.failure?.isExplicitlyCancelledError == true)
  340. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  341. XCTAssertTrue(request.isCancelled, "Underlying DownloadRequest should be cancelled.")
  342. }
  343. func testThatDownloadTaskIsNotAutomaticallyCancelledInTaskGroupWhenDisabled() async {
  344. // Given
  345. let session = stored(Session())
  346. let request = session.download(.get)
  347. // When
  348. let task = Task {
  349. await withTaskGroup(of: Result<TestResponse, AFError>.self) { group -> Result<TestResponse, AFError> in
  350. group.addTask {
  351. await request.serializingDecodable(TestResponse.self, automaticallyCancelling: false).result
  352. }
  353. return await group.first(where: { _ in true })!
  354. }
  355. }
  356. task.cancel()
  357. let result = await task.value
  358. // Then
  359. XCTAssertTrue(task.isCancelled, "Task should be cancelled.")
  360. XCTAssertFalse(request.isCancelled, "Underlying DownloadRequest should not be cancelled.")
  361. XCTAssertTrue(result.isSuccess, "DownloadRequest should succeed.")
  362. }
  363. }
  364. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  365. final class DataStreamConcurrencyTests: BaseTestCase {
  366. func testThatDataStreamTaskCanStreamData() async {
  367. // Given
  368. let session = stored(Session())
  369. // When
  370. let task = session.streamRequest(.payloads(2)).streamTask()
  371. var datas: [Data] = []
  372. for await data in task.streamingData().compactMap(\.value) {
  373. datas.append(data)
  374. }
  375. // Then
  376. XCTAssertEqual(datas.count, 2)
  377. }
  378. #if canImport(Darwin)
  379. func testThatDataStreamHasAsyncOnHTTPResponse() async {
  380. // Given
  381. let session = stored(Session())
  382. let functionCalled = expectation(description: "doNothing called")
  383. @Sendable @MainActor func fulfill() async {
  384. functionCalled.fulfill()
  385. }
  386. // When
  387. let task = session.streamRequest(.payloads(2))
  388. .onHTTPResponse { _ in
  389. await fulfill()
  390. }
  391. .streamTask()
  392. var datas: [Data] = []
  393. for await data in task.streamingData().compactMap(\.value) {
  394. datas.append(data)
  395. }
  396. await fulfillment(of: [functionCalled], timeout: timeout)
  397. // Then
  398. XCTAssertEqual(datas.count, 2)
  399. }
  400. func testThatDataOnHTTPResponseCanAllow() async {
  401. // Given
  402. let session = stored(Session())
  403. let functionCalled = expectation(description: "doNothing called")
  404. @Sendable @MainActor func fulfill() async {
  405. functionCalled.fulfill()
  406. }
  407. // When
  408. let task = session.streamRequest(.payloads(2))
  409. .onHTTPResponse { _ in
  410. await fulfill()
  411. return .allow
  412. }
  413. .streamTask()
  414. var datas: [Data] = []
  415. for await data in task.streamingData().compactMap(\.value) {
  416. datas.append(data)
  417. }
  418. await fulfillment(of: [functionCalled], timeout: timeout)
  419. // Then
  420. XCTAssertEqual(datas.count, 2)
  421. }
  422. func testThatDataOnHTTPResponseCanCancel() async {
  423. // Given
  424. let session = stored(Session())
  425. var receivedCompletion: DataStreamRequest.Completion?
  426. let functionCalled = expectation(description: "doNothing called")
  427. @Sendable @MainActor func fulfill() async {
  428. functionCalled.fulfill()
  429. }
  430. // When
  431. let request = session.streamRequest(.payloads(2))
  432. .onHTTPResponse { _ in
  433. await fulfill()
  434. return .cancel
  435. }
  436. let task = request.streamTask()
  437. for await stream in task.streamingResponses(serializedUsing: .passthrough) {
  438. switch stream.event {
  439. case .stream:
  440. XCTFail("cancelled stream should receive no data")
  441. case let .complete(completion):
  442. receivedCompletion = completion
  443. }
  444. }
  445. await fulfillment(of: [functionCalled], timeout: timeout)
  446. // Then
  447. XCTAssertEqual(receivedCompletion?.response?.statusCode, 200)
  448. XCTAssertTrue(request.isCancelled, "onHTTPResponse cancelled request isCancelled should be true")
  449. XCTAssertTrue(request.error?.isExplicitlyCancelledError == true, "onHTTPResponse cancelled request error should be explicitly cancelled")
  450. }
  451. #endif
  452. func testThatDataStreamTaskCanStreamStrings() async {
  453. // Given
  454. let session = stored(Session())
  455. // When
  456. let task = session.streamRequest(.payloads(2)).streamTask()
  457. var strings: [String] = []
  458. for await string in task.streamingStrings().compactMap(\.value) {
  459. strings.append(string)
  460. }
  461. // Then
  462. XCTAssertEqual(strings.count, 2)
  463. }
  464. func testThatDataStreamTaskCanStreamDecodable() async {
  465. // Given
  466. let session = stored(Session())
  467. // When
  468. let task = session.streamRequest(.payloads(2)).streamTask()
  469. let stream = task.streamingResponses(serializedUsing: DecodableStreamSerializer<TestResponse>())
  470. var responses: [TestResponse] = []
  471. for await response in stream.compactMap(\.value) {
  472. responses.append(response)
  473. }
  474. // Then
  475. XCTAssertEqual(responses.count, 2)
  476. }
  477. func testThatDataStreamTaskCanBeDirectlyCancelled() async {
  478. // Given
  479. let session = stored(Session())
  480. // When
  481. let expectedPayloads = 10
  482. let request = session.streamRequest(.payloads(expectedPayloads))
  483. let task = request.streamTask()
  484. var datas: [Data] = []
  485. for await data in task.streamingData().compactMap(\.value) {
  486. datas.append(data)
  487. if datas.count == 1 {
  488. task.cancel()
  489. }
  490. }
  491. // Then
  492. XCTAssertTrue(request.isCancelled)
  493. XCTAssertTrue(datas.count == 1)
  494. }
  495. func testThatDataStreamTaskIsCancelledByCancellingIteration() async {
  496. // Given
  497. let session = stored(Session())
  498. // When
  499. let expectedPayloads = 10
  500. let request = session.streamRequest(.payloads(expectedPayloads))
  501. let task = request.streamTask()
  502. var datas: [Data] = []
  503. for await data in task.streamingData().compactMap(\.value) {
  504. datas.append(data)
  505. if datas.count == 1 {
  506. break
  507. }
  508. }
  509. // Then
  510. XCTAssertTrue(request.isCancelled)
  511. XCTAssertTrue(datas.count == 1)
  512. }
  513. func testThatDataStreamTaskCanBeImplicitlyCancelled() async {
  514. // Given
  515. let session = stored(Session())
  516. // When
  517. let expectedPayloads = 10
  518. let request = session.streamRequest(.payloads(expectedPayloads))
  519. let task = Task<[Data], Never> {
  520. var datas: [Data] = []
  521. for await data in request.streamTask().streamingData().compactMap(\.value) {
  522. datas.append(data)
  523. }
  524. return datas
  525. }
  526. task.cancel()
  527. let datas: [Data] = await task.value
  528. // Then
  529. XCTAssertTrue(request.isCancelled)
  530. XCTAssertTrue(datas.isEmpty)
  531. }
  532. func testThatDataStreamTaskCanBeCancelledAfterStreamTurnsOffAutomaticCancellation() async {
  533. // Given
  534. let session = stored(Session())
  535. // When
  536. let expectedPayloads = 10
  537. let request = session.streamRequest(.payloads(expectedPayloads))
  538. let task = Task<[Data], Never> {
  539. var datas: [Data] = []
  540. let streamTask = request.streamTask()
  541. for await data in streamTask.streamingData(automaticallyCancelling: false).compactMap(\.value) {
  542. datas.append(data)
  543. break
  544. }
  545. for await data in streamTask.streamingData().compactMap(\.value) {
  546. datas.append(data)
  547. break
  548. }
  549. return datas
  550. }
  551. let datas: [Data] = await task.value
  552. // Then
  553. XCTAssertTrue(request.isCancelled)
  554. XCTAssertTrue(datas.count == 2)
  555. }
  556. }
  557. // Avoid when using swift-corelibs-foundation.
  558. // Only Xcode 14.3+ has async fulfillment.
  559. #if !canImport(FoundationNetworking)
  560. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  561. final class UploadConcurrencyTests: BaseTestCase {
  562. func testThatDelayedUploadStreamResultsInMultipleProgressValues() async throws {
  563. // Given
  564. let count = 75
  565. let baseString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. "
  566. let baseData = Data(baseString.utf8)
  567. var request = Endpoint.upload.urlRequest
  568. request.headers.add(name: "Content-Length", value: "\(baseData.count * count)")
  569. let expectation = expectation(description: "Bytes upload progress should be reported: \(request.url!)")
  570. var uploadProgressValues: [Double] = []
  571. var downloadProgressValues: [Double] = []
  572. var response: DataResponse<UploadResponse, AFError>?
  573. var inputStream: InputStream!
  574. var outputStream: OutputStream!
  575. Stream.getBoundStreams(withBufferSize: baseData.count, inputStream: &inputStream, outputStream: &outputStream)
  576. CFWriteStreamSetDispatchQueue(outputStream, .main)
  577. outputStream.open()
  578. // When
  579. AF.upload(inputStream, with: request)
  580. .uploadProgress { progress in
  581. uploadProgressValues.append(progress.fractionCompleted)
  582. }
  583. .downloadProgress { progress in
  584. downloadProgressValues.append(progress.fractionCompleted)
  585. }
  586. .responseDecodable(of: UploadResponse.self) { resp in
  587. response = resp
  588. expectation.fulfill()
  589. inputStream.close()
  590. }
  591. func sendData() {
  592. baseData.withUnsafeBytes { (pointer: UnsafeRawBufferPointer) in
  593. let bytesStreamed = outputStream.write(pointer.baseAddress!, maxLength: baseData.count)
  594. switch bytesStreamed {
  595. case baseData.count:
  596. // Successfully sent.
  597. break
  598. case 0:
  599. XCTFail("outputStream somehow reached end")
  600. case -1:
  601. if let streamError = outputStream.streamError {
  602. XCTFail("outputStream.write failed with error: \(streamError)")
  603. } else {
  604. XCTFail("outputStream.write failed with unknown error")
  605. }
  606. default:
  607. XCTFail("outputStream failed to send \(baseData.count) bytes, sent \(bytesStreamed) instead.")
  608. }
  609. }
  610. }
  611. for _ in 0..<count {
  612. sendData()
  613. try await Task.sleep(nanoseconds: 3 * 1_000_000) // milliseconds
  614. }
  615. outputStream.close()
  616. await fulfillment(of: [expectation], timeout: timeout)
  617. // Then
  618. XCTAssertNotNil(response?.request)
  619. XCTAssertNotNil(response?.response)
  620. XCTAssertNotNil(response?.data)
  621. XCTAssertNil(response?.error)
  622. for (progress, nextProgress) in zip(uploadProgressValues, uploadProgressValues.dropFirst()) {
  623. XCTAssertGreaterThanOrEqual(nextProgress, progress)
  624. }
  625. XCTAssertGreaterThan(uploadProgressValues.count, 1, "there should more than 1 uploadProgressValues")
  626. for (progress, nextProgress) in zip(downloadProgressValues, downloadProgressValues.dropFirst()) {
  627. XCTAssertGreaterThanOrEqual(nextProgress, progress)
  628. }
  629. XCTAssertEqual(downloadProgressValues.last, 1.0, "last item in downloadProgressValues should equal 1.0")
  630. XCTAssertEqual(response?.value?.bytes, baseData.count * count)
  631. }
  632. }
  633. #endif
  634. #if canImport(Darwin) && !canImport(FoundationNetworking)
  635. @_spi(WebSocket) import Alamofire
  636. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  637. final class WebSocketConcurrencyTests: BaseTestCase {
  638. func testThatMessageEventsCanBeStreamed() async {
  639. // Given
  640. let session = stored(Session())
  641. let receivedEvent = expectation(description: "receivedEvent")
  642. receivedEvent.expectedFulfillmentCount = 4
  643. // When
  644. for await _ in session.webSocketRequest(.websocket()).webSocketTask().streamingMessageEvents() {
  645. receivedEvent.fulfill()
  646. }
  647. await fulfillment(of: [receivedEvent])
  648. // Then
  649. }
  650. func testThatMessagesCanBeStreamed() async {
  651. // Given
  652. let session = stored(Session())
  653. // When
  654. let messages = await session.webSocketRequest(.websocket()).webSocketTask().streamingMessages().collect()
  655. // Then
  656. XCTAssertTrue(messages.count == 1)
  657. }
  658. }
  659. #endif
  660. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  661. final class ClosureAPIConcurrencyTests: BaseTestCase {
  662. func testThatDownloadProgressStreamReturnsProgress() async {
  663. // Given
  664. let session = stored(Session())
  665. // When
  666. let request = session.request(.get)
  667. async let httpResponses = request.httpResponses().collect()
  668. async let uploadProgress = request.uploadProgress().collect()
  669. async let downloadProgress = request.downloadProgress().collect()
  670. async let requests = request.urlRequests().collect()
  671. async let tasks = request.urlSessionTasks().collect()
  672. async let descriptions = request.cURLDescriptions().collect()
  673. async let response = request.serializingDecodable(TestResponse.self).response
  674. let values: (httpResponses: [HTTPURLResponse],
  675. uploadProgresses: [Progress],
  676. downloadProgresses: [Progress],
  677. requests: [URLRequest],
  678. tasks: [URLSessionTask],
  679. descriptions: [String],
  680. response: AFDataResponse<TestResponse>)
  681. values = await (httpResponses, uploadProgress, downloadProgress, requests, tasks, descriptions, response)
  682. // Then
  683. XCTAssertTrue(values.httpResponses.count == 1, "httpResponses should have one response")
  684. XCTAssertTrue(values.uploadProgresses.isEmpty, "uploadProgresses should be empty")
  685. XCTAssertNotNil(values.downloadProgresses.last, "downloadProgresses should not be empty")
  686. XCTAssertTrue(values.downloadProgresses.last?.isFinished == true, "last download progression should be finished")
  687. XCTAssertNotNil(values.requests.last, "requests should not be empty")
  688. XCTAssertNotNil(values.tasks.last, "tasks should not be empty")
  689. XCTAssertNotNil(values.descriptions.last, "descriptions should not be empty")
  690. XCTAssertTrue(values.response.result.isSuccess, "request should succeed")
  691. }
  692. }
  693. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  694. extension AsyncSequence {
  695. func collect() async rethrows -> [Element] {
  696. var elements: [Element] = []
  697. for try await element in self {
  698. elements.append(element)
  699. }
  700. return elements
  701. }
  702. }
  703. #endif