DataStreamTests.swift 42 KB


  1. //
  2. // DataStreamTests.swift
  3. //
  4. // Copyright (c) 2020 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. import Alamofire
  25. import XCTest
  26. final class DataStreamTests: BaseTestCase {
  27. func testThatDataCanBeStreamedOnMainQueue() {
  28. // Given
  29. let expectedSize = 10
  30. var accumulatedData = Data()
  31. var response: HTTPURLResponse?
  32. var streamOnMain = false
  33. var completeOnMain = false
  34. let didReceive = expectation(description: "stream should receive once")
  35. let didComplete = expectation(description: "stream should complete")
  36. // When
  37. AF.streamRequest(.bytes(expectedSize)).responseStream { stream in
  38. switch stream.event {
  39. case let .stream(result):
  40. streamOnMain = Thread.isMainThread
  41. switch result {
  42. case let .success(data):
  43. accumulatedData.append(data)
  44. }
  45. didReceive.fulfill()
  46. case let .complete(completion):
  47. completeOnMain = Thread.isMainThread
  48. response = completion.response
  49. didComplete.fulfill()
  50. }
  51. }
  52. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  53. // Then
  54. XCTAssertEqual(response?.statusCode, 200)
  55. XCTAssertEqual(accumulatedData.count, expectedSize)
  56. XCTAssertTrue(streamOnMain)
  57. XCTAssertTrue(completeOnMain)
  58. }
  59. func testThatDataCanBeStreamedByByte() {
  60. // Given
  61. let expectedSize = 10
  62. var accumulatedData = Data()
  63. var response: HTTPURLResponse?
  64. var streamOnMain = false
  65. var completeOnMain = false
  66. var streamCalled = 0
  67. let didReceive = expectation(description: "stream should receive once")
  68. if #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) {
  69. didReceive.expectedFulfillmentCount = expectedSize
  70. }
  71. let didComplete = expectation(description: "stream should complete")
  72. // When
  73. AF.streamRequest(.chunked(expectedSize)).responseStream { stream in
  74. switch stream.event {
  75. case let .stream(result):
  76. streamOnMain = Thread.isMainThread
  77. switch result {
  78. case let .success(data):
  79. accumulatedData.append(data)
  80. }
  81. streamCalled += 1
  82. didReceive.fulfill()
  83. case let .complete(completion):
  84. completeOnMain = Thread.isMainThread
  85. response = completion.response
  86. didComplete.fulfill()
  87. }
  88. }
  89. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  90. // Then
  91. XCTAssertEqual(response?.statusCode, 200)
  92. // Older OSes don't return individual bytes.
  93. if #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) {
  94. XCTAssertEqual(streamCalled, expectedSize)
  95. } else {
  96. XCTAssertEqual(streamCalled, 1)
  97. }
  98. XCTAssertEqual(accumulatedData.count, expectedSize)
  99. XCTAssertTrue(streamOnMain)
  100. XCTAssertTrue(completeOnMain)
  101. }
  102. func testThatDataCanBeStreamedAsMultipleJSONPayloads() throws {
  103. guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
  104. throw XCTSkip("Older OSes do not separate chunked payloads in callbacks.")
  105. }
  106. // Given
  107. let expectedSize = 10
  108. var responses: [TestResponse] = []
  109. var response: HTTPURLResponse?
  110. var streamOnMain = false
  111. var completeOnMain = false
  112. var streamCalled = 0
  113. let didReceive = expectation(description: "stream should receive once")
  114. didReceive.expectedFulfillmentCount = expectedSize
  115. let didComplete = expectation(description: "stream should complete")
  116. // When
  117. AF.streamRequest(.payloads(expectedSize))
  118. .responseStreamDecodable(of: TestResponse.self) { stream in
  119. switch stream.event {
  120. case let .stream(result):
  121. streamOnMain = Thread.isMainThread
  122. switch result {
  123. case let .success(value):
  124. responses.append(value)
  125. case let .failure(error):
  126. XCTFail("JSON stream failed due to error: \(error.localizedDescription)")
  127. }
  128. streamCalled += 1
  129. didReceive.fulfill()
  130. case let .complete(completion):
  131. completeOnMain = Thread.isMainThread
  132. response = completion.response
  133. didComplete.fulfill()
  134. }
  135. }
  136. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  137. // Then
  138. XCTAssertEqual(response?.statusCode, 200)
  139. XCTAssertEqual(streamCalled, expectedSize)
  140. XCTAssertEqual(responses.count, expectedSize)
  141. XCTAssertTrue(streamOnMain)
  142. XCTAssertTrue(completeOnMain)
  143. }
  144. func testThatDataCanBeStreamedFromURL() {
  145. // Given
  146. let expectedSize = 1
  147. var accumulatedData = Data()
  148. var response: HTTPURLResponse?
  149. var streamOnMain = false
  150. var completeOnMain = false
  151. let didReceive = expectation(description: "stream should receive")
  152. let didComplete = expectation(description: "stream should complete")
  153. // When
  154. AF.streamRequest(.bytes(expectedSize)).responseStream { stream in
  155. switch stream.event {
  156. case let .stream(result):
  157. streamOnMain = Thread.isMainThread
  158. switch result {
  159. case let .success(data):
  160. accumulatedData.append(data)
  161. }
  162. didReceive.fulfill()
  163. case let .complete(completion):
  164. completeOnMain = Thread.isMainThread
  165. response = completion.response
  166. didComplete.fulfill()
  167. }
  168. }
  169. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  170. // Then
  171. XCTAssertEqual(response?.statusCode, 200)
  172. XCTAssertEqual(accumulatedData.count, expectedSize)
  173. XCTAssertTrue(streamOnMain)
  174. XCTAssertTrue(completeOnMain)
  175. }
  176. func testThatDataCanBeStreamedManyTimes() {
  177. // Given
  178. let expectedSize = 1
  179. var firstAccumulatedData = Data()
  180. var firstResponse: HTTPURLResponse?
  181. var firstStreamOnMain = false
  182. var firstCompleteOnMain = false
  183. let firstReceive = expectation(description: "first stream should receive")
  184. let firstCompletion = expectation(description: "first stream should complete")
  185. var secondAccumulatedData = Data()
  186. var secondResponse: HTTPURLResponse?
  187. var secondStreamOnMain = false
  188. var secondCompleteOnMain = false
  189. let secondReceive = expectation(description: "second stream should receive")
  190. let secondCompletion = expectation(description: "second stream should complete")
  191. // When
  192. AF.streamRequest(.bytes(expectedSize))
  193. .responseStream { stream in
  194. switch stream.event {
  195. case let .stream(result):
  196. firstStreamOnMain = Thread.isMainThread
  197. switch result {
  198. case let .success(data):
  199. firstAccumulatedData.append(data)
  200. }
  201. firstReceive.fulfill()
  202. case let .complete(completion):
  203. firstCompleteOnMain = Thread.isMainThread
  204. firstResponse = completion.response
  205. firstCompletion.fulfill()
  206. }
  207. }
  208. .responseStream { stream in
  209. switch stream.event {
  210. case let .stream(result):
  211. secondStreamOnMain = Thread.isMainThread
  212. switch result {
  213. case let .success(data):
  214. secondAccumulatedData.append(data)
  215. }
  216. secondReceive.fulfill()
  217. case let .complete(completion):
  218. secondCompleteOnMain = Thread.isMainThread
  219. secondResponse = completion.response
  220. secondCompletion.fulfill()
  221. }
  222. }
  223. wait(for: [firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
  224. wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)
  225. // Then
  226. XCTAssertTrue(firstStreamOnMain)
  227. XCTAssertTrue(firstCompleteOnMain)
  228. XCTAssertEqual(firstResponse?.statusCode, 200)
  229. XCTAssertEqual(firstAccumulatedData.count, expectedSize)
  230. XCTAssertTrue(secondStreamOnMain)
  231. XCTAssertTrue(secondCompleteOnMain)
  232. XCTAssertEqual(secondResponse?.statusCode, 200)
  233. XCTAssertEqual(secondAccumulatedData.count, expectedSize)
  234. }
  235. func testThatDataCanBeStreamedAndDecodedAtTheSameTime() {
  236. // Given
  237. var firstAccumulatedData = Data()
  238. var firstResponse: HTTPURLResponse?
  239. var firstStreamOnMain = false
  240. var firstCompleteOnMain = false
  241. let firstReceive = expectation(description: "first stream should receive")
  242. let firstCompletion = expectation(description: "first stream should complete")
  243. var decodedResponse: TestResponse?
  244. var decodingError: AFError?
  245. var secondResponse: HTTPURLResponse?
  246. var secondStreamOnMain = false
  247. var secondCompleteOnMain = false
  248. let secondReceive = expectation(description: "second stream should receive")
  249. let secondCompletion = expectation(description: "second stream should complete")
  250. // When
  251. AF.streamRequest(.stream(1))
  252. .responseStream { stream in
  253. switch stream.event {
  254. case let .stream(result):
  255. firstStreamOnMain = Thread.isMainThread
  256. switch result {
  257. case let .success(data):
  258. firstAccumulatedData.append(data)
  259. }
  260. firstReceive.fulfill()
  261. case let .complete(completion):
  262. firstCompleteOnMain = Thread.isMainThread
  263. firstResponse = completion.response
  264. firstCompletion.fulfill()
  265. }
  266. }
  267. .responseStreamDecodable(of: TestResponse.self) { stream in
  268. switch stream.event {
  269. case let .stream(result):
  270. secondStreamOnMain = Thread.isMainThread
  271. switch result {
  272. case let .success(value):
  273. decodedResponse = value
  274. case let .failure(error):
  275. decodingError = error
  276. }
  277. secondReceive.fulfill()
  278. case let .complete(completion):
  279. secondCompleteOnMain = Thread.isMainThread
  280. secondResponse = completion.response
  281. secondCompletion.fulfill()
  282. }
  283. }
  284. wait(for: [firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
  285. wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)
  286. // Then
  287. XCTAssertTrue(firstStreamOnMain)
  288. XCTAssertTrue(firstCompleteOnMain)
  289. XCTAssertEqual(firstResponse?.statusCode, 200)
  290. XCTAssertTrue(!firstAccumulatedData.isEmpty)
  291. XCTAssertTrue(secondStreamOnMain)
  292. XCTAssertTrue(secondCompleteOnMain)
  293. XCTAssertEqual(secondResponse?.statusCode, 200)
  294. XCTAssertNotNil(decodedResponse)
  295. XCTAssertNil(decodingError)
  296. }
  297. #if !(os(Linux) || os(Windows))
  298. func testThatDataStreamRequestProducesWorkingInputStream() {
  299. // Given
  300. let expect = expectation(description: "stream complete")
  301. // When
  302. let stream = AF.streamRequest(.xml)
  303. .responseStream { stream in
  304. switch stream.event {
  305. case .complete:
  306. expect.fulfill()
  307. default: break
  308. }
  309. }.asInputStream()
  310. waitForExpectations(timeout: timeout)
  311. // Then
  312. let parser = XMLParser(stream: stream!)
  313. let parsed = parser.parse()
  314. XCTAssertTrue(parsed)
  315. XCTAssertNil(parser.parserError)
  316. }
  317. #endif
  318. func testThatDataStreamCanBeManuallyResumed() {
  319. // Given
  320. let session = Session(startRequestsImmediately: false)
  321. var response: HTTPURLResponse?
  322. var streamOnMain = false
  323. var completeOnMain = false
  324. let didReceive = expectation(description: "stream did receive")
  325. let didComplete = expectation(description: "stream complete")
  326. // When
  327. session.streamRequest(.stream(1))
  328. .responseStream { stream in
  329. switch stream.event {
  330. case .stream:
  331. streamOnMain = Thread.isMainThread
  332. didReceive.fulfill()
  333. case let .complete(completion):
  334. completeOnMain = Thread.isMainThread
  335. response = completion.response
  336. didComplete.fulfill()
  337. }
  338. }.resume()
  339. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  340. // Then
  341. XCTAssertTrue(streamOnMain)
  342. XCTAssertTrue(completeOnMain)
  343. XCTAssertEqual(response?.statusCode, 200)
  344. }
  345. func testThatDataStreamIsAutomaticallyCanceledOnStreamErrorWhenEnabled() {
  346. var response: HTTPURLResponse?
  347. var complete: DataStreamRequest.Completion?
  348. let didComplete = expectation(description: "stream complete")
  349. // When
  350. AF.streamRequest(.bytes(50), automaticallyCancelOnStreamError: true)
  351. .responseStreamDecodable(of: TestResponse.self) { stream in
  352. switch stream.event {
  353. case let .complete(completion):
  354. complete = completion
  355. response = completion.response
  356. didComplete.fulfill()
  357. default: break
  358. }
  359. }
  360. waitForExpectations(timeout: timeout)
  361. // Then
  362. XCTAssertEqual(response?.statusCode, 200)
  363. XCTAssertTrue(complete?.error?.isExplicitlyCancelledError == true,
  364. "error is not explicitly cancelled but \(complete?.error?.localizedDescription ?? "None")")
  365. }
  366. func testThatDataStreamIsAutomaticallyCanceledOnStreamClosureError() {
  367. // Given
  368. enum LocalError: Error { case failed }
  369. var response: HTTPURLResponse?
  370. var complete: DataStreamRequest.Completion?
  371. let didReceive = expectation(description: "stream did receieve")
  372. let didComplete = expectation(description: "stream complete")
  373. // When
  374. AF.streamRequest(.bytes(50))
  375. .responseStream { stream in
  376. switch stream.event {
  377. case .stream:
  378. didReceive.fulfill()
  379. throw LocalError.failed
  380. case let .complete(completion):
  381. complete = completion
  382. response = completion.response
  383. didComplete.fulfill()
  384. }
  385. }
  386. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  387. // Then
  388. XCTAssertEqual(response?.statusCode, 200)
  389. XCTAssertTrue(complete?.error?.isExplicitlyCancelledError == false)
  390. }
  391. func testThatDataStreamCanBeCancelledInClosure() {
  392. // Given
  393. // Use .main so that completion can't beat cancellation.
  394. let session = Session(rootQueue: .main)
  395. var completion: DataStreamRequest.Completion?
  396. let didReceive = expectation(description: "stream should receive")
  397. let didComplete = expectation(description: "stream should complete")
  398. // When
  399. session.streamRequest(.bytes(1)).responseStream { stream in
  400. switch stream.event {
  401. case .stream:
  402. didReceive.fulfill()
  403. stream.cancel()
  404. case .complete:
  405. completion = stream.completion
  406. didComplete.fulfill()
  407. }
  408. }
  409. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  410. // Then
  411. XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
  412. """
  413. error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
  414. response is: \(completion?.response?.description ?? "none").
  415. """)
  416. }
  417. func testThatDataStreamCanBeCancelledByToken() {
  418. // Given
  419. // Use .main so that completion can't beat cancellation.
  420. let session = Session(rootQueue: .main)
  421. var completion: DataStreamRequest.Completion?
  422. let didReceive = expectation(description: "stream should receive")
  423. let didComplete = expectation(description: "stream should complete")
  424. // When
  425. session.streamRequest(.bytes(1)).responseStream { stream in
  426. switch stream.event {
  427. case .stream:
  428. didReceive.fulfill()
  429. stream.token.cancel()
  430. case .complete:
  431. completion = stream.completion
  432. didComplete.fulfill()
  433. }
  434. }
  435. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  436. // Then
  437. XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
  438. """
  439. error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
  440. response is: \(completion?.response?.description ?? "none").
  441. """)
  442. }
  443. }
  444. // MARK: - Serialization Tests
  445. final class DataStreamSerializationTests: BaseTestCase {
  446. func testThatDataStreamsCanBeAString() {
  447. // Given
  448. var responseString: String?
  449. var streamOnMain = false
  450. var completeOnMain = false
  451. var response: HTTPURLResponse?
  452. let didStream = expectation(description: "did stream")
  453. let didComplete = expectation(description: "stream complete")
  454. // When
  455. AF.streamRequest(.stream(1))
  456. .responseStreamString { stream in
  457. switch stream.event {
  458. case let .stream(result):
  459. streamOnMain = Thread.isMainThread
  460. switch result {
  461. case let .success(string):
  462. responseString = string
  463. }
  464. didStream.fulfill()
  465. case let .complete(completion):
  466. completeOnMain = Thread.isMainThread
  467. response = completion.response
  468. didComplete.fulfill()
  469. }
  470. }
  471. wait(for: [didStream, didComplete], timeout: timeout, enforceOrder: true)
  472. // Then
  473. XCTAssertTrue(streamOnMain)
  474. XCTAssertTrue(completeOnMain)
  475. XCTAssertNotNil(responseString)
  476. XCTAssertEqual(response?.statusCode, 200)
  477. }
  478. func testThatDataStreamsCanBeDecoded() {
  479. // Given
  480. var response: TestResponse?
  481. var httpResponse: HTTPURLResponse?
  482. var decodingError: AFError?
  483. var streamOnMain = false
  484. var completeOnMain = false
  485. let didReceive = expectation(description: "stream did receive")
  486. let didComplete = expectation(description: "stream complete")
  487. // When
  488. AF.streamRequest(.stream(1))
  489. .responseStreamDecodable(of: TestResponse.self) { stream in
  490. switch stream.event {
  491. case let .stream(result):
  492. streamOnMain = Thread.isMainThread
  493. switch result {
  494. case let .success(value):
  495. response = value
  496. case let .failure(error):
  497. decodingError = error
  498. }
  499. didReceive.fulfill()
  500. case let .complete(completion):
  501. completeOnMain = Thread.isMainThread
  502. httpResponse = completion.response
  503. didComplete.fulfill()
  504. }
  505. }
  506. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  507. // Then
  508. XCTAssertTrue(streamOnMain)
  509. XCTAssertTrue(completeOnMain)
  510. XCTAssertNotNil(response)
  511. XCTAssertEqual(httpResponse?.statusCode, 200)
  512. XCTAssertNil(decodingError)
  513. }
  514. func testThatDataStreamSerializerCanBeUsedDirectly() {
  515. // Given
  516. var response: HTTPURLResponse?
  517. var decodedResponse: TestResponse?
  518. var decodingError: AFError?
  519. var streamOnMain = false
  520. var completeOnMain = false
  521. let serializer = DecodableStreamSerializer<TestResponse>()
  522. let didReceive = expectation(description: "stream did receive")
  523. let didComplete = expectation(description: "stream complete")
  524. // When
  525. AF.streamRequest(.stream(1))
  526. .responseStream(using: serializer) { stream in
  527. switch stream.event {
  528. case let .stream(result):
  529. streamOnMain = Thread.isMainThread
  530. switch result {
  531. case let .success(value):
  532. decodedResponse = value
  533. case let .failure(error):
  534. decodingError = error
  535. }
  536. didReceive.fulfill()
  537. case let .complete(completion):
  538. completeOnMain = Thread.isMainThread
  539. response = completion.response
  540. didComplete.fulfill()
  541. }
  542. }
  543. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  544. // Then
  545. XCTAssertTrue(streamOnMain)
  546. XCTAssertTrue(completeOnMain)
  547. XCTAssertNotNil(decodedResponse)
  548. XCTAssertEqual(response?.statusCode, 200)
  549. XCTAssertNil(decodingError)
  550. }
  551. }
  552. // MARK: - Integration Tests
  553. final class DataStreamIntegrationTests: BaseTestCase {
  554. func testThatDataStreamCanFailValidation() {
  555. // Given
  556. var dataSeen = false
  557. var error: AFError?
  558. let didComplete = expectation(description: "stream should complete")
  559. // When
  560. AF.streamRequest(.status(401))
  561. .validate()
  562. .responseStream { stream in
  563. switch stream.event {
  564. case .stream:
  565. dataSeen = true
  566. case let .complete(completion):
  567. error = completion.error
  568. didComplete.fulfill()
  569. }
  570. }
  571. waitForExpectations(timeout: timeout)
  572. // Then
  573. XCTAssertNotNil(error, "error should not be nil")
  574. XCTAssertTrue(error?.isResponseValidationError == true, "error should be response validation error")
  575. XCTAssertFalse(dataSeen, "no data should be seen")
  576. }
  577. func testThatDataStreamsCanBeRetried() {
  578. // Given
  579. final class GoodRetry: RequestInterceptor {
  580. var hasRetried = false
  581. func adapt(_ urlRequest: URLRequest, for session: Session, completion: @escaping (Result<URLRequest, Error>) -> Void) {
  582. if hasRetried {
  583. completion(.success(Endpoint.bytes(1000).urlRequest))
  584. } else {
  585. completion(.success(urlRequest))
  586. }
  587. }
  588. func retry(_ request: Request, for session: Session, dueTo error: Error, completion: @escaping (RetryResult) -> Void) {
  589. hasRetried = true
  590. completion(.retry)
  591. }
  592. }
  593. let session = Session(interceptor: GoodRetry())
  594. var accumulatedData = Data()
  595. var streamOnMain = false
  596. var completeOnMain = false
  597. var response: HTTPURLResponse?
  598. let didReceive = expectation(description: "stream should receive")
  599. let didComplete = expectation(description: "stream should complete")
  600. // When
  601. session.streamRequest(.status(401))
  602. .validate()
  603. .responseStream { stream in
  604. switch stream.event {
  605. case let .stream(result):
  606. streamOnMain = Thread.isMainThread
  607. switch result {
  608. case let .success(data):
  609. accumulatedData.append(data)
  610. }
  611. didReceive.fulfill()
  612. case let .complete(completion):
  613. completeOnMain = Thread.isMainThread
  614. response = completion.response
  615. didComplete.fulfill()
  616. }
  617. }
  618. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  619. // Then
  620. XCTAssertTrue(streamOnMain)
  621. XCTAssertTrue(completeOnMain)
  622. XCTAssertEqual(accumulatedData.count, 1000)
  623. XCTAssertEqual(response?.statusCode, 200)
  624. }
  625. func testThatDataStreamCanBeRedirected() {
  626. // Given
  627. var response: HTTPURLResponse?
  628. var decodedResponse: TestResponse?
  629. var decodingError: AFError?
  630. var streamOnMain = false
  631. var completeOnMain = false
  632. let didRedirect = expectation(description: "stream redirected")
  633. let redirector = Redirector(behavior: .modify { _, _, _ in
  634. didRedirect.fulfill()
  635. return Endpoint.stream(1).urlRequest
  636. })
  637. let didReceive = expectation(description: "stream should receive")
  638. let didComplete = expectation(description: "stream should complete")
  639. // When
  640. AF.streamRequest(.status(301))
  641. .redirect(using: redirector)
  642. .responseStreamDecodable(of: TestResponse.self) { stream in
  643. switch stream.event {
  644. case let .stream(result):
  645. streamOnMain = Thread.isMainThread
  646. switch result {
  647. case let .success(value):
  648. decodedResponse = value
  649. case let .failure(error):
  650. decodingError = error
  651. }
  652. didReceive.fulfill()
  653. case let .complete(completion):
  654. completeOnMain = Thread.isMainThread
  655. response = completion.response
  656. didComplete.fulfill()
  657. }
  658. }
  659. wait(for: [didRedirect, didReceive, didComplete], timeout: timeout, enforceOrder: true)
  660. // Then
  661. XCTAssertTrue(streamOnMain)
  662. XCTAssertTrue(completeOnMain)
  663. XCTAssertNotNil(decodedResponse)
  664. XCTAssertEqual(response?.statusCode, 200)
  665. XCTAssertNil(decodingError)
  666. }
  667. func testThatDataStreamCallsCachedResponseHandler() {
  668. // Given
  669. var response: HTTPURLResponse?
  670. var decodedResponse: TestResponse?
  671. var decodingError: AFError?
  672. var streamOnMain = false
  673. var completeOnMain = false
  674. let cached = expectation(description: "stream called cacher")
  675. let cacher = ResponseCacher(behavior: .modify { _, _ in
  676. cached.fulfill()
  677. return nil
  678. })
  679. let didReceive = expectation(description: "stream did receive")
  680. let didComplete = expectation(description: "stream complete")
  681. // When
  682. AF.streamRequest(.stream(1))
  683. .cacheResponse(using: cacher)
  684. .responseStreamDecodable(of: TestResponse.self) { stream in
  685. switch stream.event {
  686. case let .stream(result):
  687. streamOnMain = Thread.isMainThread
  688. switch result {
  689. case let .success(value):
  690. decodedResponse = value
  691. case let .failure(error):
  692. decodingError = error
  693. }
  694. didReceive.fulfill()
  695. case let .complete(completion):
  696. completeOnMain = Thread.isMainThread
  697. response = completion.response
  698. didComplete.fulfill()
  699. }
  700. }
  701. // willCacheResponse called after receiving all Data, so may be called before or after the asynchronous stream
  702. // handlers.
  703. wait(for: [cached], timeout: timeout)
  704. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  705. // Then
  706. XCTAssertTrue(streamOnMain)
  707. XCTAssertTrue(completeOnMain)
  708. XCTAssertNotNil(decodedResponse)
  709. XCTAssertEqual(response?.statusCode, 200)
  710. XCTAssertNil(decodingError)
  711. }
  712. func testThatDataStreamWorksCorrectlyWithMultipleSerialQueues() {
  713. // Given
  714. let requestQueue = DispatchQueue(label: "org.alamofire.testRequestQueue")
  715. let serializationQueue = DispatchQueue(label: "org.alamofire.testSerializationQueue")
  716. let session = Session(requestQueue: requestQueue, serializationQueue: serializationQueue)
  717. var firstResponse: HTTPURLResponse?
  718. var firstDecodedResponse: TestResponse?
  719. var firstDecodingError: AFError?
  720. var firstStreamOnMain = false
  721. var firstCompleteOnMain = false
  722. let firstStream = expectation(description: "first stream")
  723. let firstDidReceive = expectation(description: "first stream did receive")
  724. let firstDidComplete = expectation(description: "first stream complete")
  725. var secondResponse: HTTPURLResponse?
  726. var secondDecodedResponse: TestResponse?
  727. var secondDecodingError: AFError?
  728. var secondStreamOnMain = false
  729. var secondCompleteOnMain = false
  730. let secondStream = expectation(description: "second stream")
  731. let secondDidReceive = expectation(description: "second stream did receive")
  732. let secondDidComplete = expectation(description: "second stream complete")
  733. // When
  734. session.streamRequest(.stream(1))
  735. .responseStreamDecodable(of: TestResponse.self) { stream in
  736. switch stream.event {
  737. case let .stream(result):
  738. firstStreamOnMain = Thread.isMainThread
  739. switch result {
  740. case let .success(value):
  741. firstDecodedResponse = value
  742. case let .failure(error):
  743. firstDecodingError = error
  744. }
  745. firstStream.fulfill()
  746. firstDidReceive.fulfill()
  747. case let .complete(completion):
  748. firstCompleteOnMain = Thread.isMainThread
  749. firstResponse = completion.response
  750. firstDidComplete.fulfill()
  751. }
  752. }
  753. .responseStreamDecodable(of: TestResponse.self) { stream in
  754. switch stream.event {
  755. case let .stream(result):
  756. secondStreamOnMain = Thread.isMainThread
  757. switch result {
  758. case let .success(value):
  759. secondDecodedResponse = value
  760. case let .failure(error):
  761. secondDecodingError = error
  762. }
  763. secondStream.fulfill()
  764. secondDidReceive.fulfill()
  765. case let .complete(completion):
  766. secondCompleteOnMain = Thread.isMainThread
  767. secondResponse = completion.response
  768. secondDidComplete.fulfill()
  769. }
  770. }
  771. wait(for: [firstStream, secondStream], timeout: timeout, enforceOrder: true)
  772. // Cannot test order of completion events, as one may have been enqueued while the other executed directly.
  773. wait(for: [firstDidReceive, firstDidComplete], timeout: timeout, enforceOrder: true)
  774. wait(for: [secondDidReceive, secondDidComplete], timeout: timeout, enforceOrder: true)
  775. // Then
  776. XCTAssertTrue(firstStreamOnMain)
  777. XCTAssertTrue(firstCompleteOnMain)
  778. XCTAssertNotNil(firstDecodedResponse)
  779. XCTAssertEqual(firstResponse?.statusCode, 200)
  780. XCTAssertNil(firstDecodingError)
  781. XCTAssertTrue(secondStreamOnMain)
  782. XCTAssertTrue(secondCompleteOnMain)
  783. XCTAssertNotNil(secondDecodedResponse)
  784. XCTAssertEqual(secondResponse?.statusCode, 200)
  785. XCTAssertNil(secondDecodingError)
  786. }
  787. func testThatDataStreamWorksCorrectlyWithMultipleConcurrentQueues() {
  788. // Given
  789. let requestQueue = DispatchQueue(label: "org.alamofire.testRequestQueue", attributes: .concurrent)
  790. let serializationQueue = DispatchQueue(label: "org.alamofire.testSerializationQueue", attributes: .concurrent)
  791. let session = Session(requestQueue: requestQueue, serializationQueue: serializationQueue)
  792. var firstResponse: HTTPURLResponse?
  793. var firstDecodedResponse: TestResponse?
  794. var firstDecodingError: AFError?
  795. var firstStreamOnMain = false
  796. var firstCompleteOnMain = false
  797. let firstDidReceive = expectation(description: "first stream did receive")
  798. let firstDidComplete = expectation(description: "first stream complete")
  799. var secondResponse: HTTPURLResponse?
  800. var secondDecodedResponse: TestResponse?
  801. var secondDecodingError: AFError?
  802. var secondStreamOnMain = false
  803. var secondCompleteOnMain = false
  804. let secondDidReceive = expectation(description: "second stream did receive")
  805. let secondDidComplete = expectation(description: "second stream complete")
  806. // When
  807. session.streamRequest(.stream(1))
  808. .responseStreamDecodable(of: TestResponse.self) { stream in
  809. switch stream.event {
  810. case let .stream(result):
  811. firstStreamOnMain = Thread.isMainThread
  812. switch result {
  813. case let .success(value):
  814. firstDecodedResponse = value
  815. case let .failure(error):
  816. firstDecodingError = error
  817. }
  818. firstDidReceive.fulfill()
  819. case let .complete(completion):
  820. firstCompleteOnMain = Thread.isMainThread
  821. firstResponse = completion.response
  822. firstDidComplete.fulfill()
  823. }
  824. }
  825. .responseStreamDecodable(of: TestResponse.self) { stream in
  826. switch stream.event {
  827. case let .stream(result):
  828. secondStreamOnMain = Thread.isMainThread
  829. switch result {
  830. case let .success(value):
  831. secondDecodedResponse = value
  832. case let .failure(error):
  833. secondDecodingError = error
  834. }
  835. secondDidReceive.fulfill()
  836. case let .complete(completion):
  837. secondCompleteOnMain = Thread.isMainThread
  838. secondResponse = completion.response
  839. secondDidComplete.fulfill()
  840. }
  841. }
  842. wait(for: [firstDidReceive, firstDidComplete], timeout: timeout, enforceOrder: true)
  843. wait(for: [secondDidReceive, secondDidComplete], timeout: timeout, enforceOrder: true)
  844. // Then
  845. XCTAssertTrue(firstStreamOnMain)
  846. XCTAssertTrue(firstCompleteOnMain)
  847. XCTAssertNotNil(firstDecodedResponse)
  848. XCTAssertEqual(firstResponse?.statusCode, 200)
  849. XCTAssertNil(firstDecodingError)
  850. XCTAssertTrue(secondStreamOnMain)
  851. XCTAssertTrue(secondCompleteOnMain)
  852. XCTAssertNotNil(secondDecodedResponse)
  853. XCTAssertEqual(secondResponse?.statusCode, 200)
  854. XCTAssertNil(secondDecodingError)
  855. }
  856. func testThatDataStreamCanAuthenticate() {
  857. // Given
  858. let user = "userstream", password = "password"
  859. var response: HTTPURLResponse?
  860. var streamOnMain = false
  861. var completeOnMain = false
  862. let didReceive = expectation(description: "stream did receive")
  863. let didComplete = expectation(description: "stream complete")
  864. // When
  865. AF.streamRequest(.basicAuth(forUser: user, password: password))
  866. .authenticate(username: user, password: password)
  867. .responseStream { stream in
  868. switch stream.event {
  869. case .stream:
  870. streamOnMain = Thread.isMainThread
  871. didReceive.fulfill()
  872. case let .complete(completion):
  873. completeOnMain = Thread.isMainThread
  874. response = completion.response
  875. didComplete.fulfill()
  876. }
  877. }
  878. wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
  879. // Then
  880. XCTAssertTrue(streamOnMain)
  881. XCTAssertTrue(completeOnMain)
  882. XCTAssertEqual(response?.statusCode, 200)
  883. }
  884. }
  885. final class DataStreamLifetimeEvents: BaseTestCase {
  886. func testThatDataStreamRequestHasAppropriateLifetimeEvents() {
  887. // Given
  888. final class Monitor: EventMonitor {
  889. var called: (() -> Void)?
  890. func request<Value>(_ request: DataStreamRequest, didParseStream result: Result<Value, AFError>) {
  891. called?()
  892. }
  893. }
  894. let eventMonitor = ClosureEventMonitor()
  895. let parseMonitor = Monitor()
  896. let session = Session(eventMonitors: [eventMonitor, parseMonitor])
  897. // Disable event test until Firewalk supports HTTPS.
  898. // let didReceiveChallenge = expectation(description: "didReceiveChallenge should fire")
  899. let taskDidFinishCollecting = expectation(description: "taskDidFinishCollecting should fire")
  900. let didReceiveData = expectation(description: "didReceiveData should fire")
  901. let willCacheResponse = expectation(description: "willCacheResponse should fire")
  902. let didCreateURLRequest = expectation(description: "didCreateInitialURLRequest should fire")
  903. let didCreateTask = expectation(description: "didCreateTask should fire")
  904. let didGatherMetrics = expectation(description: "didGatherMetrics should fire")
  905. let didComplete = expectation(description: "didComplete should fire")
  906. let didFinish = expectation(description: "didFinish should fire")
  907. let didResume = expectation(description: "didResume should fire")
  908. let didResumeTask = expectation(description: "didResumeTask should fire")
  909. let didValidate = expectation(description: "didValidateRequest should fire")
  910. didValidate.expectedFulfillmentCount = 2
  911. let didParse = expectation(description: "streamDidParse should fire")
  912. let didReceive = expectation(description: "stream should receive")
  913. let didCompleteStream = expectation(description: "stream should complete")
  914. var dataReceived = false
  915. // Disable event test until Firewalk supports HTTPS.
  916. // eventMonitor.taskDidReceiveChallenge = { _, _, _ in didReceiveChallenge.fulfill() }
  917. eventMonitor.taskDidFinishCollectingMetrics = { _, _, _ in taskDidFinishCollecting.fulfill() }
  918. eventMonitor.dataTaskDidReceiveData = { _, _, _ in
  919. guard !dataReceived else { return }
  920. // Data may be received many times, fulfill only once.
  921. dataReceived = true
  922. didReceiveData.fulfill()
  923. }
  924. eventMonitor.dataTaskWillCacheResponse = { _, _, _ in willCacheResponse.fulfill() }
  925. eventMonitor.requestDidCreateInitialURLRequest = { _, _ in didCreateURLRequest.fulfill() }
  926. eventMonitor.requestDidCreateTask = { _, _ in didCreateTask.fulfill() }
  927. eventMonitor.requestDidGatherMetrics = { _, _ in didGatherMetrics.fulfill() }
  928. eventMonitor.requestDidCompleteTaskWithError = { _, _, _ in didComplete.fulfill() }
  929. eventMonitor.requestDidFinish = { _ in didFinish.fulfill() }
  930. eventMonitor.requestDidResume = { _ in didResume.fulfill() }
  931. eventMonitor.requestDidResumeTask = { _, _ in didResumeTask.fulfill() }
  932. eventMonitor.requestDidValidateRequestResponseWithResult = { _, _, _, _ in didValidate.fulfill() }
  933. parseMonitor.called = { didParse.fulfill() }
  934. // When
  935. let request = session.streamRequest(.stream(1))
  936. .validate()
  937. .responseStreamDecodable(of: TestResponse.self) { stream in
  938. switch stream.event {
  939. case .stream:
  940. didReceive.fulfill()
  941. case .complete:
  942. didCompleteStream.fulfill()
  943. }
  944. }
  945. waitForExpectations(timeout: timeout)
  946. // Then
  947. XCTAssertEqual(request.state, .finished)
  948. }
  949. }