DataStreamTests.swift 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954
  1. //
  2. // DataStreamTests.swift
  3. //
  4. // Copyright (c) 2020 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. import Alamofire
  25. import XCTest
  /// Tests for `responseStream`-style handlers on requests created with `streamRequest`.
  ///
  /// Every test records what its stream handler observed into local variables, waits on
  /// XCTest expectations fulfilled from inside the handler, and only then asserts.
  final class DataStreamTests: BaseTestCase {
    /// Streams `bytes/1` and checks that the `.stream` and `.complete` events both arrive on the
    /// main thread, in that order, with a 200 response and exactly one accumulated byte.
    func testThatDataCanBeStreamedOnMainQueue() {
      // Given
      let expectedSize = 1
      var accumulatedData = Data()
      var response: HTTPURLResponse?
      var streamOnMain = false
      var completeOnMain = false
      let didReceive = expectation(description: "stream should receive once")
      let didComplete = expectation(description: "stream should complete")

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "bytes/\(expectedSize)")).responseStream { stream in
        switch stream.event {
        case let .stream(result):
          streamOnMain = Thread.isMainThread
          switch result {
          case let .success(data):
            accumulatedData.append(data)
          }
          didReceive.fulfill()
        case let .complete(completion):
          completeOnMain = Thread.isMainThread
          response = completion.response
          didComplete.fulfill()
        }
      }

      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertEqual(response?.statusCode, 200)
      XCTAssertEqual(accumulatedData.count, expectedSize)
      XCTAssertTrue(streamOnMain)
      XCTAssertTrue(completeOnMain)
    }

    /// Streams `/bytes/1` and makes the same main-thread, ordering, status and size checks as
    /// `testThatDataCanBeStreamedOnMainQueue`.
    ///
    /// NOTE(review): despite the name, the request is still built from a `URLRequest`, not a URL;
    /// confirm whether a URL-based `streamRequest` call was intended here.
    func testThatDataCanBeStreamedFromURL() {
      // Given
      let expectedSize = 1
      var accumulatedData = Data()
      var response: HTTPURLResponse?
      var streamOnMain = false
      var completeOnMain = false
      let didReceive = expectation(description: "stream should receive")
      let didComplete = expectation(description: "stream should complete")

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "/bytes/\(expectedSize)")).responseStream { stream in
        switch stream.event {
        case let .stream(result):
          streamOnMain = Thread.isMainThread
          switch result {
          case let .success(data):
            accumulatedData.append(data)
          }
          didReceive.fulfill()
        case let .complete(completion):
          completeOnMain = Thread.isMainThread
          response = completion.response
          didComplete.fulfill()
        }
      }

      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertEqual(response?.statusCode, 200)
      XCTAssertEqual(accumulatedData.count, expectedSize)
      XCTAssertTrue(streamOnMain)
      XCTAssertTrue(completeOnMain)
    }

    /// Attaches two `responseStream` handlers to one request and checks that each handler
    /// independently sees the data, the 200 response, and main-thread delivery.
    func testThatDataCanBeStreamedManyTimes() {
      // Given
      let expectedSize = 1
      var firstAccumulatedData = Data()
      var firstResponse: HTTPURLResponse?
      var firstStreamOnMain = false
      var firstCompleteOnMain = false
      let firstReceive = expectation(description: "first stream should receive")
      let firstCompletion = expectation(description: "first stream should complete")
      var secondAccumulatedData = Data()
      var secondResponse: HTTPURLResponse?
      var secondStreamOnMain = false
      var secondCompleteOnMain = false
      let secondReceive = expectation(description: "second stream should receive")
      let secondCompletion = expectation(description: "second stream should complete")

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "bytes/\(expectedSize)"))
        .responseStream { stream in
          switch stream.event {
          case let .stream(result):
            firstStreamOnMain = Thread.isMainThread
            switch result {
            case let .success(data):
              firstAccumulatedData.append(data)
            }
            firstReceive.fulfill()
          case let .complete(completion):
            firstCompleteOnMain = Thread.isMainThread
            firstResponse = completion.response
            firstCompletion.fulfill()
          }
        }
        .responseStream { stream in
          switch stream.event {
          case let .stream(result):
            secondStreamOnMain = Thread.isMainThread
            switch result {
            case let .success(data):
              secondAccumulatedData.append(data)
            }
            secondReceive.fulfill()
          case let .complete(completion):
            secondCompleteOnMain = Thread.isMainThread
            secondResponse = completion.response
            secondCompletion.fulfill()
          }
        }

      // Ordering is enforced per handler only, not across the two handlers.
      wait(for: [firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
      wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(firstStreamOnMain)
      XCTAssertTrue(firstCompleteOnMain)
      XCTAssertEqual(firstResponse?.statusCode, 200)
      XCTAssertEqual(firstAccumulatedData.count, expectedSize)
      XCTAssertTrue(secondStreamOnMain)
      XCTAssertTrue(secondCompleteOnMain)
      XCTAssertEqual(secondResponse?.statusCode, 200)
      XCTAssertEqual(secondAccumulatedData.count, expectedSize)
    }

    /// Attaches a raw `responseStream` handler and a `responseStreamDecodable` handler to the same
    /// `stream/1` request and checks that raw data is received while a value is decoded without error.
    func testThatDataCanBeStreamedAndDecodedAtTheSameTime() {
      // Given
      var firstAccumulatedData = Data()
      var firstResponse: HTTPURLResponse?
      var firstStreamOnMain = false
      var firstCompleteOnMain = false
      let firstReceive = expectation(description: "first stream should receive")
      let firstCompletion = expectation(description: "first stream should complete")
      var decodedResponse: HTTPBinResponse?
      var decodingError: AFError?
      var secondResponse: HTTPURLResponse?
      var secondStreamOnMain = false
      var secondCompleteOnMain = false
      let secondReceive = expectation(description: "second stream should receive")
      let secondCompletion = expectation(description: "second stream should complete")

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
        .responseStream { stream in
          switch stream.event {
          case let .stream(result):
            firstStreamOnMain = Thread.isMainThread
            switch result {
            case let .success(data):
              firstAccumulatedData.append(data)
            }
            firstReceive.fulfill()
          case let .complete(completion):
            firstCompleteOnMain = Thread.isMainThread
            firstResponse = completion.response
            firstCompletion.fulfill()
          }
        }
        .responseStreamDecodable(of: HTTPBinResponse.self) { stream in
          switch stream.event {
          case let .stream(result):
            secondStreamOnMain = Thread.isMainThread
            switch result {
            case let .success(value):
              decodedResponse = value
            case let .failure(error):
              decodingError = error
            }
            secondReceive.fulfill()
          case let .complete(completion):
            secondCompleteOnMain = Thread.isMainThread
            secondResponse = completion.response
            secondCompletion.fulfill()
          }
        }

      wait(for: [firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
      wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(firstStreamOnMain)
      XCTAssertTrue(firstCompleteOnMain)
      XCTAssertEqual(firstResponse?.statusCode, 200)
      XCTAssertFalse(firstAccumulatedData.isEmpty)
      XCTAssertTrue(secondStreamOnMain)
      XCTAssertTrue(secondCompleteOnMain)
      XCTAssertEqual(secondResponse?.statusCode, 200)
      XCTAssertNotNil(decodedResponse)
      XCTAssertNil(decodingError)
    }

    /// Requests `xml`, takes the request's `asInputStream()` result, and checks that `XMLParser`
    /// parses that stream successfully once the request has completed.
    func testThatDataStreamRequestProducesWorkingInputStream() {
      // Given
      let expect = expectation(description: "stream complete")

      // When
      let inputStream = AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "xml",
                                                                       headers: [.contentType("application/xml")]))
        .responseStream { stream in
          switch stream.event {
          case .complete:
            expect.fulfill()
          case .stream:
            // Only completion matters here; the data is read through the input stream below.
            break
          }
        }.asInputStream()

      waitForExpectations(timeout: timeout)

      // Then
      // Fail this test, rather than crash the whole run, if no stream was produced.
      guard let xmlStream = inputStream else {
        XCTFail("asInputStream() returned nil")
        return
      }

      let parser = XMLParser(stream: xmlStream)
      let parsed = parser.parse()
      XCTAssertTrue(parsed)
      XCTAssertNil(parser.parserError)
    }

    /// Uses a `Session` created with `startRequestsImmediately: false`, calls `resume()` explicitly,
    /// and checks that the stream then delivers data and completes with a 200 on the main thread.
    func testThatDataStreamCanBeManuallyResumed() {
      // Given
      let session = Session(startRequestsImmediately: false)
      var response: HTTPURLResponse?
      var streamOnMain = false
      var completeOnMain = false
      let didReceive = expectation(description: "stream did receive")
      let didComplete = expectation(description: "stream complete")

      // When
      session.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
        .responseStream { stream in
          switch stream.event {
          case .stream:
            streamOnMain = Thread.isMainThread
            didReceive.fulfill()
          case let .complete(completion):
            completeOnMain = Thread.isMainThread
            response = completion.response
            didComplete.fulfill()
          }
        }.resume()

      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(streamOnMain)
      XCTAssertTrue(completeOnMain)
      XCTAssertEqual(response?.statusCode, 200)
    }

    /// Requests `bytes/50` with `automaticallyCancelOnStreamError: true` while decoding as
    /// `HTTPBinResponse`, and checks that completion reports an explicitly-cancelled error.
    func testThatDataStreamIsAutomaticallyCanceledOnStreamErrorWhenEnabled() {
      // Given
      var response: HTTPURLResponse?
      var complete: DataStreamRequest.Completion?
      let didComplete = expectation(description: "stream complete")

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "bytes/50"), automaticallyCancelOnStreamError: true)
        .responseStreamDecodable(of: HTTPBinResponse.self) { stream in
          switch stream.event {
          case let .complete(completion):
            complete = completion
            response = completion.response
            didComplete.fulfill()
          case .stream:
            // Individual stream results are irrelevant; only the completion is inspected.
            break
          }
        }

      waitForExpectations(timeout: timeout)

      // Then
      XCTAssertEqual(response?.statusCode, 200)
      XCTAssertTrue(complete?.error?.isExplicitlyCancelledError == true,
                    "error is not explicitly cancelled but \(complete?.error?.localizedDescription ?? "None")")
    }

    /// Throws `LocalError.failed` from the stream handler on the `.stream` event and checks that
    /// completion carries a 200 response and an error whose `isExplicitlyCancelledError` is `false`.
    func testThatDataStreamIsAutomaticallyCanceledOnStreamClosureError() {
      // Given
      enum LocalError: Error { case failed }

      var response: HTTPURLResponse?
      var complete: DataStreamRequest.Completion?
      let didReceive = expectation(description: "stream did receieve")
      let didComplete = expectation(description: "stream complete")

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "bytes/50"))
        .responseStream { stream in
          switch stream.event {
          case .stream:
            didReceive.fulfill()
            throw LocalError.failed
          case let .complete(completion):
            complete = completion
            response = completion.response
            didComplete.fulfill()
          }
        }

      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertEqual(response?.statusCode, 200)
      XCTAssertTrue(complete?.error?.isExplicitlyCancelledError == false)
    }

    /// Calls `stream.cancel()` from inside the `.stream` event and checks that completion reports
    /// an explicitly-cancelled error.
    func testThatDataStreamCanBeCancelledInClosure() {
      // Given
      // Use .main so that completion can't beat cancellation.
      let session = Session(rootQueue: .main)
      var completion: DataStreamRequest.Completion?
      let didReceive = expectation(description: "stream should receive")
      let didComplete = expectation(description: "stream should complete")

      // When
      session.streamRequest(URLRequest.makeHTTPBinRequest(path: "/bytes/1")).responseStream { stream in
        switch stream.event {
        case .stream:
          didReceive.fulfill()
          stream.cancel()
        case .complete:
          completion = stream.completion
          didComplete.fulfill()
        }
      }

      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
                    """
                    error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
                    response is: \(completion?.response?.description ?? "none").
                    """)
    }

    /// Calls `stream.token.cancel()` from inside the `.stream` event and checks that completion
    /// reports an explicitly-cancelled error.
    func testThatDataStreamCanBeCancelledByToken() {
      // Given
      // Use .main so that completion can't beat cancellation.
      let session = Session(rootQueue: .main)
      var completion: DataStreamRequest.Completion?
      let didReceive = expectation(description: "stream should receive")
      let didComplete = expectation(description: "stream should complete")

      // When
      session.streamRequest(URLRequest.makeHTTPBinRequest(path: "/bytes/1")).responseStream { stream in
        switch stream.event {
        case .stream:
          didReceive.fulfill()
          stream.token.cancel()
        case .complete:
          completion = stream.completion
          didComplete.fulfill()
        }
      }

      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true,
                    """
                    error is not explicitly cancelled, instead: \(completion?.error?.localizedDescription ?? "none").
                    response is: \(completion?.response?.description ?? "none").
                    """)
    }
  }
  358. // MARK: - Serialization Tests
  /// Tests for the serializing stream handlers: strings, `Decodable` values, and a serializer
  /// instance passed explicitly via `responseStream(using:)`.
  final class DataStreamSerializationTests: BaseTestCase {
    /// Streams `stream/1` through `responseStreamString` and checks that a string arrives and the
    /// request completes with a 200, with both events delivered on the main thread.
    func testThatDataStreamsCanBeAString() {
      // Given
      let streamed = expectation(description: "did stream")
      let finished = expectation(description: "stream complete")
      var latestString: String?
      var httpResponse: HTTPURLResponse?
      var streamedOnMain = false
      var finishedOnMain = false

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
        .responseStreamString { output in
          switch output.event {
          case let .complete(summary):
            finishedOnMain = Thread.isMainThread
            httpResponse = summary.response
            finished.fulfill()
          case let .stream(outcome):
            streamedOnMain = Thread.isMainThread
            if case let .success(text) = outcome {
              latestString = text
            }
            streamed.fulfill()
          }
        }

      wait(for: [streamed, finished], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(streamedOnMain)
      XCTAssertTrue(finishedOnMain)
      XCTAssertNotNil(latestString)
      XCTAssertEqual(httpResponse?.statusCode, 200)
    }

    /// Streams `stream/1` through `responseStreamDecodable` and checks that an `HTTPBinResponse`
    /// is decoded without error and that completion reports a 200.
    func testThatDataStreamsCanBeDecoded() {
      // Given
      let received = expectation(description: "stream did receive")
      let finished = expectation(description: "stream complete")
      var decodedValue: HTTPBinResponse?
      var failure: AFError?
      var finalResponse: HTTPURLResponse?
      var streamedOnMain = false
      var finishedOnMain = false

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
        .responseStreamDecodable(of: HTTPBinResponse.self) { output in
          switch output.event {
          case let .complete(summary):
            finishedOnMain = Thread.isMainThread
            finalResponse = summary.response
            finished.fulfill()
          case let .stream(outcome):
            streamedOnMain = Thread.isMainThread
            switch outcome {
            case let .failure(error): failure = error
            case let .success(value): decodedValue = value
            }
            received.fulfill()
          }
        }

      wait(for: [received, finished], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(streamedOnMain)
      XCTAssertTrue(finishedOnMain)
      XCTAssertNotNil(decodedValue)
      XCTAssertEqual(finalResponse?.statusCode, 200)
      XCTAssertNil(failure)
    }

    /// Passes a `DecodableStreamSerializer<HTTPBinResponse>` to `responseStream(using:)` and makes
    /// the same checks as the decodable convenience test.
    func testThatDataStreamSerializerCanBeUsedDirectly() {
      // Given
      let httpBinSerializer = DecodableStreamSerializer<HTTPBinResponse>()
      let received = expectation(description: "stream did receive")
      let finished = expectation(description: "stream complete")
      var decodedValue: HTTPBinResponse?
      var failure: AFError?
      var finalResponse: HTTPURLResponse?
      var streamedOnMain = false
      var finishedOnMain = false

      // When
      AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
        .responseStream(using: httpBinSerializer) { output in
          switch output.event {
          case let .complete(summary):
            finishedOnMain = Thread.isMainThread
            finalResponse = summary.response
            finished.fulfill()
          case let .stream(outcome):
            streamedOnMain = Thread.isMainThread
            switch outcome {
            case let .failure(error): failure = error
            case let .success(value): decodedValue = value
            }
            received.fulfill()
          }
        }

      wait(for: [received, finished], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(streamedOnMain)
      XCTAssertTrue(finishedOnMain)
      XCTAssertNotNil(decodedValue)
      XCTAssertEqual(finalResponse?.statusCode, 200)
      XCTAssertNil(failure)
    }
  }
  466. // MARK: - Integration Tests
  467. final class DataStreamIntegrationTests: BaseTestCase {
  /// Streams `status/401` through `validate()` and checks that completion carries a response
  /// validation error and that no `.stream` event was ever delivered.
  func testThatDataStreamCanFailValidation() {
    // Given
    let unauthorizedRequest = URLRequest.makeHTTPBinRequest(path: "status/401")
    let finished = expectation(description: "stream should complete")
    var receivedAnyData = false
    var completionError: AFError?

    // When
    AF.streamRequest(unauthorizedRequest).validate().responseStream { output in
      switch output.event {
      case let .complete(summary):
        completionError = summary.error
        finished.fulfill()
      case .stream:
        receivedAnyData = true
      }
    }

    waitForExpectations(timeout: timeout)

    // Then
    XCTAssertNotNil(completionError, "error should not be nil")
    XCTAssertTrue(completionError?.isResponseValidationError == true, "error should be response validation error")
    XCTAssertFalse(receivedAnyData, "no data should be seen")
  }
  /// Starts with a `status/401` request that fails validation, lets a local interceptor retry it
  /// as `bytes/1000`, and checks that the handler ends up with 1000 bytes and a 200 response.
  func testThatDataStreamsCanBeRetried() {
    // Given
    // Always asks for a retry; once a retry has been requested, every adapted request is
    // swapped for one that targets `bytes/1000`.
    final class RetryOnceInterceptor: RequestInterceptor {
      var hasRetried = false

      func adapt(_ urlRequest: URLRequest, for session: Session, completion: @escaping (Result<URLRequest, Error>) -> Void) {
        let adapted = hasRetried ? URLRequest.makeHTTPBinRequest(path: "bytes/1000") : urlRequest
        completion(.success(adapted))
      }

      func retry(_ request: Request, for session: Session, dueTo error: Error, completion: @escaping (RetryResult) -> Void) {
        hasRetried = true
        completion(.retry)
      }
    }

    let retryingSession = Session(interceptor: RetryOnceInterceptor())
    let received = expectation(description: "stream should receive")
    let finished = expectation(description: "stream should complete")
    var collected = Data()
    var finalResponse: HTTPURLResponse?
    var streamedOnMain = false
    var finishedOnMain = false

    // When
    retryingSession.streamRequest(URLRequest.makeHTTPBinRequest(path: "status/401"))
      .validate()
      .responseStream { output in
        switch output.event {
        case let .complete(summary):
          finishedOnMain = Thread.isMainThread
          finalResponse = summary.response
          finished.fulfill()
        case let .stream(outcome):
          streamedOnMain = Thread.isMainThread
          if case let .success(chunk) = outcome {
            collected.append(chunk)
          }
          received.fulfill()
        }
      }

    wait(for: [received, finished], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(streamedOnMain)
    XCTAssertTrue(finishedOnMain)
    XCTAssertEqual(collected.count, 1000)
    XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  /// Requests `status/301` with a `Redirector` that rewrites the redirect to `stream/1`, and
  /// checks that the redirect closure runs before a value is decoded and the stream completes
  /// with a 200.
  func testThatDataStreamCanBeRedirected() {
    // Given
    var finalResponse: HTTPURLResponse?
    var decodedValue: HTTPBinResponse?
    var failure: AFError?
    var streamedOnMain = false
    var finishedOnMain = false
    let redirected = expectation(description: "stream redirected")
    let streamRedirector = Redirector(behavior: .modify { _, _, _ in
      redirected.fulfill()
      return URLRequest.makeHTTPBinRequest(path: "stream/1")
    })
    let received = expectation(description: "stream should receive")
    let finished = expectation(description: "stream should complete")

    // When
    AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "status/301"))
      .redirect(using: streamRedirector)
      .responseStreamDecodable(of: HTTPBinResponse.self) { output in
        switch output.event {
        case let .complete(summary):
          finishedOnMain = Thread.isMainThread
          finalResponse = summary.response
          finished.fulfill()
        case let .stream(outcome):
          streamedOnMain = Thread.isMainThread
          switch outcome {
          case let .failure(error): failure = error
          case let .success(value): decodedValue = value
          }
          received.fulfill()
        }
      }

    wait(for: [redirected, received, finished], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(streamedOnMain)
    XCTAssertTrue(finishedOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(failure)
  }
  /// Attaches a `ResponseCacher` whose `.modify` closure returns `nil` and checks that the closure
  /// is invoked while the stream still decodes a value and completes with a 200.
  func testThatDataStreamCallsCachedResponseHandler() {
    // Given
    var finalResponse: HTTPURLResponse?
    var decodedValue: HTTPBinResponse?
    var failure: AFError?
    var streamedOnMain = false
    var finishedOnMain = false
    let cacherCalled = expectation(description: "stream called cacher")
    let responseCacher = ResponseCacher(behavior: .modify { _, _ in
      cacherCalled.fulfill()
      return nil
    })
    let received = expectation(description: "stream did receive")
    let finished = expectation(description: "stream complete")

    // When
    AF.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
      .cacheResponse(using: responseCacher)
      .responseStreamDecodable(of: HTTPBinResponse.self) { output in
        switch output.event {
        case let .complete(summary):
          finishedOnMain = Thread.isMainThread
          finalResponse = summary.response
          finished.fulfill()
        case let .stream(outcome):
          streamedOnMain = Thread.isMainThread
          switch outcome {
          case let .failure(error): failure = error
          case let .success(value): decodedValue = value
          }
          received.fulfill()
        }
      }

    // willCacheResponse called after receiving all Data, so may be called before or after the asynchronous stream
    // handlers. It is therefore awaited on its own instead of being ordered against them.
    wait(for: [cacherCalled], timeout: timeout)
    wait(for: [received, finished], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(streamedOnMain)
    XCTAssertTrue(finishedOnMain)
    XCTAssertNotNil(decodedValue)
    XCTAssertEqual(finalResponse?.statusCode, 200)
    XCTAssertNil(failure)
  }
  /// Runs two decodable stream handlers on a `Session` given separate request and serialization
  /// queues created without the `.concurrent` attribute, and checks that both handlers decode a
  /// value, see a 200, and are called on the main thread, with the first handler's stream event
  /// preceding the second's.
  func testThatDataStreamWorksCorrectlyWithMultipleSerialQueues() {
    // Given
    let session = Session(requestQueue: DispatchQueue(label: "org.alamofire.testRequestQueue"),
                          serializationQueue: DispatchQueue(label: "org.alamofire.testSerializationQueue"))

    var primaryResponse: HTTPURLResponse?
    var primaryValue: HTTPBinResponse?
    var primaryFailure: AFError?
    var primaryStreamedOnMain = false
    var primaryFinishedOnMain = false
    let primaryStreamed = expectation(description: "first stream")
    let primaryReceived = expectation(description: "first stream did receive")
    let primaryFinished = expectation(description: "first stream complete")

    var secondaryResponse: HTTPURLResponse?
    var secondaryValue: HTTPBinResponse?
    var secondaryFailure: AFError?
    var secondaryStreamedOnMain = false
    var secondaryFinishedOnMain = false
    let secondaryStreamed = expectation(description: "second stream")
    let secondaryReceived = expectation(description: "second stream did receive")
    let secondaryFinished = expectation(description: "second stream complete")

    // When
    session.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
      .responseStreamDecodable(of: HTTPBinResponse.self) { output in
        switch output.event {
        case let .complete(summary):
          primaryFinishedOnMain = Thread.isMainThread
          primaryResponse = summary.response
          primaryFinished.fulfill()
        case let .stream(outcome):
          primaryStreamedOnMain = Thread.isMainThread
          switch outcome {
          case let .failure(error): primaryFailure = error
          case let .success(value): primaryValue = value
          }
          primaryStreamed.fulfill()
          primaryReceived.fulfill()
        }
      }
      .responseStreamDecodable(of: HTTPBinResponse.self) { output in
        switch output.event {
        case let .complete(summary):
          secondaryFinishedOnMain = Thread.isMainThread
          secondaryResponse = summary.response
          secondaryFinished.fulfill()
        case let .stream(outcome):
          secondaryStreamedOnMain = Thread.isMainThread
          switch outcome {
          case let .failure(error): secondaryFailure = error
          case let .success(value): secondaryValue = value
          }
          secondaryStreamed.fulfill()
          secondaryReceived.fulfill()
        }
      }

    wait(for: [primaryStreamed, secondaryStreamed], timeout: timeout, enforceOrder: true)
    // Cannot test order of completion events, as one may have been enqueued while the other executed directly.
    wait(for: [primaryReceived, primaryFinished], timeout: timeout, enforceOrder: true)
    wait(for: [secondaryReceived, secondaryFinished], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(primaryStreamedOnMain)
    XCTAssertTrue(primaryFinishedOnMain)
    XCTAssertNotNil(primaryValue)
    XCTAssertEqual(primaryResponse?.statusCode, 200)
    XCTAssertNil(primaryFailure)
    XCTAssertTrue(secondaryStreamedOnMain)
    XCTAssertTrue(secondaryFinishedOnMain)
    XCTAssertNotNil(secondaryValue)
    XCTAssertEqual(secondaryResponse?.statusCode, 200)
    XCTAssertNil(secondaryFailure)
  }
  /// Runs two decodable stream handlers on a `Session` given `.concurrent` request and
  /// serialization queues, and checks that each handler decodes a value, sees a 200, and is
  /// called on the main thread. Ordering is only enforced within each handler.
  func testThatDataStreamWorksCorrectlyWithMultipleConcurrentQueues() {
    // Given
    let session = Session(requestQueue: DispatchQueue(label: "org.alamofire.testRequestQueue",
                                                      attributes: .concurrent),
                          serializationQueue: DispatchQueue(label: "org.alamofire.testSerializationQueue",
                                                            attributes: .concurrent))

    var primaryResponse: HTTPURLResponse?
    var primaryValue: HTTPBinResponse?
    var primaryFailure: AFError?
    var primaryStreamedOnMain = false
    var primaryFinishedOnMain = false
    let primaryReceived = expectation(description: "first stream did receive")
    let primaryFinished = expectation(description: "first stream complete")

    var secondaryResponse: HTTPURLResponse?
    var secondaryValue: HTTPBinResponse?
    var secondaryFailure: AFError?
    var secondaryStreamedOnMain = false
    var secondaryFinishedOnMain = false
    let secondaryReceived = expectation(description: "second stream did receive")
    let secondaryFinished = expectation(description: "second stream complete")

    // When
    session.streamRequest(URLRequest.makeHTTPBinRequest(path: "stream/1"))
      .responseStreamDecodable(of: HTTPBinResponse.self) { output in
        switch output.event {
        case let .complete(summary):
          primaryFinishedOnMain = Thread.isMainThread
          primaryResponse = summary.response
          primaryFinished.fulfill()
        case let .stream(outcome):
          primaryStreamedOnMain = Thread.isMainThread
          switch outcome {
          case let .failure(error): primaryFailure = error
          case let .success(value): primaryValue = value
          }
          primaryReceived.fulfill()
        }
      }
      .responseStreamDecodable(of: HTTPBinResponse.self) { output in
        switch output.event {
        case let .complete(summary):
          secondaryFinishedOnMain = Thread.isMainThread
          secondaryResponse = summary.response
          secondaryFinished.fulfill()
        case let .stream(outcome):
          secondaryStreamedOnMain = Thread.isMainThread
          switch outcome {
          case let .failure(error): secondaryFailure = error
          case let .success(value): secondaryValue = value
          }
          secondaryReceived.fulfill()
        }
      }

    wait(for: [primaryReceived, primaryFinished], timeout: timeout, enforceOrder: true)
    wait(for: [secondaryReceived, secondaryFinished], timeout: timeout, enforceOrder: true)

    // Then
    XCTAssertTrue(primaryStreamedOnMain)
    XCTAssertTrue(primaryFinishedOnMain)
    XCTAssertNotNil(primaryValue)
    XCTAssertEqual(primaryResponse?.statusCode, 200)
    XCTAssertNil(primaryFailure)
    XCTAssertTrue(secondaryStreamedOnMain)
    XCTAssertTrue(secondaryFinishedOnMain)
    XCTAssertNotNil(secondaryValue)
    XCTAssertEqual(secondaryResponse?.statusCode, 200)
    XCTAssertNil(secondaryFailure)
  }
  /// Issues a stream request against the `basic-auth/username/password` path with credentials
  /// supplied through `authenticate(username:password:)`, then checks that the stream and
  /// completion events arrived on the main thread and the final response status is 200.
  func testThatDataStreamCanAuthenticate() {
      // Given
      let didReceive = expectation(description: "stream did receive")
      let didComplete = expectation(description: "stream complete")
      var receivedOnMain = false
      var completedOnMain = false
      var finalResponse: HTTPURLResponse?

      // When
      let urlRequest = URLRequest.makeHTTPBinRequest(path: "basic-auth/username/password")

      AF.streamRequest(urlRequest)
          .authenticate(username: "username", password: "password")
          .responseStream { stream in
              switch stream.event {
              case .stream:
                  receivedOnMain = Thread.isMainThread
                  didReceive.fulfill()
              case .complete(let completion):
                  completedOnMain = Thread.isMainThread
                  finalResponse = completion.response
                  didComplete.fulfill()
              }
          }

      // The stream event must be fulfilled before the completion event.
      wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)

      // Then
      XCTAssertTrue(receivedOnMain)
      XCTAssertTrue(completedOnMain)
      XCTAssertEqual(finalResponse?.statusCode, 200)
  }
  796. }
  /// Tests covering the event-monitor callbacks observed while a stream request runs.
  final class DataStreamLifetimeEvents: BaseTestCase {
      /// Runs a validated, decodable stream request with two event monitors installed and
      /// expects every hooked callback to fire, after which the request must be `.finished`.
      func testThatDataStreamRequestHasAppropriateLifetimeEvents() {
          // Given
          // Local monitor that forwards the stream-parse callback to an optional closure.
          final class StreamParseMonitor: EventMonitor {
              var onParse: (() -> Void)?

              func request<Value>(_ request: DataStreamRequest, didParseStream result: Result<Value, AFError>) {
                  onParse?()
              }
          }

          let eventMonitor = ClosureEventMonitor()
          let parseMonitor = StreamParseMonitor()
          let session = Session(eventMonitors: [eventMonitor, parseMonitor])

          // One expectation per callback that should be observed.
          let didReceiveChallenge = expectation(description: "didReceiveChallenge should fire")
          let taskDidFinishCollecting = expectation(description: "taskDidFinishCollecting should fire")
          let didReceiveData = expectation(description: "didReceiveData should fire")
          let willCacheResponse = expectation(description: "willCacheResponse should fire")
          let didCreateURLRequest = expectation(description: "didCreateInitialURLRequest should fire")
          let didCreateTask = expectation(description: "didCreateTask should fire")
          let didGatherMetrics = expectation(description: "didGatherMetrics should fire")
          let didComplete = expectation(description: "didComplete should fire")
          let didFinish = expectation(description: "didFinish should fire")
          let didResume = expectation(description: "didResume should fire")
          let didResumeTask = expectation(description: "didResumeTask should fire")
          let didValidate = expectation(description: "didValidateRequest should fire")
          // The validation callback is expected to be reported twice.
          didValidate.expectedFulfillmentCount = 2
          let didParse = expectation(description: "streamDidParse should fire")
          let didReceive = expectation(description: "stream should receive")
          let didCompleteStream = expectation(description: "stream should complete")

          var hasReceivedData = false

          // Task-level callbacks.
          eventMonitor.taskDidReceiveChallenge = { _, _, _ in didReceiveChallenge.fulfill() }
          eventMonitor.taskDidFinishCollectingMetrics = { _, _, _ in taskDidFinishCollecting.fulfill() }
          eventMonitor.dataTaskDidReceiveData = { _, _, _ in
              // Data may be received many times, fulfill only once.
              if hasReceivedData { return }

              hasReceivedData = true
              didReceiveData.fulfill()
          }
          eventMonitor.dataTaskWillCacheResponse = { _, _, _ in willCacheResponse.fulfill() }

          // Request-level callbacks.
          eventMonitor.requestDidCreateInitialURLRequest = { _, _ in didCreateURLRequest.fulfill() }
          eventMonitor.requestDidCreateTask = { _, _ in didCreateTask.fulfill() }
          eventMonitor.requestDidGatherMetrics = { _, _ in didGatherMetrics.fulfill() }
          eventMonitor.requestDidCompleteTaskWithError = { _, _, _ in didComplete.fulfill() }
          eventMonitor.requestDidFinish = { _ in didFinish.fulfill() }
          eventMonitor.requestDidResume = { _ in didResume.fulfill() }
          eventMonitor.requestDidResumeTask = { _, _ in didResumeTask.fulfill() }
          eventMonitor.requestDidValidateRequestResponseWithResult = { _, _, _, _ in didValidate.fulfill() }

          // Stream-parse callback, delivered through the local monitor.
          parseMonitor.onParse = { didParse.fulfill() }

          // When
          let urlRequest = URLRequest.makeHTTPBinRequest(path: "stream/1")
          let request = session.streamRequest(urlRequest)
              .validate()
              .responseStreamDecodable(of: HTTPBinResponse.self) { stream in
                  switch stream.event {
                  case .stream:
                      didReceive.fulfill()
                  case .complete:
                      didCompleteStream.fulfill()
                  }
              }

          waitForExpectations(timeout: timeout)

          // Then
          XCTAssertEqual(request.state, .finished)
      }
  }