DataStreamTests.swift 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181
  1. //
  2. // DataStreamTests.swift
  3. //
  4. // Copyright (c) 2020 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. import Alamofire
  25. import XCTest
  26. final class DataStreamTests: BaseTestCase {
  /// Streams `.bytes(5)` and checks that the HTTP response callback, a data event and the completion
  /// arrive in that order, that both stream callbacks run on the main thread, and that the accumulated
  /// data is five bytes long.
  func testThatDataCanBeStreamedOnMainQueue() {
    // Given
    let byteCount = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive once")
    let streamCompleted = expectation(description: "stream should complete")
    var received = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.bytes(byteCount))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            received.append(chunk)
          }
          dataReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(received.count, byteCount)
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
  }
  /// Streams `.chunked(5)` and expects five separate data events between the HTTP response callback
  /// and the completion, all delivered on the main thread, totalling five bytes.
  ///
  /// - Throws: `XCTSkip` when the OS availability check fails.
  func testThatDataCanBeStreamedByByte() throws {
    guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
      throw XCTSkip("Older OSes don't return individual bytes.")
    }

    // Given
    let chunkCount = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let chunkReceived = expectation(description: "stream should receive once")
    chunkReceived.expectedFulfillmentCount = chunkCount
    let streamCompleted = expectation(description: "stream should complete")
    var received = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false
    var dataEventCount = 0

    // When
    AF.streamRequest(.chunked(chunkCount))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            received.append(chunk)
          }
          dataEventCount += 1
          chunkReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, chunkReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(dataEventCount, chunkCount)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(received.count, chunkCount)
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
  }
  /// Streams `.payloads(5)` through a `TestResponse` decoder and expects five decoded values, one per
  /// data event, delivered on the main thread between the HTTP response callback and the completion.
  /// Any decoding failure is reported through `XCTFail`.
  ///
  /// - Throws: `XCTSkip` when the OS availability check fails.
  func testThatDataCanBeStreamedAsMultipleJSONPayloads() throws {
    guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
      throw XCTSkip("Older OSes do not separate chunked payloads in callbacks.")
    }

    // Given
    let payloadCount = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let payloadReceived = expectation(description: "stream should receive once")
    payloadReceived.expectedFulfillmentCount = payloadCount
    let streamCompleted = expectation(description: "stream should complete")
    var decodedPayloads: [TestResponse] = []
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false
    var dataEventCount = 0

    // When
    AF.streamRequest(.payloads(payloadCount))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          switch result {
          case .success(let payload):
            decodedPayloads.append(payload)
          case .failure(let failure):
            XCTFail("JSON stream failed due to error: \(failure.localizedDescription)")
          }
          dataEventCount += 1
          payloadReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, payloadReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(dataEventCount, payloadCount)
    XCTAssertEqual(decodedPayloads.count, payloadCount)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
  }
  /// Streams `.bytes(1)` and checks that the HTTP response callback, a data event and the completion
  /// arrive in that order on the main thread, with a single byte accumulated and a 200 status.
  func testThatDataCanBeStreamedFromURL() {
    // Given
    let byteCount = 1
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var received = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.bytes(byteCount))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            received.append(chunk)
          }
          dataReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(received.count, byteCount)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
  }
  /// Attaches two `responseStream` handlers to one `.bytes(1)` request and checks that each handler
  /// independently sees a data event followed by completion on the main thread, accumulates one byte,
  /// and reports the same 200 response that `onHTTPResponse` observed.
  func testThatDataCanBeStreamedManyTimes() {
    // Given
    let byteCount = 1
    var earlyResponse: HTTPURLResponse?
    let responseReceived = expectation(description: "onHTTPResponse should be called")

    // State captured by the first handler.
    let firstDataReceived = expectation(description: "first stream should receive")
    let firstCompleted = expectation(description: "first stream should complete")
    var firstData = Data()
    var firstFinalResponse: HTTPURLResponse?
    var firstDataOnMain = false
    var firstCompletionOnMain = false

    // State captured by the second handler.
    let secondDataReceived = expectation(description: "second stream should receive")
    let secondCompleted = expectation(description: "second stream should complete")
    var secondData = Data()
    var secondFinalResponse: HTTPURLResponse?
    var secondDataOnMain = false
    var secondCompletionOnMain = false

    // When
    AF.streamRequest(.bytes(byteCount))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          firstDataOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            firstData.append(chunk)
          }
          firstDataReceived.fulfill()
        case .complete(let completion):
          firstCompletionOnMain = Thread.isMainThread
          firstFinalResponse = completion.response
          firstCompleted.fulfill()
        }
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          secondDataOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            secondData.append(chunk)
          }
          secondDataReceived.fulfill()
        case .complete(let completion):
          secondCompletionOnMain = Thread.isMainThread
          secondFinalResponse = completion.response
          secondCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, firstDataReceived, firstCompleted], timeout: timeout, enforceOrder: true)
    wait(for: [secondDataReceived, secondCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(earlyResponse, firstFinalResponse)
    XCTAssertEqual(earlyResponse, secondFinalResponse)
    XCTAssertTrue(firstDataOnMain)
    XCTAssertTrue(firstCompletionOnMain)
    XCTAssertEqual(firstFinalResponse?.statusCode, 200)
    XCTAssertEqual(firstData.count, byteCount)
    XCTAssertTrue(secondDataOnMain)
    XCTAssertTrue(secondCompletionOnMain)
    XCTAssertEqual(secondFinalResponse?.statusCode, 200)
    XCTAssertEqual(secondData.count, byteCount)
  }
  /// Attaches a raw `responseStream` handler and a `TestResponse` decoding handler to one `.stream(1)`
  /// request and checks that the raw handler receives non-empty data, the decoding handler produces a
  /// value without error, and both complete on the main thread with the same 200 response.
  func testThatDataCanBeStreamedAndDecodedAtTheSameTime() {
    // Given
    var earlyResponse: HTTPURLResponse?
    let responseReceived = expectation(description: "onHTTPResponse should be called")

    // State captured by the raw data handler.
    let rawDataReceived = expectation(description: "first stream should receive")
    let rawCompleted = expectation(description: "first stream should complete")
    var rawData = Data()
    var rawFinalResponse: HTTPURLResponse?
    var rawDataOnMain = false
    var rawCompletionOnMain = false

    // State captured by the decoding handler.
    let decodedReceived = expectation(description: "second stream should receive")
    let decodedCompleted = expectation(description: "second stream should complete")
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var decodedFinalResponse: HTTPURLResponse?
    var decodedOnMain = false
    var decodedCompletionOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .onHTTPResponse { httpResponse in
        earlyResponse = httpResponse
        responseReceived.fulfill()
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          rawDataOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            rawData.append(chunk)
          }
          rawDataReceived.fulfill()
        case .complete(let completion):
          rawCompletionOnMain = Thread.isMainThread
          rawFinalResponse = completion.response
          rawCompleted.fulfill()
        }
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case .stream(let result):
          decodedOnMain = Thread.isMainThread
          switch result {
          case .success(let value):
            decodedValue = value
          case .failure(let failure):
            decodeFailure = failure
          }
          decodedReceived.fulfill()
        case .complete(let completion):
          decodedCompletionOnMain = Thread.isMainThread
          decodedFinalResponse = completion.response
          decodedCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, rawDataReceived, rawCompleted], timeout: timeout, enforceOrder: true)
    wait(for: [decodedReceived, decodedCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(earlyResponse, rawFinalResponse)
    XCTAssertEqual(earlyResponse, decodedFinalResponse)
    XCTAssertTrue(rawDataOnMain)
    XCTAssertTrue(rawCompletionOnMain)
    XCTAssertEqual(rawFinalResponse?.statusCode, 200)
    XCTAssertFalse(rawData.isEmpty)
    XCTAssertTrue(decodedOnMain)
    XCTAssertTrue(decodedCompletionOnMain)
    XCTAssertEqual(decodedFinalResponse?.statusCode, 200)
    XCTAssertNotNil(decodedValue)
    XCTAssertNil(decodeFailure)
  }
  #if !canImport(FoundationNetworking) // Only when we're not using swift-corelibs-foundation.
  /// Checks that the `InputStream` returned by `asInputStream()` for a `.xml` stream request can be
  /// parsed by `XMLParser` without error once the stream has completed.
  func testThatDataStreamRequestProducesWorkingInputStream() {
    // Given
    let expect = expectation(description: "stream complete")

    // When
    let stream = AF.streamRequest(.xml)
      .responseStream { stream in
        switch stream.event {
        case .complete:
          expect.fulfill()
        case .stream:
          // Data events are not inspected here; the bytes are read through the input stream below.
          break
        }
      }
      .asInputStream()

    waitForExpectations(timeout: timeout)

    // Then
    // Fail the test instead of crashing the whole test process when no stream was produced.
    guard let inputStream = stream else {
      XCTFail("asInputStream() should produce an InputStream")
      return
    }

    let parser = XMLParser(stream: inputStream)
    let parsed = parser.parse()
    XCTAssertTrue(parsed)
    XCTAssertNil(parser.parserError)
  }
  #endif
  /// Creates a `Session` with `startRequestsImmediately: false`, resumes a `.stream(1)` request by
  /// hand, and checks that a data event and then the completion arrive on the main thread with a
  /// 200 response.
  func testThatDataStreamCanBeManuallyResumed() {
    // Given
    let manualSession = Session(startRequestsImmediately: false)
    let dataReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    let request = manualSession.streamRequest(.stream(1))
      .responseStream { stream in
        switch stream.event {
        case .stream:
          dataDeliveredOnMain = Thread.isMainThread
          dataReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }
    request.resume()

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Streams `.bytes(50)` with `automaticallyCancelOnStreamError: true` through a `TestResponse`
  /// decoder and checks that the completion carries a 200 response and an explicitly-cancelled error.
  func testThatDataStreamIsAutomaticallyCanceledOnStreamErrorWhenEnabled() {
    // Given
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var finalCompletion: DataStreamRequest.Completion?

    // When
    AF.streamRequest(.bytes(50), automaticallyCancelOnStreamError: true)
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case .stream:
          // Individual stream results are not inspected; only the completion matters here.
          break
        case .complete(let completion):
          finalCompletion = completion
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    waitForExpectations(timeout: timeout)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == true,
                  "error is not explicitly cancelled but \(finalCompletion?.error?.localizedDescription ?? "None")")
  }
  /// Throws a local error from the stream handler on the first data event of a `.bytes(50)` request
  /// and checks that the completion still arrives afterwards with a 200 response and an error that is
  /// not an explicit cancellation.
  func testThatDataStreamIsAutomaticallyCanceledOnStreamClosureError() {
    // Given
    enum LocalError: Error { case failed }

    let dataReceived = expectation(description: "stream did receieve")
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var finalCompletion: DataStreamRequest.Completion?

    // When
    AF.streamRequest(.bytes(50))
      .responseStream { stream in
        switch stream.event {
        case .stream:
          dataReceived.fulfill()
          throw LocalError.failed
        case .complete(let completion):
          finalCompletion = completion
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    // Passes only when an error is present and it is not flagged as an explicit cancellation.
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == false)
  }
  /// Calls `stream.cancel()` from inside the handler on the first data event of a `.bytes(1)` request
  /// and checks that the completion reports an explicitly-cancelled error.
  func testThatDataStreamCanBeCancelledInClosure() {
    // Given
    // Use .main so that completion can't beat cancellation.
    let mainQueueSession = Session(rootQueue: .main)
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var finalCompletion: DataStreamRequest.Completion?

    // When
    mainQueueSession.streamRequest(.bytes(1)).responseStream { stream in
      switch stream.event {
      case .stream:
        dataReceived.fulfill()
        stream.cancel()
      case .complete:
        finalCompletion = stream.completion
        streamCompleted.fulfill()
      }
    }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == true,
                  """
                  error is not explicitly cancelled, instead: \(finalCompletion?.error?.localizedDescription ?? "none").
                  response is: \(finalCompletion?.response?.description ?? "none").
                  """)
  }
  /// Calls `stream.token.cancel()` from inside the handler on the first data event of a `.bytes(1)`
  /// request and checks that the completion reports an explicitly-cancelled error.
  func testThatDataStreamCanBeCancelledByToken() {
    // Given
    // Use .main so that completion can't beat cancellation.
    let mainQueueSession = Session(rootQueue: .main)
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var finalCompletion: DataStreamRequest.Completion?

    // When
    mainQueueSession.streamRequest(.bytes(1)).responseStream { stream in
      switch stream.event {
      case .stream:
        dataReceived.fulfill()
        stream.token.cancel()
      case .complete:
        finalCompletion = stream.completion
        streamCompleted.fulfill()
      }
    }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == true,
                  """
                  error is not explicitly cancelled, instead: \(finalCompletion?.error?.localizedDescription ?? "none").
                  response is: \(finalCompletion?.response?.description ?? "none").
                  """)
  }
  /// Answers the `onHTTPResponse` disposition callback with `.allow` for a `.bytes(5)` request and
  /// checks that the stream then delivers a data event and completes, in order and on the main thread,
  /// with five bytes and a 200 response.
  func testThatOnHTTPResponseCanContinueStream() {
    // Given
    let byteCount = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let dataReceived = expectation(description: "stream should receive once")
    let streamCompleted = expectation(description: "stream should complete")
    var received = Data()
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.bytes(byteCount))
      .onHTTPResponse { httpResponse, decide in
        earlyResponse = httpResponse
        responseReceived.fulfill()
        decide(.allow)
      }
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            received.append(chunk)
          }
          dataReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertEqual(received.count, byteCount)
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
  }
  /// Answers the `onHTTPResponse` disposition callback with `.cancel` for a `.bytes(5)` request and
  /// checks that no data event is delivered and that the completion arrives on the main thread with
  /// the same 200 response and an explicitly-cancelled error.
  func testThatOnHTTPResponseCanCancelStream() {
    // Given
    let byteCount = 5
    let responseReceived = expectation(description: "stream should receive response once")
    let streamCompleted = expectation(description: "stream should complete")
    var earlyResponse: HTTPURLResponse?
    var finalResponse: HTTPURLResponse?
    var finalCompletion: DataStreamRequest.Completion?
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.bytes(byteCount))
      .onHTTPResponse { httpResponse, decide in
        earlyResponse = httpResponse
        responseReceived.fulfill()
        decide(.cancel)
      }
      .responseStream { stream in
        switch stream.event {
        case .stream:
          XCTFail("should never receive stream in a cancelled request")
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalCompletion = completion
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [responseReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertEqual(earlyResponse, finalResponse)
    XCTAssertTrue(completionDeliveredOnMain)
    XCTAssertTrue(finalCompletion?.error?.isExplicitlyCancelledError == true,
                  "onHTTPResponse cancelled stream should be explicitly cancelled")
  }
  562. }
  563. // MARK: - Serialization Tests
  564. final class DataStreamSerializationTests: BaseTestCase {
  /// Streams `.stream(1)` through `responseStreamString` and checks that a string is delivered before
  /// the completion, both on the main thread, with a 200 response.
  func testThatDataStreamsCanBeAString() {
    // Given
    let stringReceived = expectation(description: "did stream")
    let streamCompleted = expectation(description: "stream complete")
    var lastString: String?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .responseStreamString { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          if case .success(let text) = result {
            lastString = text
          }
          stringReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [stringReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
    XCTAssertNotNil(lastString)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Streams `.stream(1)` through `responseStreamDecodable(of: TestResponse.self)` and checks that a
  /// value is decoded without error before the completion, both on the main thread, with a 200
  /// response.
  func testThatDataStreamsCanBeDecoded() {
    // Given
    let valueReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          switch result {
          case .success(let value):
            decodedValue = value
          case .failure(let failure):
            decodeFailure = failure
          }
          valueReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [valueReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Passes a `DecodableStreamSerializer<TestResponse>` straight to `responseStream(using:)` for a
  /// `.stream(1)` request and checks that a value is decoded without error before the completion,
  /// both on the main thread, with a 200 response.
  func testThatDataStreamSerializerCanBeUsedDirectly() {
    // Given
    let streamSerializer = DecodableStreamSerializer<TestResponse>()
    let valueReceived = expectation(description: "stream did receive")
    let streamCompleted = expectation(description: "stream complete")
    var finalResponse: HTTPURLResponse?
    var decodedValue: TestResponse?
    var decodeFailure: AFError?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    AF.streamRequest(.stream(1))
      .responseStream(using: streamSerializer) { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          switch result {
          case .success(let value):
            decodedValue = value
          case .failure(let failure):
            decodeFailure = failure
          }
          valueReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [valueReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  670. }
  671. // MARK: - Integration Tests
  672. final class DataStreamIntegrationTests: BaseTestCase {
  /// Streams `.status(401)` with `validate()` applied and checks that no data event is delivered and
  /// that the completion carries a response validation error.
  func testThatDataStreamCanFailValidation() {
    // Given
    let streamCompleted = expectation(description: "stream should complete")
    var receivedAnyData = false
    var completionError: AFError?

    // When
    AF.streamRequest(.status(401))
      .validate()
      .responseStream { stream in
        switch stream.event {
        case .stream:
          receivedAnyData = true
        case .complete(let completion):
          completionError = completion.error
          streamCompleted.fulfill()
        }
      }

    waitForExpectations(timeout: timeout)

    // Then
    XCTAssertNotNil(completionError, "error should not be nil")
    XCTAssertTrue(completionError?.isResponseValidationError == true, "error should be response validation error")
    XCTAssertFalse(receivedAnyData, "no data should be seen")
  }
  /// Uses an interceptor that always asks for a retry and, once a retry has been requested, adapts
  /// the request to `Endpoint.bytes(1000)`. Checks that the validated `.status(401)` stream ends up
  /// delivering 1000 bytes and a 200 response on the main thread.
  func testThatDataStreamsCanBeRetried() {
    // Given
    final class GoodRetry: RequestInterceptor {
      var hasRetried = false

      func adapt(_ urlRequest: URLRequest, for session: Session, completion: @escaping (Result<URLRequest, Error>) -> Void) {
        // After the first retry, swap the failing request for one that succeeds.
        let adapted = hasRetried ? Endpoint.bytes(1000).urlRequest : urlRequest
        completion(.success(adapted))
      }

      func retry(_ request: Request, for session: Session, dueTo error: Error, completion: @escaping (RetryResult) -> Void) {
        hasRetried = true
        completion(.retry)
      }
    }

    let retryingSession = Session(interceptor: GoodRetry())
    let dataReceived = expectation(description: "stream should receive")
    let streamCompleted = expectation(description: "stream should complete")
    var received = Data()
    var finalResponse: HTTPURLResponse?
    var dataDeliveredOnMain = false
    var completionDeliveredOnMain = false

    // When
    retryingSession.streamRequest(.status(401))
      .validate()
      .responseStream { stream in
        switch stream.event {
        case .stream(let result):
          dataDeliveredOnMain = Thread.isMainThread
          if case .success(let chunk) = result {
            received.append(chunk)
          }
          dataReceived.fulfill()
        case .complete(let completion):
          completionDeliveredOnMain = Thread.isMainThread
          finalResponse = completion.response
          streamCompleted.fulfill()
        }
      }

    wait(for: [dataReceived, streamCompleted], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(dataDeliveredOnMain)
    XCTAssertTrue(completionDeliveredOnMain)
    XCTAssertEqual(received.count, 1000)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Checks that a stream request consults a custom `Redirector`: the request starts at `.status(301)`,
  /// the redirector substitutes `Endpoint.stream(1)`, and the decoded stream is expected to finish with
  /// a 200 response and no decoding error.
  func testThatDataStreamCanBeRedirected() {
    // Given
    let redirected = expectation(description: "stream redirected")
    let received = expectation(description: "stream should receive")
    let completed = expectation(description: "stream should complete")

    var receivedOnMainThread = false
    var completedOnMainThread = false
    var finalResponse: HTTPURLResponse?
    var decodedValue: TestResponse?
    var decodeFailure: AFError?

    // Records that redirection was consulted and swaps in the replacement request.
    let redirector = Redirector(behavior: .modify { _, _, _ in
      redirected.fulfill()
      return Endpoint.stream(1).urlRequest
    })

    // When
    AF.streamRequest(.status(301))
      .redirect(using: redirector)
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainThread = Thread.isMainThread
          finalResponse = completion.response
          completed.fulfill()
        case let .stream(result):
          receivedOnMainThread = Thread.isMainThread
          switch result {
          case let .failure(error): decodeFailure = error
          case let .success(value): decodedValue = value
          }
          received.fulfill()
        }
      }

    // Redirect first, then data, then completion.
    let expectedSequence = [redirected, received, completed]
    wait(for: expectedSequence, timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(receivedOnMainThread)
    XCTAssertTrue(completedOnMainThread)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Checks that a `ResponseCacher` attached to a stream request is invoked, and that the decoded stream
  /// still delivers a value and completes with a 200 response on the main thread.
  func testThatDataStreamCallsCachedResponseHandler() {
    // Given
    let cacherCalled = expectation(description: "stream called cacher")
    let received = expectation(description: "stream did receive")
    let completed = expectation(description: "stream complete")

    var receivedOnMainThread = false
    var completedOnMainThread = false
    var finalResponse: HTTPURLResponse?
    var decodedValue: TestResponse?
    var decodeFailure: AFError?

    // Signals the expectation and returns nil from the modify closure.
    let cacher = ResponseCacher(behavior: .modify { _, _ in
      cacherCalled.fulfill()
      return nil
    })

    // When
    AF.streamRequest(.stream(1))
      .cacheResponse(using: cacher)
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainThread = Thread.isMainThread
          finalResponse = completion.response
          completed.fulfill()
        case let .stream(result):
          receivedOnMainThread = Thread.isMainThread
          switch result {
          case let .failure(error): decodeFailure = error
          case let .success(value): decodedValue = value
          }
          received.fulfill()
        }
      }

    // willCacheResponse is called after all Data has been received, so it can land before or after the
    // asynchronous stream handlers; wait on it separately from the ordered stream events.
    wait(for: [cacherCalled], timeout: timeout)
    wait(for: [received, completed], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(receivedOnMainThread)
    XCTAssertTrue(completedOnMainThread)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(decodeFailure)
  }
  /// Attaches two decodable stream handlers to one request on a `Session` built with separate serial
  /// request and serialization queues, then checks that both handlers receive a value and a 200
  /// completion on the main thread, with the first handler's stream event preceding the second's.
  func testThatDataStreamWorksCorrectlyWithMultipleSerialQueues() {
    // Given
    let session = Session(requestQueue: DispatchQueue(label: "org.alamofire.testRequestQueue"),
                          serializationQueue: DispatchQueue(label: "org.alamofire.testSerializationQueue"))

    // State and expectations for the first handler.
    let streamedA = expectation(description: "first stream")
    let receivedA = expectation(description: "first stream did receive")
    let completedA = expectation(description: "first stream complete")
    var receivedOnMainA = false
    var completedOnMainA = false
    var responseA: HTTPURLResponse?
    var valueA: TestResponse?
    var failureA: AFError?

    // State and expectations for the second handler.
    let streamedB = expectation(description: "second stream")
    let receivedB = expectation(description: "second stream did receive")
    let completedB = expectation(description: "second stream complete")
    var receivedOnMainB = false
    var completedOnMainB = false
    var responseB: HTTPURLResponse?
    var valueB: TestResponse?
    var failureB: AFError?

    // When
    session.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainA = Thread.isMainThread
          responseA = completion.response
          completedA.fulfill()
        case let .stream(result):
          receivedOnMainA = Thread.isMainThread
          switch result {
          case let .failure(error): failureA = error
          case let .success(value): valueA = value
          }
          streamedA.fulfill()
          receivedA.fulfill()
        }
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainB = Thread.isMainThread
          responseB = completion.response
          completedB.fulfill()
        case let .stream(result):
          receivedOnMainB = Thread.isMainThread
          switch result {
          case let .failure(error): failureB = error
          case let .success(value): valueB = value
          }
          streamedB.fulfill()
          receivedB.fulfill()
        }
      }

    // Stream events must arrive in handler order.
    wait(for: [streamedA, streamedB], timeout: timeout, enforceOrder: true)
    // Completion order across handlers cannot be tested: one completion may have been enqueued while the
    // other executed directly. Only the per-handler receive-then-complete order is enforced.
    wait(for: [receivedA, completedA], timeout: timeout, enforceOrder: true)
    wait(for: [receivedB, completedB], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(receivedOnMainA)
    XCTAssertTrue(completedOnMainA)
    XCTAssertNotNil(valueA)
    XCTAssertEqual(responseA?.statusCode, 200)
    XCTAssertNil(failureA)

    XCTAssertTrue(receivedOnMainB)
    XCTAssertTrue(completedOnMainB)
    XCTAssertNotNil(valueB)
    XCTAssertEqual(responseB?.statusCode, 200)
    XCTAssertNil(failureB)
  }
  /// Attaches two decodable stream handlers to one request on a `Session` built with concurrent request
  /// and serialization queues, then checks that each handler receives a value followed by a 200
  /// completion, both on the main thread.
  func testThatDataStreamWorksCorrectlyWithMultipleConcurrentQueues() {
    // Given
    let session = Session(requestQueue: DispatchQueue(label: "org.alamofire.testRequestQueue",
                                                      attributes: .concurrent),
                          serializationQueue: DispatchQueue(label: "org.alamofire.testSerializationQueue",
                                                            attributes: .concurrent))

    // State and expectations for the first handler.
    let receivedA = expectation(description: "first stream did receive")
    let completedA = expectation(description: "first stream complete")
    var receivedOnMainA = false
    var completedOnMainA = false
    var responseA: HTTPURLResponse?
    var valueA: TestResponse?
    var failureA: AFError?

    // State and expectations for the second handler.
    let receivedB = expectation(description: "second stream did receive")
    let completedB = expectation(description: "second stream complete")
    var receivedOnMainB = false
    var completedOnMainB = false
    var responseB: HTTPURLResponse?
    var valueB: TestResponse?
    var failureB: AFError?

    // When
    session.streamRequest(.stream(1))
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainA = Thread.isMainThread
          responseA = completion.response
          completedA.fulfill()
        case let .stream(result):
          receivedOnMainA = Thread.isMainThread
          switch result {
          case let .failure(error): failureA = error
          case let .success(value): valueA = value
          }
          receivedA.fulfill()
        }
      }
      .responseStreamDecodable(of: TestResponse.self) { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainB = Thread.isMainThread
          responseB = completion.response
          completedB.fulfill()
        case let .stream(result):
          receivedOnMainB = Thread.isMainThread
          switch result {
          case let .failure(error): failureB = error
          case let .success(value): valueB = value
          }
          receivedB.fulfill()
        }
      }

    // Only the per-handler receive-then-complete order is enforced.
    wait(for: [receivedA, completedA], timeout: timeout, enforceOrder: true)
    wait(for: [receivedB, completedB], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(receivedOnMainA)
    XCTAssertTrue(completedOnMainA)
    XCTAssertNotNil(valueA)
    XCTAssertEqual(responseA?.statusCode, 200)
    XCTAssertNil(failureA)

    XCTAssertTrue(receivedOnMainB)
    XCTAssertTrue(completedOnMainB)
    XCTAssertNotNil(valueB)
    XCTAssertEqual(responseB?.statusCode, 200)
    XCTAssertNil(failureB)
  }
  /// Checks that a stream request to the basic-auth endpoint, given matching credentials through
  /// `authenticate(username:password:)`, streams data and completes with a 200 response on the main thread.
  func testThatDataStreamCanAuthenticate() {
    // Given
    let user = "userstream"
    let password = "password"

    let received = expectation(description: "stream did receive")
    let completed = expectation(description: "stream complete")
    var receivedOnMainThread = false
    var completedOnMainThread = false
    var finalResponse: HTTPURLResponse?

    // When
    AF.streamRequest(.basicAuth(forUser: user, password: password))
      .authenticate(username: user, password: password)
      .responseStream { stream in
        switch stream.event {
        case let .complete(completion):
          completedOnMainThread = Thread.isMainThread
          finalResponse = completion.response
          completed.fulfill()
        case .stream:
          receivedOnMainThread = Thread.isMainThread
          received.fulfill()
        }
      }

    // Data must be seen before completion.
    wait(for: [received, completed], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(receivedOnMainThread)
    XCTAssertTrue(completedOnMainThread)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  1003. }
  /// Tests that a `DataStreamRequest` triggers the expected `EventMonitor` callbacks over its lifetime.
  final class DataStreamLifetimeEvents: BaseTestCase {
    /// Runs one validated, decodable stream request with two event monitors installed and waits for every
    /// registered lifetime expectation, then checks that the request ended in the `.finished` state.
    func testThatDataStreamRequestHasAppropriateLifetimeEvents() {
      // Given
      // Forwards the generic parse callback, for which this test has no closure-based hook.
      final class ParseMonitor: EventMonitor {
        var onParse: (() -> Void)?

        func request<Value>(_ request: DataStreamRequest, didParseStream result: Result<Value, AFError>) {
          onParse?()
        }
      }

      let closureMonitor = ClosureEventMonitor()
      let parseMonitor = ParseMonitor()
      let session = Session(eventMonitors: [closureMonitor, parseMonitor])

      // Disable event test until Firewalk supports HTTPS.
      // let didReceiveChallenge = expectation(description: "didReceiveChallenge should fire")
      // closureMonitor.taskDidReceiveChallenge = { _, _, _ in didReceiveChallenge.fulfill() }

      // Each expectation is wired to its monitor hook right where it is created.
      let metricsCollected = expectation(description: "taskDidFinishCollecting should fire")
      closureMonitor.taskDidFinishCollectingMetrics = { _, _, _ in metricsCollected.fulfill() }

      let dataArrived = expectation(description: "didReceiveData should fire")
      var sawData = false
      closureMonitor.dataTaskDidReceiveData = { _, _, _ in
        // Data may be received many times, so only the first delivery fulfills the expectation.
        if sawData { return }
        sawData = true
        dataArrived.fulfill()
      }

      let cacheConsulted = expectation(description: "willCacheResponse should fire")
      closureMonitor.dataTaskWillCacheResponse = { _, _, _ in cacheConsulted.fulfill() }

      let urlRequestCreated = expectation(description: "didCreateInitialURLRequest should fire")
      closureMonitor.requestDidCreateInitialURLRequest = { _, _ in urlRequestCreated.fulfill() }

      let taskCreated = expectation(description: "didCreateTask should fire")
      closureMonitor.requestDidCreateTask = { _, _ in taskCreated.fulfill() }

      let metricsGathered = expectation(description: "didGatherMetrics should fire")
      closureMonitor.requestDidGatherMetrics = { _, _ in metricsGathered.fulfill() }

      let taskCompleted = expectation(description: "didComplete should fire")
      closureMonitor.requestDidCompleteTaskWithError = { _, _, _ in taskCompleted.fulfill() }

      let requestFinished = expectation(description: "didFinish should fire")
      closureMonitor.requestDidFinish = { _ in requestFinished.fulfill() }

      let requestResumed = expectation(description: "didResume should fire")
      closureMonitor.requestDidResume = { _ in requestResumed.fulfill() }

      let taskResumed = expectation(description: "didResumeTask should fire")
      closureMonitor.requestDidResumeTask = { _, _ in taskResumed.fulfill() }

      // The validation hook is expected to fire twice for this request.
      let validated = expectation(description: "didValidateRequest should fire")
      validated.expectedFulfillmentCount = 2
      closureMonitor.requestDidValidateRequestResponseWithResult = { _, _, _, _ in validated.fulfill() }

      let parsed = expectation(description: "streamDidParse should fire")
      parseMonitor.onParse = { parsed.fulfill() }

      let streamReceived = expectation(description: "stream should receive")
      let streamCompleted = expectation(description: "stream should complete")

      // When
      let request = session.streamRequest(.stream(1))
        .validate()
        .responseStreamDecodable(of: TestResponse.self) { stream in
          switch stream.event {
          case .complete:
            streamCompleted.fulfill()
          case .stream:
            streamReceived.fulfill()
          }
        }

      waitForExpectations(timeout: timeout)

      // Then
      XCTAssertEqual(request.state, .finished)
    }
  }