DataStreamTests.swift 52 KB


  1. //
  2. // DataStreamTests.swift
  3. //
  4. // Copyright (c) 2020 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. import Alamofire
  25. import XCTest
  26. final class DataStreamTests: BaseTestCase {
  27. @MainActor
  28. func testThatDataCanBeStreamedOnMainQueue() {
  29. // Given
  30. let expectedSize = 5
  31. var accumulatedData = Data()
  32. var initialResponse: HTTPURLResponse?
  33. var response: HTTPURLResponse?
  34. var streamOnMain = false
  35. var completeOnMain = false
  36. let didReceiveResponse = expectation(description: "stream should receive response once")
  37. let didReceive = expectation(description: "stream should receive once")
  38. let didComplete = expectation(description: "stream should complete")
  39. // When
  40. AF.streamRequest(.bytes(expectedSize))
  41. .onHTTPResponse { response in
  42. initialResponse = response
  43. didReceiveResponse.fulfill()
  44. }
  45. .responseStream { stream in
  46. switch stream.event {
  47. case let .stream(result):
  48. streamOnMain = Thread.isMainThread
  49. switch result {
  50. case let .success(data):
  51. accumulatedData.append(data)
  52. }
  53. didReceive.fulfill()
  54. case let .complete(completion):
  55. completeOnMain = Thread.isMainThread
  56. response = completion.response
  57. didComplete.fulfill()
  58. }
  59. }
  60. wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  61. // Then
  62. XCTAssertEqual(response?.statusCode, 200)
  63. XCTAssertEqual(initialResponse, response)
  64. XCTAssertEqual(accumulatedData.count, expectedSize)
  65. XCTAssertTrue(streamOnMain)
  66. XCTAssertTrue(completeOnMain)
  67. }
  68. @MainActor
  69. func testThatDataCanBeStreamedOnArbitraryQueue() {
  70. // Given
  71. let expectedSize = 5
  72. var accumulatedData = Data()
  73. var initialResponse: HTTPURLResponse?
  74. var response: HTTPURLResponse?
  75. let streamQueue = DispatchQueue(label: "com.alamofire.tests.ArbitraryQueue")
  76. let didReceiveResponse = expectation(description: "stream should receive response once")
  77. let didReceive = expectation(description: "stream should receive once")
  78. let didComplete = expectation(description: "stream should complete")
  79. // When
  80. AF.streamRequest(.bytes(expectedSize))
  81. .onHTTPResponse { response in
  82. initialResponse = response
  83. didReceiveResponse.fulfill()
  84. }
  85. .responseStream(on: streamQueue) { stream in
  86. dispatchPrecondition(condition: .onQueue(streamQueue))
  87. switch stream.event {
  88. case let .stream(result):
  89. switch result {
  90. case let .success(data):
  91. accumulatedData.append(data)
  92. }
  93. didReceive.fulfill()
  94. case let .complete(completion):
  95. response = completion.response
  96. didComplete.fulfill()
  97. }
  98. }
  99. wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  100. // Then
  101. XCTAssertEqual(response?.statusCode, 200)
  102. XCTAssertEqual(initialResponse, response)
  103. XCTAssertEqual(accumulatedData.count, expectedSize)
  104. }
  105. @MainActor
  106. func testThatDataCanBeStreamedByByte() throws {
  107. guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
  108. throw XCTSkip("Older OSes don't return individual bytes.")
  109. }
  110. // Given
  111. let expectedSize = 5
  112. var accumulatedData = Data()
  113. var initialResponse: HTTPURLResponse?
  114. var response: HTTPURLResponse?
  115. var streamOnMain = false
  116. var completeOnMain = false
  117. var streamCalled = 0
  118. let didReceiveResponse = expectation(description: "stream should receive response once")
  119. let didReceive = expectation(description: "stream should receive once")
  120. didReceive.expectedFulfillmentCount = expectedSize
  121. let didComplete = expectation(description: "stream should complete")
  122. // When
  123. AF.streamRequest(.chunked(expectedSize))
  124. .onHTTPResponse { response in
  125. initialResponse = response
  126. didReceiveResponse.fulfill()
  127. }
  128. .responseStream { stream in
  129. switch stream.event {
  130. case let .stream(result):
  131. streamOnMain = Thread.isMainThread
  132. switch result {
  133. case let .success(data):
  134. accumulatedData.append(data)
  135. }
  136. streamCalled += 1
  137. didReceive.fulfill()
  138. case let .complete(completion):
  139. completeOnMain = Thread.isMainThread
  140. response = completion.response
  141. didComplete.fulfill()
  142. }
  143. }
  144. wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  145. // Then
  146. XCTAssertEqual(response?.statusCode, 200)
  147. XCTAssertEqual(streamCalled, expectedSize)
  148. XCTAssertEqual(initialResponse, response)
  149. XCTAssertEqual(accumulatedData.count, expectedSize)
  150. XCTAssertTrue(streamOnMain)
  151. XCTAssertTrue(completeOnMain)
  152. }
  153. @MainActor
  154. func testThatDataCanBeStreamedAsMultipleJSONPayloads() throws {
  155. guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
  156. throw XCTSkip("Older OSes do not separate chunked payloads in callbacks.")
  157. }
  158. // Given
  159. let expectedSize = 5
  160. var responses: [TestResponse] = []
  161. var initialResponse: HTTPURLResponse?
  162. var response: HTTPURLResponse?
  163. var streamOnMain = false
  164. var completeOnMain = false
  165. var streamCalled = 0
  166. let didReceiveResponse = expectation(description: "stream should receive response once")
  167. let didReceive = expectation(description: "stream should receive once")
  168. didReceive.expectedFulfillmentCount = expectedSize
  169. let didComplete = expectation(description: "stream should complete")
  170. // When
  171. AF.streamRequest(.payloads(expectedSize))
  172. .onHTTPResponse { response in
  173. initialResponse = response
  174. didReceiveResponse.fulfill()
  175. }
  176. .responseStreamDecodable(of: TestResponse.self) { stream in
  177. switch stream.event {
  178. case let .stream(result):
  179. streamOnMain = Thread.isMainThread
  180. switch result {
  181. case let .success(value):
  182. responses.append(value)
  183. case let .failure(error):
  184. XCTFail("JSON stream failed due to error: \(error.localizedDescription)")
  185. }
  186. streamCalled += 1
  187. didReceive.fulfill()
  188. case let .complete(completion):
  189. completeOnMain = Thread.isMainThread
  190. response = completion.response
  191. didComplete.fulfill()
  192. }
  193. }
  194. wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  195. // Then
  196. XCTAssertEqual(response?.statusCode, 200)
  197. XCTAssertEqual(streamCalled, expectedSize)
  198. XCTAssertEqual(responses.count, expectedSize)
  199. XCTAssertEqual(initialResponse, response)
  200. XCTAssertTrue(streamOnMain)
  201. XCTAssertTrue(completeOnMain)
  202. }
  203. @MainActor
  204. func testThatDataCanBeStreamedFromURL() {
  205. // Given
  206. let expectedSize = 1
  207. var accumulatedData = Data()
  208. var initialResponse: HTTPURLResponse?
  209. var response: HTTPURLResponse?
  210. var streamOnMain = false
  211. var completeOnMain = false
  212. let didReceiveResponse = expectation(description: "stream should receive response once")
  213. let didReceive = expectation(description: "stream should receive")
  214. let didComplete = expectation(description: "stream should complete")
  215. // When
  216. AF.streamRequest(.bytes(expectedSize))
  217. .onHTTPResponse { response in
  218. initialResponse = response
  219. didReceiveResponse.fulfill()
  220. }
  221. .responseStream { stream in
  222. switch stream.event {
  223. case let .stream(result):
  224. streamOnMain = Thread.isMainThread
  225. switch result {
  226. case let .success(data):
  227. accumulatedData.append(data)
  228. }
  229. didReceive.fulfill()
  230. case let .complete(completion):
  231. completeOnMain = Thread.isMainThread
  232. response = completion.response
  233. didComplete.fulfill()
  234. }
  235. }
  236. wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  237. // Then
  238. XCTAssertEqual(response?.statusCode, 200)
  239. XCTAssertEqual(accumulatedData.count, expectedSize)
  240. XCTAssertEqual(initialResponse, response)
  241. XCTAssertTrue(streamOnMain)
  242. XCTAssertTrue(completeOnMain)
  243. }
  244. @MainActor
  245. func testThatDataCanBeStreamedManyTimes() {
  246. // Given
  247. let expectedSize = 1
  248. var initialResponse: HTTPURLResponse?
  249. let onHTTPResponse = expectation(description: "onHTTPResponse should be called")
  250. var firstAccumulatedData = Data()
  251. var firstResponse: HTTPURLResponse?
  252. var firstStreamOnMain = false
  253. var firstCompleteOnMain = false
  254. let firstReceive = expectation(description: "first stream should receive")
  255. let firstCompletion = expectation(description: "first stream should complete")
  256. var secondAccumulatedData = Data()
  257. var secondResponse: HTTPURLResponse?
  258. var secondStreamOnMain = false
  259. var secondCompleteOnMain = false
  260. let secondReceive = expectation(description: "second stream should receive")
  261. let secondCompletion = expectation(description: "second stream should complete")
  262. // When
  263. AF.streamRequest(.bytes(expectedSize))
  264. .onHTTPResponse { response in
  265. initialResponse = response
  266. onHTTPResponse.fulfill()
  267. }
  268. .responseStream { stream in
  269. switch stream.event {
  270. case let .stream(result):
  271. firstStreamOnMain = Thread.isMainThread
  272. switch result {
  273. case let .success(data):
  274. firstAccumulatedData.append(data)
  275. }
  276. firstReceive.fulfill()
  277. case let .complete(completion):
  278. firstCompleteOnMain = Thread.isMainThread
  279. firstResponse = completion.response
  280. firstCompletion.fulfill()
  281. }
  282. }
  283. .responseStream { stream in
  284. switch stream.event {
  285. case let .stream(result):
  286. secondStreamOnMain = Thread.isMainThread
  287. switch result {
  288. case let .success(data):
  289. secondAccumulatedData.append(data)
  290. }
  291. secondReceive.fulfill()
  292. case let .complete(completion):
  293. secondCompleteOnMain = Thread.isMainThread
  294. secondResponse = completion.response
  295. secondCompletion.fulfill()
  296. }
  297. }
  298. wait(for: [onHTTPResponse, firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
  299. wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)
  300. // Then
  301. XCTAssertEqual(initialResponse, firstResponse)
  302. XCTAssertEqual(initialResponse, secondResponse)
  303. XCTAssertTrue(firstStreamOnMain)
  304. XCTAssertTrue(firstCompleteOnMain)
  305. XCTAssertEqual(firstResponse?.statusCode, 200)
  306. XCTAssertEqual(firstAccumulatedData.count, expectedSize)
  307. XCTAssertTrue(secondStreamOnMain)
  308. XCTAssertTrue(secondCompleteOnMain)
  309. XCTAssertEqual(secondResponse?.statusCode, 200)
  310. XCTAssertEqual(secondAccumulatedData.count, expectedSize)
  311. }
  312. @MainActor
  313. func testThatDataCanBeStreamedAndDecodedAtTheSameTime() {
  314. // Given
  315. var initialResponse: HTTPURLResponse?
  316. let onHTTPResponse = expectation(description: "onHTTPResponse should be called")
  317. var firstAccumulatedData = Data()
  318. var firstResponse: HTTPURLResponse?
  319. var firstStreamOnMain = false
  320. var firstCompleteOnMain = false
  321. let firstReceive = expectation(description: "first stream should receive")
  322. let firstCompletion = expectation(description: "first stream should complete")
  323. var decodedResponse: TestResponse?
  324. var decodingError: AFError?
  325. var secondResponse: HTTPURLResponse?
  326. var secondStreamOnMain = false
  327. var secondCompleteOnMain = false
  328. let secondReceive = expectation(description: "second stream should receive")
  329. let secondCompletion = expectation(description: "second stream should complete")
  330. // When
  331. AF.streamRequest(.stream(1))
  332. .onHTTPResponse { response in
  333. initialResponse = response
  334. onHTTPResponse.fulfill()
  335. }
  336. .responseStream { stream in
  337. switch stream.event {
  338. case let .stream(result):
  339. firstStreamOnMain = Thread.isMainThread
  340. switch result {
  341. case let .success(data):
  342. firstAccumulatedData.append(data)
  343. }
  344. firstReceive.fulfill()
  345. case let .complete(completion):
  346. firstCompleteOnMain = Thread.isMainThread
  347. firstResponse = completion.response
  348. firstCompletion.fulfill()
  349. }
  350. }
  351. .responseStreamDecodable(of: TestResponse.self) { stream in
  352. switch stream.event {
  353. case let .stream(result):
  354. secondStreamOnMain = Thread.isMainThread
  355. switch result {
  356. case let .success(value):
  357. decodedResponse = value
  358. case let .failure(error):
  359. decodingError = error
  360. }
  361. secondReceive.fulfill()
  362. case let .complete(completion):
  363. secondCompleteOnMain = Thread.isMainThread
  364. secondResponse = completion.response
  365. secondCompletion.fulfill()
  366. }
  367. }
  368. wait(for: [onHTTPResponse, firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
  369. wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)
  370. // Then
  371. XCTAssertEqual(initialResponse, firstResponse)
  372. XCTAssertEqual(initialResponse, secondResponse)
  373. XCTAssertTrue(firstStreamOnMain)
  374. XCTAssertTrue(firstCompleteOnMain)
  375. XCTAssertEqual(firstResponse?.statusCode, 200)
  376. XCTAssertTrue(!firstAccumulatedData.isEmpty)
  377. XCTAssertTrue(secondStreamOnMain)
  378. XCTAssertTrue(secondCompleteOnMain)
  379. XCTAssertEqual(secondResponse?.statusCode, 200)
  380. XCTAssertNotNil(decodedResponse)
  381. XCTAssertNil(decodingError)
  382. }
  383. #if !canImport(FoundationNetworking) // If we not using swift-corelibs-foundation.
  384. @MainActor
  385. func testThatDataStreamRequestProducesWorkingInputStream() {
  386. // Given
  387. let expect = expectation(description: "stream complete")
  388. // When
  389. let stream = AF.streamRequest(.xml)
  390. .responseStream { stream in
  391. switch stream.event {
  392. case .complete:
  393. expect.fulfill()
  394. default: break
  395. }
  396. }
  397. .asInputStream()
  398. waitForExpectations(timeout: timeout)
  399. // Then
  400. let parser = XMLParser(stream: stream!)
  401. let parsed = parser.parse()
  402. XCTAssertTrue(parsed)
  403. XCTAssertNil(parser.parserError)
  404. }
  405. #endif
  406. @MainActor
  407. func testThatDataStreamCanBeManuallyResumed() {
  408. // Given
  409. let session = Session(startRequestsImmediately: false)
  410. var response: HTTPURLResponse?
  411. var streamOnMain = false
  412. var completeOnMain = false
  413. let didReceive = expectation(description: "stream did receive")
  414. let didComplete = expectation(description: "stream complete")
  415. // When
  416. session.streamRequest(.stream(1))
  417. .responseStream { stream in
  418. switch stream.event {
  419. case .stream:
  420. streamOnMain = Thread.isMainThread
  421. didReceive.fulfill()
  422. case let .complete(completion):
  423. completeOnMain = Thread.isMainThread
  424. response = completion.response
  425. didComplete.fulfill()
  426. }
  427. }.resume()
  428. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  429. // Then
  430. XCTAssertTrue(streamOnMain)
  431. XCTAssertTrue(completeOnMain)
  432. XCTAssertEqual(response?.statusCode, 200)
  433. }
  434. @MainActor
  435. func testThatDataStreamIsAutomaticallyCanceledOnStreamErrorWhenEnabled() {
  436. var response: HTTPURLResponse?
  437. var complete: DataStreamRequest.Completion?
  438. let didComplete = expectation(description: "stream complete")
  439. // When
  440. AF.streamRequest(.bytes(50), automaticallyCancelOnStreamError: true)
  441. .responseStreamDecodable(of: TestResponse.self) { stream in
  442. switch stream.event {
  443. case let .complete(completion):
  444. complete = completion
  445. response = completion.response
  446. didComplete.fulfill()
  447. default: break
  448. }
  449. }
  450. waitForExpectations(timeout: timeout)
  451. // Then
  452. XCTAssertEqual(response?.statusCode, 200)
  453. XCTAssertTrue(complete?.error?.isExplicitlyCancelledError == true,
  454. "error is not explicitly cancelled but \(complete?.error?.localizedDescription ?? "None")")
  455. }
  456. @MainActor
  457. func testThatDataStreamIsAutomaticallyCanceledOnStreamClosureError() {
  458. // Given
  459. enum LocalError: Error { case failed }
  460. var response: HTTPURLResponse?
  461. var complete: DataStreamRequest.Completion?
  462. let didReceive = expectation(description: "stream did receive")
  463. let didComplete = expectation(description: "stream complete")
  464. // When
  465. AF.streamRequest(.bytes(50))
  466. .responseStream { stream in
  467. switch stream.event {
  468. case .stream:
  469. didReceive.fulfill()
  470. throw LocalError.failed
  471. case let .complete(completion):
  472. complete = completion
  473. response = completion.response
  474. didComplete.fulfill()
  475. }
  476. }
  477. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  478. // Then
  479. XCTAssertEqual(response?.statusCode, 200)
  480. XCTAssertTrue(complete?.error?.isExplicitlyCancelledError == false)
  481. }
  482. @MainActor
  483. func testThatDataStreamCanBeCancelledInClosure() {
  484. // Given
  485. // Use .main so that completion can't beat cancellation.
  486. let session = Session(rootQueue: .main)
  487. var completion: DataStreamRequest.Completion?
  488. let didReceive = expectation(description: "stream should receive")
  489. let didComplete = expectation(description: "stream should complete")
  490. // When
  491. session.streamRequest(.bytes(1)).responseStream { stream in
  492. switch stream.event {
  493. case .stream:
  494. didReceive.fulfill()
  495. stream.cancel()
  496. case .complete:
  497. completion = stream.completion
  498. didComplete.fulfill()
  499. }
  500. }
  501. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  502. // Then
  503. XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
  504. """
  505. error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
  506. response is: \(completion?.response?.description ?? "none").
  507. """)
  508. }
  509. @MainActor
  510. func testThatDataStreamCanBeCancelledByToken() {
  511. // Given
  512. // Use .main so that completion can't beat cancellation.
  513. let session = Session(rootQueue: .main)
  514. var completion: DataStreamRequest.Completion?
  515. let didReceive = expectation(description: "stream should receive")
  516. let didComplete = expectation(description: "stream should complete")
  517. // When
  518. session.streamRequest(.bytes(1)).responseStream { stream in
  519. switch stream.event {
  520. case .stream:
  521. didReceive.fulfill()
  522. stream.token.cancel()
  523. case .complete:
  524. completion = stream.completion
  525. didComplete.fulfill()
  526. }
  527. }
  528. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  529. // Then
  530. XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
  531. """
  532. error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
  533. response is: \(completion?.response?.description ?? "none").
  534. """)
  535. }
  536. @MainActor
  537. func testThatOnHTTPResponseCanContinueStream() {
  538. // Given
  539. let expectedSize = 5
  540. var accumulatedData = Data()
  541. var initialResponse: HTTPURLResponse?
  542. var response: HTTPURLResponse?
  543. var streamOnMain = false
  544. var completeOnMain = false
  545. let didReceiveResponse = expectation(description: "stream should receive response once")
  546. let didReceive = expectation(description: "stream should receive once")
  547. let didComplete = expectation(description: "stream should complete")
  548. // When
  549. AF.streamRequest(.bytes(expectedSize))
  550. .onHTTPResponse { response, completionHandler in
  551. initialResponse = response
  552. didReceiveResponse.fulfill()
  553. completionHandler(.allow)
  554. }
  555. .responseStream { stream in
  556. switch stream.event {
  557. case let .stream(result):
  558. streamOnMain = Thread.isMainThread
  559. switch result {
  560. case let .success(data):
  561. accumulatedData.append(data)
  562. }
  563. didReceive.fulfill()
  564. case let .complete(completion):
  565. completeOnMain = Thread.isMainThread
  566. response = completion.response
  567. didComplete.fulfill()
  568. }
  569. }
  570. wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  571. // Then
  572. XCTAssertEqual(response?.statusCode, 200)
  573. XCTAssertEqual(initialResponse, response)
  574. XCTAssertEqual(accumulatedData.count, expectedSize)
  575. XCTAssertTrue(streamOnMain)
  576. XCTAssertTrue(completeOnMain)
  577. }
  578. @MainActor
  579. func testThatOnHTTPResponseCanCancelStream() {
  580. // Given
  581. let expectedSize = 5
  582. var initialResponse: HTTPURLResponse?
  583. var response: HTTPURLResponse?
  584. var completion: DataStreamRequest.Completion?
  585. var didCompleteOnMain = false
  586. let didReceiveResponse = expectation(description: "stream should receive response once")
  587. let didComplete = expectation(description: "stream should complete")
  588. // When
  589. AF.streamRequest(.bytes(expectedSize))
  590. .onHTTPResponse { response, completionHandler in
  591. initialResponse = response
  592. didReceiveResponse.fulfill()
  593. completionHandler(.cancel)
  594. }
  595. .responseStream { stream in
  596. switch stream.event {
  597. case .stream:
  598. XCTFail("should never receive stream in a cancelled request")
  599. case let .complete(comp):
  600. didCompleteOnMain = Thread.isMainThread
  601. completion = comp
  602. response = comp.response
  603. didComplete.fulfill()
  604. }
  605. }
  606. wait(for: [didReceiveResponse, didComplete], timeout: timeout, enforceOrder: true)
  607. // Then
  608. XCTAssertEqual(response?.statusCode, 200)
  609. XCTAssertEqual(initialResponse, response)
  610. XCTAssertTrue(didCompleteOnMain)
  611. XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true, "onHTTPResponse cancelled stream should be explicitly cancelled")
  612. }
  613. }
  614. // MARK: - Serialization Tests
  615. final class DataStreamSerializationTests: BaseTestCase {
  616. @MainActor
  617. func testThatDataStreamsCanBeAString() {
  618. // Given
  619. var responseString: String?
  620. var streamOnMain = false
  621. var completeOnMain = false
  622. var response: HTTPURLResponse?
  623. let didStream = expectation(description: "did stream")
  624. let didComplete = expectation(description: "stream complete")
  625. // When
  626. AF.streamRequest(.stream(1))
  627. .responseStreamString { stream in
  628. switch stream.event {
  629. case let .stream(result):
  630. streamOnMain = Thread.isMainThread
  631. switch result {
  632. case let .success(string):
  633. responseString = string
  634. }
  635. didStream.fulfill()
  636. case let .complete(completion):
  637. completeOnMain = Thread.isMainThread
  638. response = completion.response
  639. didComplete.fulfill()
  640. }
  641. }
  642. wait(for: [didStream, didComplete], timeout: timeout, enforceOrder: true)
  643. // Then
  644. XCTAssertTrue(streamOnMain)
  645. XCTAssertTrue(completeOnMain)
  646. XCTAssertNotNil(responseString)
  647. XCTAssertEqual(response?.statusCode, 200)
  648. }
  649. @MainActor
  650. func testThatDataStreamsCanBeAStringOnAnArbitraryQueue() {
  651. // Given
  652. var responseString: String?
  653. var response: HTTPURLResponse?
  654. let streamQueue = DispatchQueue(label: "com.alamofire.tests.ArbitraryQueue")
  655. let didStream = expectation(description: "did stream")
  656. let didComplete = expectation(description: "stream complete")
  657. // When
  658. AF.streamRequest(.stream(1))
  659. .responseStreamString(on: streamQueue) { stream in
  660. dispatchPrecondition(condition: .onQueue(streamQueue))
  661. switch stream.event {
  662. case let .stream(result):
  663. switch result {
  664. case let .success(string):
  665. responseString = string
  666. }
  667. didStream.fulfill()
  668. case let .complete(completion):
  669. response = completion.response
  670. didComplete.fulfill()
  671. }
  672. }
  673. wait(for: [didStream, didComplete], timeout: timeout, enforceOrder: true)
  674. // Then
  675. XCTAssertNotNil(responseString)
  676. XCTAssertEqual(response?.statusCode, 200)
  677. }
  678. @MainActor
  679. func testThatDataStreamsCanBeDecoded() {
  680. // Given
  681. var response: TestResponse?
  682. var httpResponse: HTTPURLResponse?
  683. var decodingError: AFError?
  684. var streamOnMain = false
  685. var completeOnMain = false
  686. let didReceive = expectation(description: "stream did receive")
  687. let didComplete = expectation(description: "stream complete")
  688. // When
  689. AF.streamRequest(.stream(1))
  690. .responseStreamDecodable(of: TestResponse.self) { stream in
  691. switch stream.event {
  692. case let .stream(result):
  693. streamOnMain = Thread.isMainThread
  694. switch result {
  695. case let .success(value):
  696. response = value
  697. case let .failure(error):
  698. decodingError = error
  699. }
  700. didReceive.fulfill()
  701. case let .complete(completion):
  702. completeOnMain = Thread.isMainThread
  703. httpResponse = completion.response
  704. didComplete.fulfill()
  705. }
  706. }
  707. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  708. // Then
  709. XCTAssertTrue(streamOnMain)
  710. XCTAssertTrue(completeOnMain)
  711. XCTAssertNotNil(response)
  712. XCTAssertEqual(httpResponse?.statusCode, 200)
  713. XCTAssertNil(decodingError)
  714. }
  715. @MainActor
  716. func testThatDataStreamsCanBeDecodedOnAnArbitraryQueue() {
  717. // Given
  718. var response: TestResponse?
  719. var httpResponse: HTTPURLResponse?
  720. var decodingError: AFError?
  721. let streamQueue = DispatchQueue(label: "com.alamofire.tests.ArbitraryQueue")
  722. let didReceive = expectation(description: "stream did receive")
  723. let didComplete = expectation(description: "stream complete")
  724. // When
  725. AF.streamRequest(.stream(1))
  726. .responseStreamDecodable(of: TestResponse.self, on: streamQueue) { stream in
  727. dispatchPrecondition(condition: .onQueue(streamQueue))
  728. switch stream.event {
  729. case let .stream(result):
  730. switch result {
  731. case let .success(value):
  732. response = value
  733. case let .failure(error):
  734. decodingError = error
  735. }
  736. didReceive.fulfill()
  737. case let .complete(completion):
  738. httpResponse = completion.response
  739. didComplete.fulfill()
  740. }
  741. }
  742. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  743. // Then
  744. XCTAssertNotNil(response)
  745. XCTAssertEqual(httpResponse?.statusCode, 200)
  746. XCTAssertNil(decodingError)
  747. }
  748. @MainActor
  749. func testThatDataStreamSerializerCanBeUsedDirectly() {
  750. // Given
  751. var response: HTTPURLResponse?
  752. var decodedResponse: TestResponse?
  753. var decodingError: AFError?
  754. var streamOnMain = false
  755. var completeOnMain = false
  756. let serializer = DecodableStreamSerializer<TestResponse>()
  757. let didReceive = expectation(description: "stream did receive")
  758. let didComplete = expectation(description: "stream complete")
  759. // When
  760. AF.streamRequest(.stream(1))
  761. .responseStream(using: serializer) { stream in
  762. switch stream.event {
  763. case let .stream(result):
  764. streamOnMain = Thread.isMainThread
  765. switch result {
  766. case let .success(value):
  767. decodedResponse = value
  768. case let .failure(error):
  769. decodingError = error
  770. }
  771. didReceive.fulfill()
  772. case let .complete(completion):
  773. completeOnMain = Thread.isMainThread
  774. response = completion.response
  775. didComplete.fulfill()
  776. }
  777. }
  778. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  779. // Then
  780. XCTAssertTrue(streamOnMain)
  781. XCTAssertTrue(completeOnMain)
  782. XCTAssertNotNil(decodedResponse)
  783. XCTAssertEqual(response?.statusCode, 200)
  784. XCTAssertNil(decodingError)
  785. }
  786. }
  787. // MARK: - Integration Tests
  788. final class DataStreamIntegrationTests: BaseTestCase {
  789. @MainActor
  790. func testThatDataStreamCanFailValidation() {
  791. // Given
  792. var dataSeen = false
  793. var error: AFError?
  794. let didComplete = expectation(description: "stream should complete")
  795. // When
  796. AF.streamRequest(.status(401))
  797. .validate()
  798. .responseStream { stream in
  799. switch stream.event {
  800. case .stream:
  801. dataSeen = true
  802. case let .complete(completion):
  803. error = completion.error
  804. didComplete.fulfill()
  805. }
  806. }
  807. waitForExpectations(timeout: timeout)
  808. // Then
  809. XCTAssertNotNil(error, "error should not be nil")
  810. XCTAssertTrue(error?.isResponseValidationError == true, "error should be response validation error")
  811. XCTAssertFalse(dataSeen, "no data should be seen")
  812. }
  813. @MainActor
  814. func testThatDataStreamsCanBeRetried() {
  815. // Given
  816. final class GoodRetry: RequestInterceptor {
  817. var hasRetried = false
  818. func adapt(_ urlRequest: URLRequest, for session: Session, completion: @escaping (Result<URLRequest, any Error>) -> Void) {
  819. if hasRetried {
  820. completion(.success(Endpoint.bytes(1000).urlRequest))
  821. } else {
  822. completion(.success(urlRequest))
  823. }
  824. }
  825. func retry(_ request: Request, for session: Session, dueTo error: any Error, completion: @escaping (RetryResult) -> Void) {
  826. hasRetried = true
  827. completion(.retry)
  828. }
  829. }
  830. let session = Session(interceptor: GoodRetry())
  831. var accumulatedData = Data()
  832. var streamOnMain = false
  833. var completeOnMain = false
  834. var response: HTTPURLResponse?
  835. let didReceive = expectation(description: "stream should receive")
  836. let didComplete = expectation(description: "stream should complete")
  837. // When
  838. session.streamRequest(.status(401))
  839. .validate()
  840. .responseStream { stream in
  841. switch stream.event {
  842. case let .stream(result):
  843. streamOnMain = Thread.isMainThread
  844. switch result {
  845. case let .success(data):
  846. accumulatedData.append(data)
  847. }
  848. didReceive.fulfill()
  849. case let .complete(completion):
  850. completeOnMain = Thread.isMainThread
  851. response = completion.response
  852. didComplete.fulfill()
  853. }
  854. }
  855. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  856. // Then
  857. XCTAssertTrue(streamOnMain)
  858. XCTAssertTrue(completeOnMain)
  859. XCTAssertEqual(accumulatedData.count, 1000)
  860. XCTAssertEqual(response?.statusCode, 200)
  861. }
  862. @MainActor
  863. func testThatDataStreamCanBeRedirected() {
  864. // Given
  865. var response: HTTPURLResponse?
  866. var decodedResponse: TestResponse?
  867. var decodingError: AFError?
  868. var streamOnMain = false
  869. var completeOnMain = false
  870. let didRedirect = expectation(description: "stream redirected")
  871. let redirector = Redirector(behavior: .modify { _, _, _ in
  872. didRedirect.fulfill()
  873. return Endpoint.stream(1).urlRequest
  874. })
  875. let didReceive = expectation(description: "stream should receive")
  876. let didComplete = expectation(description: "stream should complete")
  877. // When
  878. AF.streamRequest(.status(301))
  879. .redirect(using: redirector)
  880. .responseStreamDecodable(of: TestResponse.self) { stream in
  881. switch stream.event {
  882. case let .stream(result):
  883. streamOnMain = Thread.isMainThread
  884. switch result {
  885. case let .success(value):
  886. decodedResponse = value
  887. case let .failure(error):
  888. decodingError = error
  889. }
  890. didReceive.fulfill()
  891. case let .complete(completion):
  892. completeOnMain = Thread.isMainThread
  893. response = completion.response
  894. didComplete.fulfill()
  895. }
  896. }
  897. wait(for: [didRedirect, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  898. // Then
  899. XCTAssertTrue(streamOnMain)
  900. XCTAssertTrue(completeOnMain)
  901. XCTAssertNotNil(decodedResponse)
  902. XCTAssertEqual(response?.statusCode, 200)
  903. XCTAssertNil(decodingError)
  904. }
  905. @MainActor
  906. func testThatDataStreamCallsCachedResponseHandler() {
  907. // Given
  908. var response: HTTPURLResponse?
  909. var decodedResponse: TestResponse?
  910. var decodingError: AFError?
  911. var streamOnMain = false
  912. var completeOnMain = false
  913. let cached = expectation(description: "stream called cacher")
  914. let cacher = ResponseCacher(behavior: .modify { _, _ in
  915. cached.fulfill()
  916. return nil
  917. })
  918. let didReceive = expectation(description: "stream did receive")
  919. let didComplete = expectation(description: "stream complete")
  920. // When
  921. AF.streamRequest(.stream(1))
  922. .cacheResponse(using: cacher)
  923. .responseStreamDecodable(of: TestResponse.self) { stream in
  924. switch stream.event {
  925. case let .stream(result):
  926. streamOnMain = Thread.isMainThread
  927. switch result {
  928. case let .success(value):
  929. decodedResponse = value
  930. case let .failure(error):
  931. decodingError = error
  932. }
  933. didReceive.fulfill()
  934. case let .complete(completion):
  935. completeOnMain = Thread.isMainThread
  936. response = completion.response
  937. didComplete.fulfill()
  938. }
  939. }
  940. // willCacheResponse called after receiving all Data, so may be called before or after the asynchronous stream
  941. // handlers.
  942. wait(for: [cached], timeout: timeout)
  943. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  944. // Then
  945. XCTAssertTrue(streamOnMain)
  946. XCTAssertTrue(completeOnMain)
  947. XCTAssertNotNil(decodedResponse)
  948. XCTAssertEqual(response?.statusCode, 200)
  949. XCTAssertNil(decodingError)
  950. }
  951. @MainActor
  952. func testThatDataStreamWorksCorrectlyWithMultipleSerialQueues() {
  953. // Given
  954. let requestQueue = DispatchQueue(label: "org.alamofire.testRequestQueue")
  955. let serializationQueue = DispatchQueue(label: "org.alamofire.testSerializationQueue")
  956. let session = Session(requestQueue: requestQueue, serializationQueue: serializationQueue)
  957. var firstResponse: HTTPURLResponse?
  958. var firstDecodedResponse: TestResponse?
  959. var firstDecodingError: AFError?
  960. var firstStreamOnMain = false
  961. var firstCompleteOnMain = false
  962. let firstStream = expectation(description: "first stream")
  963. let firstDidReceive = expectation(description: "first stream did receive")
  964. let firstDidComplete = expectation(description: "first stream complete")
  965. var secondResponse: HTTPURLResponse?
  966. var secondDecodedResponse: TestResponse?
  967. var secondDecodingError: AFError?
  968. var secondStreamOnMain = false
  969. var secondCompleteOnMain = false
  970. let secondStream = expectation(description: "second stream")
  971. let secondDidReceive = expectation(description: "second stream did receive")
  972. let secondDidComplete = expectation(description: "second stream complete")
  973. // When
  974. session.streamRequest(.stream(1))
  975. .responseStreamDecodable(of: TestResponse.self) { stream in
  976. switch stream.event {
  977. case let .stream(result):
  978. firstStreamOnMain = Thread.isMainThread
  979. switch result {
  980. case let .success(value):
  981. firstDecodedResponse = value
  982. case let .failure(error):
  983. firstDecodingError = error
  984. }
  985. firstStream.fulfill()
  986. firstDidReceive.fulfill()
  987. case let .complete(completion):
  988. firstCompleteOnMain = Thread.isMainThread
  989. firstResponse = completion.response
  990. firstDidComplete.fulfill()
  991. }
  992. }
  993. .responseStreamDecodable(of: TestResponse.self) { stream in
  994. switch stream.event {
  995. case let .stream(result):
  996. secondStreamOnMain = Thread.isMainThread
  997. switch result {
  998. case let .success(value):
  999. secondDecodedResponse = value
  1000. case let .failure(error):
  1001. secondDecodingError = error
  1002. }
  1003. secondStream.fulfill()
  1004. secondDidReceive.fulfill()
  1005. case let .complete(completion):
  1006. secondCompleteOnMain = Thread.isMainThread
  1007. secondResponse = completion.response
  1008. secondDidComplete.fulfill()
  1009. }
  1010. }
  1011. wait(for: [firstStream, secondStream], timeout: timeout, enforceOrder: true)
  1012. // Cannot test order of completion events, as one may have been enqueued while the other executed directly.
  1013. wait(for: [firstDidReceive, firstDidComplete], timeout: timeout, enforceOrder: true)
  1014. wait(for: [secondDidReceive, secondDidComplete], timeout: timeout, enforceOrder: true)
  1015. // Then
  1016. XCTAssertTrue(firstStreamOnMain)
  1017. XCTAssertTrue(firstCompleteOnMain)
  1018. XCTAssertNotNil(firstDecodedResponse)
  1019. XCTAssertEqual(firstResponse?.statusCode, 200)
  1020. XCTAssertNil(firstDecodingError)
  1021. XCTAssertTrue(secondStreamOnMain)
  1022. XCTAssertTrue(secondCompleteOnMain)
  1023. XCTAssertNotNil(secondDecodedResponse)
  1024. XCTAssertEqual(secondResponse?.statusCode, 200)
  1025. XCTAssertNil(secondDecodingError)
  1026. }
  1027. @MainActor
  1028. func testThatDataStreamWorksCorrectlyWithMultipleConcurrentQueues() {
  1029. // Given
  1030. let requestQueue = DispatchQueue(label: "org.alamofire.testRequestQueue", attributes: .concurrent)
  1031. let serializationQueue = DispatchQueue(label: "org.alamofire.testSerializationQueue", attributes: .concurrent)
  1032. let session = Session(requestQueue: requestQueue, serializationQueue: serializationQueue)
  1033. var firstResponse: HTTPURLResponse?
  1034. var firstDecodedResponse: TestResponse?
  1035. var firstDecodingError: AFError?
  1036. var firstStreamOnMain = false
  1037. var firstCompleteOnMain = false
  1038. let firstDidReceive = expectation(description: "first stream did receive")
  1039. let firstDidComplete = expectation(description: "first stream complete")
  1040. var secondResponse: HTTPURLResponse?
  1041. var secondDecodedResponse: TestResponse?
  1042. var secondDecodingError: AFError?
  1043. var secondStreamOnMain = false
  1044. var secondCompleteOnMain = false
  1045. let secondDidReceive = expectation(description: "second stream did receive")
  1046. let secondDidComplete = expectation(description: "second stream complete")
  1047. // When
  1048. session.streamRequest(.stream(1))
  1049. .responseStreamDecodable(of: TestResponse.self) { stream in
  1050. switch stream.event {
  1051. case let .stream(result):
  1052. firstStreamOnMain = Thread.isMainThread
  1053. switch result {
  1054. case let .success(value):
  1055. firstDecodedResponse = value
  1056. case let .failure(error):
  1057. firstDecodingError = error
  1058. }
  1059. firstDidReceive.fulfill()
  1060. case let .complete(completion):
  1061. firstCompleteOnMain = Thread.isMainThread
  1062. firstResponse = completion.response
  1063. firstDidComplete.fulfill()
  1064. }
  1065. }
  1066. .responseStreamDecodable(of: TestResponse.self) { stream in
  1067. switch stream.event {
  1068. case let .stream(result):
  1069. secondStreamOnMain = Thread.isMainThread
  1070. switch result {
  1071. case let .success(value):
  1072. secondDecodedResponse = value
  1073. case let .failure(error):
  1074. secondDecodingError = error
  1075. }
  1076. secondDidReceive.fulfill()
  1077. case let .complete(completion):
  1078. secondCompleteOnMain = Thread.isMainThread
  1079. secondResponse = completion.response
  1080. secondDidComplete.fulfill()
  1081. }
  1082. }
  1083. wait(for: [firstDidReceive, firstDidComplete], timeout: timeout, enforceOrder: true)
  1084. wait(for: [secondDidReceive, secondDidComplete], timeout: timeout, enforceOrder: true)
  1085. // Then
  1086. XCTAssertTrue(firstStreamOnMain)
  1087. XCTAssertTrue(firstCompleteOnMain)
  1088. XCTAssertNotNil(firstDecodedResponse)
  1089. XCTAssertEqual(firstResponse?.statusCode, 200)
  1090. XCTAssertNil(firstDecodingError)
  1091. XCTAssertTrue(secondStreamOnMain)
  1092. XCTAssertTrue(secondCompleteOnMain)
  1093. XCTAssertNotNil(secondDecodedResponse)
  1094. XCTAssertEqual(secondResponse?.statusCode, 200)
  1095. XCTAssertNil(secondDecodingError)
  1096. }
  1097. @MainActor
  1098. func testThatDataStreamCanAuthenticate() {
  1099. // Given
  1100. let user = "userstream", password = "password"
  1101. var response: HTTPURLResponse?
  1102. var streamOnMain = false
  1103. var completeOnMain = false
  1104. let didReceive = expectation(description: "stream did receive")
  1105. let didComplete = expectation(description: "stream complete")
  1106. // When
  1107. AF.streamRequest(.basicAuth(forUser: user, password: password))
  1108. .authenticate(username: user, password: password)
  1109. .responseStream { stream in
  1110. switch stream.event {
  1111. case .stream:
  1112. streamOnMain = Thread.isMainThread
  1113. didReceive.fulfill()
  1114. case let .complete(completion):
  1115. completeOnMain = Thread.isMainThread
  1116. response = completion.response
  1117. didComplete.fulfill()
  1118. }
  1119. }
  1120. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  1121. // Then
  1122. XCTAssertTrue(streamOnMain)
  1123. XCTAssertTrue(completeOnMain)
  1124. XCTAssertEqual(response?.statusCode, 200)
  1125. }
  1126. }
  1127. final class DataStreamLifetimeEvents: BaseTestCase {
  1128. @MainActor
  1129. func testThatDataStreamRequestHasAppropriateLifetimeEvents() {
  1130. // Given
  1131. final class Monitor: EventMonitor {
  1132. private let called: (@Sendable () -> Void)?
  1133. init(called: (@Sendable () -> Void)? = nil) {
  1134. self.called = called
  1135. }
  1136. func request<Value>(_ request: DataStreamRequest, didParseStream result: Result<Value, AFError>) {
  1137. called?()
  1138. }
  1139. }
  1140. let eventMonitor = ClosureEventMonitor()
  1141. // Disable event test until Firewalk supports HTTPS.
  1142. // let didReceiveChallenge = expectation(description: "didReceiveChallenge should fire")
  1143. let taskDidFinishCollecting = expectation(description: "taskDidFinishCollecting should fire")
  1144. let didReceiveData = expectation(description: "didReceiveData should fire")
  1145. let willCacheResponse = expectation(description: "willCacheResponse should fire")
  1146. let didCreateURLRequest = expectation(description: "didCreateInitialURLRequest should fire")
  1147. let didCreateTask = expectation(description: "didCreateTask should fire")
  1148. let didGatherMetrics = expectation(description: "didGatherMetrics should fire")
  1149. let didComplete = expectation(description: "didComplete should fire")
  1150. let didFinish = expectation(description: "didFinish should fire")
  1151. let didResume = expectation(description: "didResume should fire")
  1152. let didResumeTask = expectation(description: "didResumeTask should fire")
  1153. let didValidate = expectation(description: "didValidateRequest should fire")
  1154. didValidate.expectedFulfillmentCount = 2
  1155. let didParse = expectation(description: "streamDidParse should fire")
  1156. let didReceive = expectation(description: "stream should receive")
  1157. let didCompleteStream = expectation(description: "stream should complete")
  1158. var dataReceived = false
  1159. // Disable event test until Firewalk supports HTTPS.
  1160. // eventMonitor.taskDidReceiveChallenge = { _, _, _ in didReceiveChallenge.fulfill() }
  1161. eventMonitor.taskDidFinishCollectingMetrics = { _, _, _ in taskDidFinishCollecting.fulfill() }
  1162. eventMonitor.dataTaskDidReceiveData = { _, _, _ in
  1163. guard !dataReceived else { return }
  1164. // Data may be received many times, fulfill only once.
  1165. dataReceived = true
  1166. didReceiveData.fulfill()
  1167. }
  1168. eventMonitor.dataTaskWillCacheResponse = { _, _, _ in willCacheResponse.fulfill() }
  1169. eventMonitor.requestDidCreateInitialURLRequest = { _, _ in didCreateURLRequest.fulfill() }
  1170. eventMonitor.requestDidCreateTask = { _, _ in didCreateTask.fulfill() }
  1171. eventMonitor.requestDidGatherMetrics = { _, _ in didGatherMetrics.fulfill() }
  1172. eventMonitor.requestDidCompleteTaskWithError = { _, _, _ in didComplete.fulfill() }
  1173. eventMonitor.requestDidFinish = { _ in didFinish.fulfill() }
  1174. eventMonitor.requestDidResume = { _ in didResume.fulfill() }
  1175. eventMonitor.requestDidResumeTask = { _, _ in didResumeTask.fulfill() }
  1176. eventMonitor.requestDidValidateRequestResponseWithResult = { _, _, _, _ in didValidate.fulfill() }
  1177. let session = Session(eventMonitors: [eventMonitor, Monitor { didParse.fulfill() }])
  1178. // When
  1179. let request = session.streamRequest(.stream(1))
  1180. .validate()
  1181. .responseStreamDecodable(of: TestResponse.self) { stream in
  1182. switch stream.event {
  1183. case .stream:
  1184. didReceive.fulfill()
  1185. case .complete:
  1186. didCompleteStream.fulfill()
  1187. }
  1188. }
  1189. waitForExpectations(timeout: timeout)
  1190. // Then
  1191. XCTAssertEqual(request.state, .finished)
  1192. }
  1193. }