DataStreamTests.swift 51 KB


  1. //
  2. // DataStreamTests.swift
  3. //
  4. // Copyright (c) 2020 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. import Alamofire
  25. import XCTest
  26. final class DataStreamTests: BaseTestCase {
  /// Streams `.bytes(expectedSize)` without specifying a handler queue and checks that the
  /// HTTP response, the data event, and the completion event are observed in that order,
  /// that both stream and completion events were handled on the main thread, and that the
  /// accumulated data holds `expectedSize` bytes.
  func testThatDataCanBeStreamedOnMainQueue() {
    // Given
    let expectedSize = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive once")
    let streamCompleted = expectation(description: "stream should complete")
    var receivedBytes = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.bytes(expectedSize))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          // `.success` is the only case this handler has to consider for raw data.
          if case let .success(chunk) = result {
            receivedBytes.append(chunk)
          }
          dataReceived.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(receivedBytes.count, expectedSize)
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
  }
  /// Streams `.bytes(expectedSize)` with the stream handler scheduled on a custom serial
  /// `DispatchQueue`, asserting via `dispatchPrecondition` that every event is delivered on
  /// that queue and that response, data, and completion arrive in order.
  func testThatDataCanBeStreamedOnArbitraryQueue() {
    // Given
    let expectedSize = 5
    let handlerQueue = DispatchQueue(label: "com.alamofire.tests.ArbitraryQueue")
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive once")
    let streamCompleted = expectation(description: "stream should complete")
    var receivedBytes = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?

    // When
    AF.streamRequest(.bytes(expectedSize))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream(on: handlerQueue) { stream in
        // Traps if the handler is invoked anywhere other than the requested queue.
        dispatchPrecondition(condition: .onQueue(handlerQueue))

        switch stream.event {
        case let .complete(completion):
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          if case let .success(chunk) = result {
            receivedBytes.append(chunk)
          }
          dataReceived.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(receivedBytes.count, expectedSize)
  }
  /// Streams `.chunked(expectedSize)` and expects the stream handler to be called exactly
  /// `expectedSize` times, accumulating `expectedSize` bytes in total, with all events
  /// handled on the main thread.
  ///
  /// - Throws: `XCTSkip` on OS versions older than macOS 10.15 / iOS 13 / tvOS 13 / watchOS 6.
  func testThatDataCanBeStreamedByByte() throws {
    guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
      throw XCTSkip("Older OSes don't return individual bytes.")
    }

    // Given
    let expectedSize = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive once")
    // One fulfillment is required per expected stream callback.
    dataReceived.expectedFulfillmentCount = expectedSize
    let streamCompleted = expectation(description: "stream should complete")
    var receivedBytes = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false
    var streamCallCount = 0

    // When
    AF.streamRequest(.chunked(expectedSize))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            receivedBytes.append(chunk)
          }
          streamCallCount += 1
          dataReceived.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(streamCallCount, expectedSize)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(receivedBytes.count, expectedSize)
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
  }
  /// Streams `.payloads(expectedSize)` through `responseStreamDecodable(of: TestResponse.self)`
  /// and expects `expectedSize` stream callbacks, each successfully decoded; any decoding
  /// failure is reported through `XCTFail`.
  ///
  /// - Throws: `XCTSkip` on OS versions older than macOS 10.15 / iOS 13 / tvOS 13 / watchOS 6.
  func testThatDataCanBeStreamedAsMultipleJSONPayloads() throws {
    guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
      throw XCTSkip("Older OSes do not separate chunked payloads in callbacks.")
    }

    // Given
    let expectedSize = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let payloadReceived = expectation(description: "stream should receive once")
    // One fulfillment is required per expected payload callback.
    payloadReceived.expectedFulfillmentCount = expectedSize
    let streamCompleted = expectation(description: "stream should complete")
    var decodedPayloads: [TestResponse] = []
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false
    var streamCallCount = 0

    // When
    AF.streamRequest(.payloads(expectedSize))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            XCTFail("JSON stream failed due to error: \(error.localizedDescription)")
          case let .success(payload):
            decodedPayloads.append(payload)
          }
          streamCallCount += 1
          payloadReceived.fulfill()
        }
      }

    wait(for: [responseReceived, payloadReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(streamCallCount, expectedSize)
    XCTAssertEqual(decodedPayloads.count, expectedSize)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
  }
  /// Streams `.bytes(expectedSize)` for a single byte and checks ordering of response, data,
  /// and completion events, main-thread delivery, and the accumulated byte count.
  ///
  /// NOTE(review): despite the "FromURL" name, the request is created from
  /// `.bytes(expectedSize)` exactly like the neighbouring tests — confirm whether a
  /// URL-based request was intended here.
  func testThatDataCanBeStreamedFromURL() {
    // Given
    let expectedSize = 1
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var receivedBytes = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.bytes(expectedSize))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            receivedBytes.append(chunk)
          }
          dataReceived.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(receivedBytes.count, expectedSize)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
  }
  /// Attaches two independent `responseStream` handlers to one `.bytes(expectedSize)` request
  /// and checks that each handler receives its own data and completion events on the main
  /// thread, with matching responses and `expectedSize` bytes apiece.
  func testThatDataCanBeStreamedManyTimes() {
    // Given
    let expectedSize = 1
    var earlyResponse: HTTPURLResponse?
    let responseReceived = expectation(description: "onHTTPResponse should be called")

    // State captured by the first stream handler.
    let firstDataReceived = expectation(description: "first stream should receive")
    let firstStreamCompleted = expectation(description: "first stream should complete")
    var firstBytes = Data()
    var firstFinalResponse: HTTPURLResponse?
    var firstHandledStreamOnMain = false
    var firstHandledCompletionOnMain = false

    // State captured by the second stream handler.
    let secondDataReceived = expectation(description: "second stream should receive")
    let secondStreamCompleted = expectation(description: "second stream should complete")
    var secondBytes = Data()
    var secondFinalResponse: HTTPURLResponse?
    var secondHandledStreamOnMain = false
    var secondHandledCompletionOnMain = false

    // When
    AF.streamRequest(.bytes(expectedSize))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          firstHandledCompletionOnMain = Thread.isMainThread
          firstFinalResponse = completion.response
          firstStreamCompleted.fulfill()
        case let .stream(result):
          firstHandledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            firstBytes.append(chunk)
          }
          firstDataReceived.fulfill()
        }
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          secondHandledCompletionOnMain = Thread.isMainThread
          secondFinalResponse = completion.response
          secondStreamCompleted.fulfill()
        case let .stream(result):
          secondHandledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            secondBytes.append(chunk)
          }
          secondDataReceived.fulfill()
        }
      }

    // Ordering is enforced within each handler's events, not across the two handlers.
    wait(for: [responseReceived, firstDataReceived, firstStreamCompleted], timeout: timeout, enforceOrder: true)
    wait(for: [secondDataReceived, secondStreamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(earlyResponse, firstFinalResponse)
    XCTAssertEqual(earlyResponse, secondFinalResponse)
    XCTAssertTrue(firstHandledStreamOnMain)
    XCTAssertTrue(firstHandledCompletionOnMain)
    XCTAssertEqual(firstFinalResponse?.statusCode, 200)
    XCTAssertEqual(firstBytes.count, expectedSize)
    XCTAssertTrue(secondHandledStreamOnMain)
    XCTAssertTrue(secondHandledCompletionOnMain)
    XCTAssertEqual(secondFinalResponse?.statusCode, 200)
    XCTAssertEqual(secondBytes.count, expectedSize)
  }
  /// Attaches both a raw `responseStream` handler and a `responseStreamDecodable` handler to a
  /// single `.stream(1)` request, then checks that the raw handler collected data, the
  /// decoding handler produced a `TestResponse` without error, and both ran on the main thread.
  func testThatDataCanBeStreamedAndDecodedAtTheSameTime() {
    // Given
    var earlyResponse: HTTPURLResponse?
    let responseReceived = expectation(description: "onHTTPResponse should be called")

    // State captured by the raw-data handler.
    let rawDataReceived = expectation(description: "first stream should receive")
    let rawStreamCompleted = expectation(description: "first stream should complete")
    var rawBytes = Data()
    var rawFinalResponse: HTTPURLResponse?
    var rawHandledStreamOnMain = false
    var rawHandledCompletionOnMain = false

    // State captured by the decoding handler.
    let decodedValueReceived = expectation(description: "second stream should receive")
    let decodedStreamCompleted = expectation(description: "second stream should complete")
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var decodedFinalResponse: HTTPURLResponse?
    var decodedHandledStreamOnMain = false
    var decodedHandledCompletionOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          rawHandledCompletionOnMain = Thread.isMainThread
          rawFinalResponse = completion.response
          rawStreamCompleted.fulfill()
        case let .stream(result):
          rawHandledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            rawBytes.append(chunk)
          }
          rawDataReceived.fulfill()
        }
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          decodedHandledCompletionOnMain = Thread.isMainThread
          decodedFinalResponse = completion.response
          decodedStreamCompleted.fulfill()
        case let .stream(result):
          decodedHandledStreamOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            decodeFailure = error
          case let .success(value):
            decodedValue = value
          }
          decodedValueReceived.fulfill()
        }
      }

    // Ordering is enforced within each handler's events, not across the two handlers.
    wait(for: [responseReceived, rawDataReceived, rawStreamCompleted], timeout: timeout, enforceOrder: true)
    wait(for: [decodedValueReceived, decodedStreamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(earlyResponse, rawFinalResponse)
    XCTAssertEqual(earlyResponse, decodedFinalResponse)
    XCTAssertTrue(rawHandledStreamOnMain)
    XCTAssertTrue(rawHandledCompletionOnMain)
    XCTAssertEqual(rawFinalResponse?.statusCode, 200)
    XCTAssertFalse(rawBytes.isEmpty)
    XCTAssertTrue(decodedHandledStreamOnMain)
    XCTAssertTrue(decodedHandledCompletionOnMain)
    XCTAssertEqual(decodedFinalResponse?.statusCode, 200)
    XCTAssertNotNil(decodedValue)
    XCTAssertNil(decodeFailure)
  }
  #if !canImport(FoundationNetworking) // Only compiled when we're not using swift-corelibs-foundation.
  /// Streams `.xml`, waits for the stream's completion event, and then feeds the
  /// `InputStream` obtained from `asInputStream()` to `XMLParser`, expecting the parse to
  /// succeed without a parser error.
  ///
  /// - Throws: An `XCTUnwrap` failure if `asInputStream()` returned `nil`.
  func testThatDataStreamRequestProducesWorkingInputStream() throws {
    // Given
    let expect = expectation(description: "stream complete")

    // When
    let stream = AF.streamRequest(.xml)
      .responseStream { output in
        // Only completion matters here; data events are ignored.
        if case .complete = output.event {
          expect.fulfill()
        }
      }
      .asInputStream()

    waitForExpectations(timeout: timeout)

    // Then
    // Unwrap through XCTUnwrap so a missing stream fails this test instead of
    // crashing the whole test process the way a force unwrap would.
    let inputStream = try XCTUnwrap(stream)
    let parser = XMLParser(stream: inputStream)
    let parsed = parser.parse()
    XCTAssertTrue(parsed)
    XCTAssertNil(parser.parserError)
  }
  #endif
  /// Creates a `Session` with `startRequestsImmediately: false`, explicitly calls `resume()`
  /// on a `.stream(1)` request, and checks that data and completion events are then
  /// delivered in order on the main thread with a 200 response.
  func testThatDataStreamCanBeManuallyResumed() {
    // Given
    let session = Session(startRequestsImmediately: false)
    let dataReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    let request = session.streamRequest(.stream(1))
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case .stream:
          handledStreamOnMain = Thread.isMainThread
          dataReceived.fulfill()
        }
      }
    // The session does not start requests on its own, so start this one by hand.
    request.resume()

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Streams `.bytes(50)` with `automaticallyCancelOnStreamError: true` through a
  /// `TestResponse` decoding handler and expects the completion to carry an
  /// explicitly-cancelled error alongside a 200 response.
  func testThatDataStreamIsAutomaticallyCanceledOnStreamErrorWhenEnabled() {
    // Given
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var finalCompletion: DataStreamRequest.Completion?

    // When
    AF.streamRequest(.bytes(50), automaticallyCancelOnStreamError: true)
      .responseStreamDecodable(of: TestResponse.self) { stream in
        // Only the completion event is inspected; stream events are ignored.
        guard case let .complete(completion) = stream.event else { return }

        finalCompletion = completion
        finalResponse = completion.response
        streamCompleted.fulfill()
      }

    waitForExpectations(timeout: timeout)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == true,
                  "error is not explicitly cancelled but \(finalCompletion?.error?.localizedDescription ?? "None")")
  }
  /// Throws a local error from the stream handler on the first data event of a `.bytes(50)`
  /// request and expects the completion event to follow with a non-nil error whose
  /// `isExplicitlyCancelledError` is `false`.
  func testThatDataStreamIsAutomaticallyCanceledOnStreamClosureError() {
    // Given
    enum LocalError: Error { case failed }

    let dataReceived = expectation(description: "stream did receieve")
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var finalCompletion: DataStreamRequest.Completion?

    // When
    AF.streamRequest(.bytes(50))
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          finalCompletion = completion
          finalResponse = completion.response
          streamCompleted.fulfill()
        case .stream:
          dataReceived.fulfill()
          // Fail from inside the handler itself.
          throw LocalError.failed
        }
      }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    // Comparing the optional chain with `false` also fails the assertion when there is no error.
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == false)
  }
  /// Calls `stream.cancel()` from within the stream handler on the first data event and
  /// expects the subsequent completion to carry an explicitly-cancelled error.
  func testThatDataStreamCanBeCancelledInClosure() {
    // Given
    // The session's root queue is .main so that completion can't beat cancellation.
    let session = Session(rootQueue: .main)
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var completion: DataStreamRequest.Completion?

    // When
    session.streamRequest(.bytes(1))
      .responseStream { stream in
        switch stream.event {
        case .complete:
          completion = stream.completion
          streamCompleted.fulfill()
        case .stream:
          dataReceived.fulfill()
          stream.cancel()
        }
      }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
                  """
                  error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
                  response is: \(completion?.response?.description ?? "none").
                  """)
  }
  /// Calls `stream.token.cancel()` from within the stream handler on the first data event
  /// and expects the subsequent completion to carry an explicitly-cancelled error.
  func testThatDataStreamCanBeCancelledByToken() {
    // Given
    // The session's root queue is .main so that completion can't beat cancellation.
    let session = Session(rootQueue: .main)
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var completion: DataStreamRequest.Completion?

    // When
    session.streamRequest(.bytes(1))
      .responseStream { stream in
        switch stream.event {
        case .complete:
          completion = stream.completion
          streamCompleted.fulfill()
        case .stream:
          dataReceived.fulfill()
          stream.token.cancel()
        }
      }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
                  """
                  error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
                  response is: \(completion?.response?.description ?? "none").
                  """)
  }
  /// Uses the `onHTTPResponse` variant that receives a completion handler, answers it with
  /// `.allow`, and checks that the `.bytes(expectedSize)` stream then delivers its data and
  /// completion events in order on the main thread.
  func testThatOnHTTPResponseCanContinueStream() {
    // Given
    let expectedSize = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive once")
    let streamCompleted = expectation(description: "stream should complete")
    var receivedBytes = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.bytes(expectedSize))
      .onHTTPResponse { httpResponse, decide in
        earlyResponse = httpResponse
        responseReceived.fulfill()
        decide(.allow)
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            receivedBytes.append(chunk)
          }
          dataReceived.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(receivedBytes.count, expectedSize)
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
  }
  /// Uses the `onHTTPResponse` variant that receives a completion handler, answers it with
  /// `.cancel`, and checks that no stream event is delivered and that the completion carries
  /// an explicitly-cancelled error together with the initially received response.
  func testThatOnHTTPResponseCanCancelStream() {
    // Given
    let expectedSize = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let streamCompleted = expectation(description: "stream should complete")
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var finalCompletion: DataStreamRequest.Completion?
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.bytes(expectedSize))
      .onHTTPResponse { httpResponse, decide in
        earlyResponse = httpResponse
        responseReceived.fulfill()
        decide(.cancel)
      }
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalCompletion = completion
          finalResponse = completion.response
          streamCompleted.fulfill()
        case .stream:
          XCTFail("should never receive stream in a cancelled request")
        }
      }

    wait(for: [responseReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertTrue(handledCompletionOnMain)
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == true,
                  "onHTTPResponse cancelled stream should be explicitly cancelled")
  }
  598. }
  599. // MARK: - Serialization Tests
  600. final class DataStreamSerializationTests: BaseTestCase {
  /// Streams `.stream(1)` through `responseStreamString` and checks that a string value was
  /// delivered before completion, with both events handled on the main thread.
  func testThatDataStreamsCanBeAString() {
    // Given
    let stringReceived = expectation(description: "did stream")
    let streamCompleted = expectation(description: "stream complete")
    var receivedString: String?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .responseStreamString { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          if case let .success(text) = result {
            receivedString = text
          }
          stringReceived.fulfill()
        }
      }

    wait(for: [stringReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
    XCTAssertNotNil(receivedString)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Streams `.stream(1)` through `responseStreamString(on:)` with a custom serial queue,
  /// asserting via `dispatchPrecondition` that every event is delivered on that queue and
  /// that a string value arrives before completion.
  func testThatDataStreamsCanBeAStringOnAnArbitraryQueue() {
    // Given
    let handlerQueue = DispatchQueue(label: "com.alamofire.tests.ArbitraryQueue")
    let stringReceived = expectation(description: "did stream")
    let streamCompleted = expectation(description: "stream complete")
    var receivedString: String?
    var finalResponse: HTTPURLResponse?

    // When
    AF.streamRequest(.stream(1))
      .responseStreamString(on: handlerQueue) { stream in
        // Traps if the handler is invoked anywhere other than the requested queue.
        dispatchPrecondition(condition: .onQueue(handlerQueue))

        switch stream.event {
        case let .complete(completion):
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          if case let .success(text) = result {
            receivedString = text
          }
          stringReceived.fulfill()
        }
      }

    wait(for: [stringReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertNotNil(receivedString)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Streams `.stream(1)` through `responseStreamDecodable(of: TestResponse.self)` and checks
  /// that a value was decoded without error before completion, with both events handled on
  /// the main thread.
  func testThatDataStreamsCanBeDecoded() {
    // Given
    let valueReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            decodeFailure = error
          case let .success(value):
            decodedValue = value
          }
          valueReceived.fulfill()
        }
      }

    wait(for: [valueReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Streams `.stream(1)` through `responseStreamDecodable(of:on:)` with a custom serial
  /// queue, asserting via `dispatchPrecondition` that every event is delivered on that queue
  /// and that a `TestResponse` is decoded without error before completion.
  func testThatDataStreamsCanBeDecodedOnAnArbitraryQueue() {
    // Given
    let handlerQueue = DispatchQueue(label: "com.alamofire.tests.ArbitraryQueue")
    let valueReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var finalResponse: HTTPURLResponse?

    // When
    AF.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self, on: handlerQueue) { stream in
        // Traps if the handler is invoked anywhere other than the requested queue.
        dispatchPrecondition(condition: .onQueue(handlerQueue))

        switch stream.event {
        case let .complete(completion):
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          switch result {
          case let .failure(error):
            decodeFailure = error
          case let .success(value):
            decodedValue = value
          }
          valueReceived.fulfill()
        }
      }

    wait(for: [valueReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Passes an explicitly constructed `DecodableStreamSerializer<TestResponse>` to
  /// `responseStream(using:)` and checks that a value is decoded without error before
  /// completion, with both events handled on the main thread.
  func testThatDataStreamSerializerCanBeUsedDirectly() {
    // Given
    let serializer = DecodableStreamSerializer<TestResponse>()
    let valueReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .responseStream(using: serializer) { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            decodeFailure = error
          case let .success(value):
            decodedValue = value
          }
          valueReceived.fulfill()
        }
      }

    wait(for: [valueReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  766. }
  767. // MARK: - Integration Tests
  768. final class DataStreamIntegrationTests: BaseTestCase {
  /// Streams `.status(401)` with `validate()` applied and expects the completion to carry a
  /// response-validation error while no stream event is ever delivered.
  func testThatDataStreamCanFailValidation() {
    // Given
    let streamCompleted = expectation(description: "stream should complete")
    var sawData = false
    var completionError: AFError?

    // When
    AF.streamRequest(.status(401))
      .validate()
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          completionError = completion.error
          streamCompleted.fulfill()
        case .stream:
          sawData = true
        }
      }

    waitForExpectations(timeout: timeout)

    // Then
    XCTAssertNotNil(completionError, "error should not be nil")
    XCTAssertTrue(completionError?.isResponseValidationError == true, "error should be response validation error")
    XCTAssertFalse(sawData, "no data should be seen")
  }
  /// Starts a validated `.status(401)` stream on a session whose interceptor always answers
  /// `.retry` and, once a retry has been requested, adapts the request to
  /// `Endpoint.bytes(1000).urlRequest`. Expects 1000 bytes and a 200 response in the end.
  func testThatDataStreamsCanBeRetried() {
    // Given
    final class GoodRetry: RequestInterceptor {
      /// Set once `retry` has been called; switches `adapt` over to the replacement request.
      var hasRetried = false

      func adapt(_ urlRequest: URLRequest, for session: Session, completion: @escaping (Result<URLRequest, Error>) -> Void) {
        // Leave the request untouched until a retry has been requested.
        guard hasRetried else {
          completion(.success(urlRequest))
          return
        }

        completion(.success(Endpoint.bytes(1000).urlRequest))
      }

      func retry(_ request: Request, for session: Session, dueTo error: Error, completion: @escaping (RetryResult) -> Void) {
        hasRetried = true
        completion(.retry)
      }
    }

    let session = Session(interceptor: GoodRetry())
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var receivedBytes = Data()
    var finalResponse: HTTPURLResponse?
    var handledStreamOnMain = false
    var handledCompletionOnMain = false

    // When
    session.streamRequest(.status(401))
      .validate()
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          handledCompletionOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        case let .stream(result):
          handledStreamOnMain = Thread.isMainThread
          if case let .success(chunk) = result {
            receivedBytes.append(chunk)
          }
          dataReceived.fulfill()
        }
      }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(handledStreamOnMain)
    XCTAssertTrue(handledCompletionOnMain)
    XCTAssertEqual(receivedBytes.count, 1000)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Checks that a stream request to the `.status(301)` endpoint consults its `Redirector`
  /// and then decodes the stream served by the replacement request.
  func testThatDataStreamCanBeRedirected() {
    // Given
    var finalResponse: HTTPURLResponse?
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var streamedOnMain = false
    var completedOnMain = false

    let didRedirect = expectation(description: "stream redirected")
    // Whatever redirect is proposed, answer with a request to `Endpoint.stream(1)` instead.
    let redirector = Redirector(behavior: .modify { _, _, _ in
      didRedirect.fulfill()
      return Endpoint.stream(1).urlRequest
    })
    let didReceive = expectation(description: "stream should receive")
    let didComplete = expectation(description: "stream should complete")

    // When
    AF.streamRequest(.status(301))
      .redirect(using: redirector)
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMain = Thread.isMainThread
          finalResponse = completion.response
          didComplete.fulfill()
        case let .stream(result):
          streamedOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            decodeFailure = error
          case let .success(value):
            decodedValue = value
          }
          didReceive.fulfill()
        }
      }

    // The redirect must be observed before any stream event, and the stream event before completion.
    wait(for: [didRedirect, didReceive, didComplete], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(streamedOnMain)
    XCTAssertTrue(completedOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Checks that a stream request invokes its `ResponseCacher` and still decodes the stream
  /// and completes with a 200 response on the main thread.
  func testThatDataStreamCallsCachedResponseHandler() {
    // Given
    var finalResponse: HTTPURLResponse?
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var streamedOnMain = false
    var completedOnMain = false

    let cached = expectation(description: "stream called cacher")
    // The cacher only records that it was consulted; it returns `nil` from its `.modify` closure.
    let cacher = ResponseCacher(behavior: .modify { _, _ in
      cached.fulfill()
      return nil
    })
    let didReceive = expectation(description: "stream did receive")
    let didComplete = expectation(description: "stream complete")

    // When
    AF.streamRequest(.stream(1))
      .cacheResponse(using: cacher)
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .stream(result):
          streamedOnMain = Thread.isMainThread
          if case let .success(value) = result {
            decodedValue = value
          } else if case let .failure(error) = result {
            decodeFailure = error
          }
          didReceive.fulfill()
        case let .complete(completion):
          completedOnMain = Thread.isMainThread
          finalResponse = completion.response
          didComplete.fulfill()
        }
      }

    // willCacheResponse is called after all Data has been received, so it may fire before or after the
    // asynchronous stream handlers. Wait for it separately rather than as part of the ordered wait.
    wait(for: [cached], timeout: timeout)
    wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(streamedOnMain)
    XCTAssertTrue(completedOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Checks that two decodable stream handlers attached to one request both receive their
  /// stream and completion events on the main thread when the session is given separate
  /// request and serialization queues created without the `.concurrent` attribute.
  func testThatDataStreamWorksCorrectlyWithMultipleSerialQueues() {
    // Given
    let session = Session(requestQueue: DispatchQueue(label: "org.alamofire.testRequestQueue"),
                          serializationQueue: DispatchQueue(label: "org.alamofire.testSerializationQueue"))

    // State captured by the first handler.
    var firstFinalResponse: HTTPURLResponse?
    var firstValue: TestResponse?
    var firstFailure: AFError?
    var firstStreamedOnMain = false
    var firstCompletedOnMain = false
    let firstStream = expectation(description: "first stream")
    let firstDidReceive = expectation(description: "first stream did receive")
    let firstDidComplete = expectation(description: "first stream complete")

    // State captured by the second handler.
    var secondFinalResponse: HTTPURLResponse?
    var secondValue: TestResponse?
    var secondFailure: AFError?
    var secondStreamedOnMain = false
    var secondCompletedOnMain = false
    let secondStream = expectation(description: "second stream")
    let secondDidReceive = expectation(description: "second stream did receive")
    let secondDidComplete = expectation(description: "second stream complete")

    // When
    session.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          firstCompletedOnMain = Thread.isMainThread
          firstFinalResponse = completion.response
          firstDidComplete.fulfill()
        case let .stream(result):
          firstStreamedOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            firstFailure = error
          case let .success(value):
            firstValue = value
          }
          firstStream.fulfill()
          firstDidReceive.fulfill()
        }
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          secondCompletedOnMain = Thread.isMainThread
          secondFinalResponse = completion.response
          secondDidComplete.fulfill()
        case let .stream(result):
          secondStreamedOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            secondFailure = error
          case let .success(value):
            secondValue = value
          }
          secondStream.fulfill()
          secondDidReceive.fulfill()
        }
      }

    // Stream events are expected in handler registration order.
    wait(for: [firstStream, secondStream], timeout: timeout, enforceOrder: true)
    // Cannot test order of completion events, as one may have been enqueued while the other executed directly.
    wait(for: [firstDidReceive, firstDidComplete], timeout: timeout, enforceOrder: true)
    wait(for: [secondDidReceive, secondDidComplete], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(firstStreamedOnMain)
    XCTAssertTrue(firstCompletedOnMain)
    XCTAssertNotNil(firstValue)
    XCTAssertEqual(firstFinalResponse?.statusCode, 200)
    XCTAssertNil(firstFailure)
    XCTAssertTrue(secondStreamedOnMain)
    XCTAssertTrue(secondCompletedOnMain)
    XCTAssertNotNil(secondValue)
    XCTAssertEqual(secondFinalResponse?.statusCode, 200)
    XCTAssertNil(secondFailure)
  }
  /// Checks that two decodable stream handlers attached to one request both receive their
  /// stream and completion events on the main thread when the session is given concurrent
  /// request and serialization queues.
  func testThatDataStreamWorksCorrectlyWithMultipleConcurrentQueues() {
    // Given
    let session = Session(requestQueue: DispatchQueue(label: "org.alamofire.testRequestQueue", attributes: .concurrent),
                          serializationQueue: DispatchQueue(label: "org.alamofire.testSerializationQueue", attributes: .concurrent))

    // State captured by the first handler.
    var firstFinalResponse: HTTPURLResponse?
    var firstValue: TestResponse?
    var firstFailure: AFError?
    var firstStreamedOnMain = false
    var firstCompletedOnMain = false
    let firstDidReceive = expectation(description: "first stream did receive")
    let firstDidComplete = expectation(description: "first stream complete")

    // State captured by the second handler.
    var secondFinalResponse: HTTPURLResponse?
    var secondValue: TestResponse?
    var secondFailure: AFError?
    var secondStreamedOnMain = false
    var secondCompletedOnMain = false
    let secondDidReceive = expectation(description: "second stream did receive")
    let secondDidComplete = expectation(description: "second stream complete")

    // When
    session.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          firstCompletedOnMain = Thread.isMainThread
          firstFinalResponse = completion.response
          firstDidComplete.fulfill()
        case let .stream(result):
          firstStreamedOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            firstFailure = error
          case let .success(value):
            firstValue = value
          }
          firstDidReceive.fulfill()
        }
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          secondCompletedOnMain = Thread.isMainThread
          secondFinalResponse = completion.response
          secondDidComplete.fulfill()
        case let .stream(result):
          secondStreamedOnMain = Thread.isMainThread
          switch result {
          case let .failure(error):
            secondFailure = error
          case let .success(value):
            secondValue = value
          }
          secondDidReceive.fulfill()
        }
      }

    // Only the per-handler ordering (receive, then complete) is enforced; nothing is asserted across handlers.
    wait(for: [firstDidReceive, firstDidComplete], timeout: timeout, enforceOrder: true)
    wait(for: [secondDidReceive, secondDidComplete], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(firstStreamedOnMain)
    XCTAssertTrue(firstCompletedOnMain)
    XCTAssertNotNil(firstValue)
    XCTAssertEqual(firstFinalResponse?.statusCode, 200)
    XCTAssertNil(firstFailure)
    XCTAssertTrue(secondStreamedOnMain)
    XCTAssertTrue(secondCompletedOnMain)
    XCTAssertNotNil(secondValue)
    XCTAssertEqual(secondFinalResponse?.statusCode, 200)
    XCTAssertNil(secondFailure)
  }
  /// Checks that a stream request supplying username/password credentials completes with a
  /// 200 response and delivers both of its events on the main thread.
  func testThatDataStreamCanAuthenticate() {
    // Given
    let user = "userstream"
    let password = "password"
    var finalResponse: HTTPURLResponse?
    var streamedOnMain = false
    var completedOnMain = false
    let didReceive = expectation(description: "stream did receive")
    let didComplete = expectation(description: "stream complete")

    // When
    AF.streamRequest(.basicAuth(forUser: user, password: password))
      .authenticate(username: user, password: password)
      .responseStream { stream in
        guard case let .complete(completion) = stream.event else {
          // The only other event is `.stream`; its payload is not inspected by this test.
          streamedOnMain = Thread.isMainThread
          didReceive.fulfill()
          return
        }

        completedOnMain = Thread.isMainThread
        finalResponse = completion.response
        didComplete.fulfill()
      }

    wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(streamedOnMain)
    XCTAssertTrue(completedOnMain)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  1099. }
  /// Tests covering the `EventMonitor` callbacks observed over the lifetime of a stream request.
  final class DataStreamLifetimeEvents: BaseTestCase {
    /// Checks that every monitored event fires while a validated, decodable stream request
    /// runs, and that the request ends in the `.finished` state.
    func testThatDataStreamRequestHasAppropriateLifetimeEvents() {
      // Given
      // Monitor which invokes `called` each time the generic `didParseStream` event is delivered.
      final class Monitor: EventMonitor {
        var called: (() -> Void)?

        func request<Value>(_ request: DataStreamRequest, didParseStream result: Result<Value, AFError>) {
          called?()
        }
      }

      let eventMonitor = ClosureEventMonitor()
      let parseMonitor = Monitor()
      let session = Session(eventMonitors: [eventMonitor, parseMonitor])
      var dataReceived = false

      // Each expectation is created next to the monitor closure that fulfills it.

      // Disable event test until Firewalk supports HTTPS.
      // let didReceiveChallenge = expectation(description: "didReceiveChallenge should fire")
      // eventMonitor.taskDidReceiveChallenge = { _, _, _ in didReceiveChallenge.fulfill() }

      let taskDidFinishCollecting = expectation(description: "taskDidFinishCollecting should fire")
      eventMonitor.taskDidFinishCollectingMetrics = { _, _, _ in taskDidFinishCollecting.fulfill() }

      let didReceiveData = expectation(description: "didReceiveData should fire")
      eventMonitor.dataTaskDidReceiveData = { _, _, _ in
        // Data may be received many times, fulfill only once.
        if dataReceived { return }

        dataReceived = true
        didReceiveData.fulfill()
      }

      let willCacheResponse = expectation(description: "willCacheResponse should fire")
      eventMonitor.dataTaskWillCacheResponse = { _, _, _ in willCacheResponse.fulfill() }

      let didCreateURLRequest = expectation(description: "didCreateInitialURLRequest should fire")
      eventMonitor.requestDidCreateInitialURLRequest = { _, _ in didCreateURLRequest.fulfill() }

      let didCreateTask = expectation(description: "didCreateTask should fire")
      eventMonitor.requestDidCreateTask = { _, _ in didCreateTask.fulfill() }

      let didGatherMetrics = expectation(description: "didGatherMetrics should fire")
      eventMonitor.requestDidGatherMetrics = { _, _ in didGatherMetrics.fulfill() }

      let didComplete = expectation(description: "didComplete should fire")
      eventMonitor.requestDidCompleteTaskWithError = { _, _, _ in didComplete.fulfill() }

      let didFinish = expectation(description: "didFinish should fire")
      eventMonitor.requestDidFinish = { _ in didFinish.fulfill() }

      let didResume = expectation(description: "didResume should fire")
      eventMonitor.requestDidResume = { _ in didResume.fulfill() }

      let didResumeTask = expectation(description: "didResumeTask should fire")
      eventMonitor.requestDidResumeTask = { _, _ in didResumeTask.fulfill() }

      // The validation event is expected to be reported twice.
      let didValidate = expectation(description: "didValidateRequest should fire")
      didValidate.expectedFulfillmentCount = 2
      eventMonitor.requestDidValidateRequestResponseWithResult = { _, _, _, _ in didValidate.fulfill() }

      let didParse = expectation(description: "streamDidParse should fire")
      parseMonitor.called = { didParse.fulfill() }

      // Fulfilled by the stream handler itself rather than by a monitor.
      let didReceive = expectation(description: "stream should receive")
      let didCompleteStream = expectation(description: "stream should complete")

      // When
      let streamRequest = session.streamRequest(.stream(1))
        .validate()
        .responseStreamDecodable(of: TestResponse.self) { stream in
          switch stream.event {
          case .complete:
            didCompleteStream.fulfill()
          case .stream:
            didReceive.fulfill()
          }
        }

      waitForExpectations(timeout: timeout)

      // Then
      XCTAssertEqual(streamRequest.state, .finished)
    }
  }