Concurrency.swift 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. //
  2. // Concurrency.swift
  3. //
  4. // Copyright (c) 2021 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. #if compiler(>=5.5.2) && canImport(_Concurrency)
  25. import Foundation
  26. // MARK: - Request Event Streams
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension Request {
      /// Creates a `StreamOf<Progress>` for the instance's upload progress.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                              by default.
      ///
      /// - Returns: The `StreamOf<Progress>`.
      public func uploadProgress(bufferingPolicy: StreamOf<Progress>.BufferingPolicy = .unbounded) -> StreamOf<Progress> {
          stream(bufferingPolicy: bufferingPolicy) { request, continuation in
              request.uploadProgress(queue: .singleEventQueue) { progress in
                  continuation.yield(progress)
              }
          }
      }

      /// Creates a `StreamOf<Progress>` for the instance's download progress.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                              by default.
      ///
      /// - Returns: The `StreamOf<Progress>`.
      public func downloadProgress(bufferingPolicy: StreamOf<Progress>.BufferingPolicy = .unbounded) -> StreamOf<Progress> {
          stream(bufferingPolicy: bufferingPolicy) { request, continuation in
              request.downloadProgress(queue: .singleEventQueue) { progress in
                  continuation.yield(progress)
              }
          }
      }

      /// Creates a `StreamOf<URLRequest>` for the `URLRequest`s produced for the instance.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                              by default.
      ///
      /// - Returns: The `StreamOf<URLRequest>`.
      public func urlRequests(bufferingPolicy: StreamOf<URLRequest>.BufferingPolicy = .unbounded) -> StreamOf<URLRequest> {
          stream(bufferingPolicy: bufferingPolicy) { request, continuation in
              request.onURLRequestCreation(on: .singleEventQueue) { urlRequest in
                  continuation.yield(urlRequest)
              }
          }
      }

      /// Creates a `StreamOf<URLSessionTask>` for the `URLSessionTask`s produced for the instance.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                              by default.
      ///
      /// - Returns: The `StreamOf<URLSessionTask>`.
      public func urlSessionTasks(bufferingPolicy: StreamOf<URLSessionTask>.BufferingPolicy = .unbounded) -> StreamOf<URLSessionTask> {
          stream(bufferingPolicy: bufferingPolicy) { request, continuation in
              request.onURLSessionTaskCreation(on: .singleEventQueue) { task in
                  continuation.yield(task)
              }
          }
      }

      /// Creates a `StreamOf<String>` for the cURL descriptions produced for the instance.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                              by default.
      ///
      /// - Returns: The `StreamOf<String>`.
      public func cURLDescriptions(bufferingPolicy: StreamOf<String>.BufferingPolicy = .unbounded) -> StreamOf<String> {
          stream(bufferingPolicy: bufferingPolicy) { request, continuation in
              request.cURLDescription(on: .singleEventQueue) { description in
                  continuation.yield(description)
              }
          }
      }

      /// Builds a `StreamOf<T>` whose events are registered by `yielder` and which finishes when the request finishes.
      ///
      /// The builder closure is stored by the returned `StreamOf` and only runs when iteration begins, which may be
      /// after the `Request` has been released. The request is therefore captured weakly: if it is already gone, the
      /// stream finishes immediately instead of trapping on a dangling `unowned` reference.
      ///
      /// - Parameters:
      ///   - type:            Element type of the stream. Inferred by default.
      ///   - bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded` by
      ///                      default.
      ///   - yielder:         Closure which registers the event handler on the provided, still-alive `Request` and
      ///                      forwards events into the provided continuation.
      ///
      /// - Returns: The `StreamOf<T>`.
      private func stream<T>(of type: T.Type = T.self,
                             bufferingPolicy: StreamOf<T>.BufferingPolicy = .unbounded,
                             yielder: @escaping (Request, StreamOf<T>.Continuation) -> Void) -> StreamOf<T> {
          StreamOf<T>(bufferingPolicy: bufferingPolicy) { [weak self] continuation in
              guard let self = self else {
                  // Nothing can produce events anymore, so end the stream rather than leave the consumer waiting.
                  continuation.finish()
                  return
              }

              yielder(self, continuation)
              // Must come after serializers run in order to catch retry progress.
              self.onFinish {
                  continuation.finish()
              }
          }
      }
  }
  101. // MARK: - DataTask
  /// Value used to `await` a `DataResponse` and associated values.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DataTask<Value> {
      /// `DataRequest` controlled by `resume()` and `suspend()`.
      private let request: DataRequest
      /// `Task` whose value is the final `DataResponse`.
      private let task: Task<DataResponse<Value, AFError>, Never>
      /// Whether awaiting `response` from a cancelled async context should also cancel `task`.
      private let shouldAutomaticallyCancel: Bool

      fileprivate init(request: DataRequest, task: Task<DataResponse<Value, AFError>, Never>, shouldAutomaticallyCancel: Bool) {
          self.request = request
          self.task = task
          self.shouldAutomaticallyCancel = shouldAutomaticallyCancel
      }

      /// `DataResponse` produced by the `DataRequest` and its response handler.
      public var response: DataResponse<Value, AFError> {
          get async {
              // Without automatic cancellation the caller's cancellation state is ignored entirely.
              guard shouldAutomaticallyCancel else { return await task.value }

              return await withTaskCancellationHandler {
                  cancel()
              } operation: {
                  await task.value
              }
          }
      }

      /// `Result` of any response serialization performed for the `response`.
      public var result: Result<Value, AFError> {
          get async {
              let dataResponse = await response
              return dataResponse.result
          }
      }

      /// `Value` returned by the `response`. Throws the `AFError` if the `result` is a failure.
      public var value: Value {
          get async throws {
              let outcome = await result
              return try outcome.get()
          }
      }

      /// Cancel the underlying `DataRequest` and `Task`.
      public func cancel() {
          task.cancel()
      }

      /// Resume the underlying `DataRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspend the underlying `DataRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DataRequest {
      /// Creates a `DataTask` to `await` a `Data` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `false` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` handed to the `DataResponseSerializer`.
      ///                                `DataResponseSerializer.defaultDataPreprocessor` by default.
      ///   - emptyResponseCodes:        HTTP response codes handed to the `DataResponseSerializer` as allowing empty
      ///                                responses. `DataResponseSerializer.defaultEmptyResponseCodes` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s handed to the `DataResponseSerializer` as allowing empty
      ///                                responses. `DataResponseSerializer.defaultEmptyRequestMethods` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = false,
                                  dataPreprocessor: DataPreprocessor = DataResponseSerializer.defaultDataPreprocessor,
                                  emptyResponseCodes: Set<Int> = DataResponseSerializer.defaultEmptyResponseCodes,
                                  emptyRequestMethods: Set<HTTPMethod> = DataResponseSerializer.defaultEmptyRequestMethods) -> DataTask<Data> {
          let dataSerializer = DataResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                      emptyResponseCodes: emptyResponseCodes,
                                                      emptyRequestMethods: emptyRequestMethods)

          return serializingResponse(using: dataSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization of a `Decodable` value.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode from response data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `false` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` handed to the `DecodableResponseSerializer`.
      ///                                `DecodableResponseSerializer<Value>.defaultDataPreprocessor` by default.
      ///   - decoder:                   `DataDecoder` to use to decode the response. `JSONDecoder()` by default.
      ///   - emptyResponseCodes:        HTTP status codes handed to the serializer as allowing empty responses.
      ///                                `DecodableResponseSerializer<Value>.defaultEmptyResponseCodes` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s handed to the serializer as allowing empty responses.
      ///                                `DecodableResponseSerializer<Value>.defaultEmptyRequestMethods` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingDecodable<Value: Decodable>(_ type: Value.Type = Value.self,
                                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = false,
                                                         dataPreprocessor: DataPreprocessor = DecodableResponseSerializer<Value>.defaultDataPreprocessor,
                                                         decoder: DataDecoder = JSONDecoder(),
                                                         emptyResponseCodes: Set<Int> = DecodableResponseSerializer<Value>.defaultEmptyResponseCodes,
                                                         emptyRequestMethods: Set<HTTPMethod> = DecodableResponseSerializer<Value>.defaultEmptyRequestMethods) -> DataTask<Value> {
          let decodableSerializer = DecodableResponseSerializer<Value>(dataPreprocessor: dataPreprocessor,
                                                                       decoder: decoder,
                                                                       emptyResponseCodes: emptyResponseCodes,
                                                                       emptyRequestMethods: emptyRequestMethods)

          return serializingResponse(using: decodableSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization of a `String` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `false` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` handed to the `StringResponseSerializer`.
      ///                                `StringResponseSerializer.defaultDataPreprocessor` by default.
      ///   - encoding:                  `String.Encoding` to use during serialization. `nil` by default, which leaves
      ///                                the choice of encoding to the `StringResponseSerializer`.
      ///   - emptyResponseCodes:        HTTP status codes handed to the serializer as allowing empty responses.
      ///                                `StringResponseSerializer.defaultEmptyResponseCodes` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s handed to the serializer as allowing empty responses.
      ///                                `StringResponseSerializer.defaultEmptyRequestMethods` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingString(automaticallyCancelling shouldAutomaticallyCancel: Bool = false,
                                    dataPreprocessor: DataPreprocessor = StringResponseSerializer.defaultDataPreprocessor,
                                    encoding: String.Encoding? = nil,
                                    emptyResponseCodes: Set<Int> = StringResponseSerializer.defaultEmptyResponseCodes,
                                    emptyRequestMethods: Set<HTTPMethod> = StringResponseSerializer.defaultEmptyRequestMethods) -> DataTask<String> {
          let stringSerializer = StringResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                          encoding: encoding,
                                                          emptyResponseCodes: emptyResponseCodes,
                                                          emptyRequestMethods: emptyRequestMethods)

          return serializingResponse(using: stringSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization using the provided `ResponseSerializer` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `ResponseSerializer` responsible for serializing the request, response, and
      ///                                data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `false` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingResponse<Serializer: ResponseSerializer>(using serializer: Serializer,
                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = false)
          -> DataTask<Serializer.SerializedObject> {
          dataTask(automaticallyCancelling: shouldAutomaticallyCancel) { completion in
              self.response(queue: .singleEventQueue, responseSerializer: serializer, completionHandler: completion)
          }
      }

      /// Creates a `DataTask` to `await` serialization using the provided `DataResponseSerializerProtocol` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `DataResponseSerializerProtocol` responsible for serializing the request,
      ///                                response, and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `false` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingResponse<Serializer: DataResponseSerializerProtocol>(using serializer: Serializer,
                                                                                  automaticallyCancelling shouldAutomaticallyCancel: Bool = false)
          -> DataTask<Serializer.SerializedObject> {
          dataTask(automaticallyCancelling: shouldAutomaticallyCancel) { completion in
              self.response(queue: .singleEventQueue, responseSerializer: serializer, completionHandler: completion)
          }
      }

      /// Wraps a callback-based response handler in a `Task` and packages it as a `DataTask`.
      ///
      /// The `Task` registers the response handler through `onResponse` and resumes once the handler delivers its
      /// `DataResponse`. Cancelling the `Task` cancels this request.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Forwarded unchanged to the created `DataTask`.
      ///   - onResponse:                Closure which registers the provided completion handler on the request.
      ///
      /// - Returns: The `DataTask`.
      private func dataTask<Value>(automaticallyCancelling shouldAutomaticallyCancel: Bool,
                                   forResponse onResponse: @escaping (@escaping (DataResponse<Value, AFError>) -> Void) -> Void)
          -> DataTask<Value> {
          let responseTask = Task {
              await withTaskCancellationHandler {
                  self.cancel()
              } operation: {
                  await withCheckedContinuation { continuation in
                      onResponse { response in
                          continuation.resume(returning: response)
                      }
                  }
              }
          }

          return DataTask<Value>(request: self, task: responseTask, shouldAutomaticallyCancel: shouldAutomaticallyCancel)
      }
  }
  278. // MARK: - DownloadTask
  /// Value used to `await` a `DownloadResponse` and associated values.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DownloadTask<Value> {
      /// `Task` whose value is the final `DownloadResponse`.
      private let task: Task<AFDownloadResponse<Value>, Never>
      /// `DownloadRequest` controlled by `resume()` and `suspend()`.
      private let request: DownloadRequest
      /// Whether awaiting `response` from a cancelled async context should also cancel `task`.
      private let shouldAutomaticallyCancel: Bool

      fileprivate init(request: DownloadRequest, task: Task<AFDownloadResponse<Value>, Never>, shouldAutomaticallyCancel: Bool) {
          self.request = request
          self.task = task
          self.shouldAutomaticallyCancel = shouldAutomaticallyCancel
      }

      /// `DownloadResponse` produced by the `DownloadRequest` and its response handler.
      public var response: DownloadResponse<Value, AFError> {
          get async {
              // Without automatic cancellation the caller's cancellation state is ignored entirely.
              guard shouldAutomaticallyCancel else { return await task.value }

              return await withTaskCancellationHandler {
                  cancel()
              } operation: {
                  await task.value
              }
          }
      }

      /// `Result` of any response serialization performed for the `response`.
      public var result: Result<Value, AFError> {
          get async {
              let downloadResponse = await response
              return downloadResponse.result
          }
      }

      /// `Value` returned by the `response`. Throws the `AFError` if the `result` is a failure.
      public var value: Value {
          get async throws {
              let outcome = await result
              return try outcome.get()
          }
      }

      /// Cancel the underlying `DownloadRequest` and `Task`.
      public func cancel() {
          task.cancel()
      }

      /// Resume the underlying `DownloadRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspend the underlying `DownloadRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DownloadRequest {
      /// Creates a `DownloadTask` to `await` a `Data` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DownloadTask`'s async
      ///                                properties. `false` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before completion.
      ///   - emptyResponseCodes:        HTTP response codes for which empty responses are allowed. `[204, 205]` by
      ///                                default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = false,
                                  dataPreprocessor: DataPreprocessor = DataResponseSerializer.defaultDataPreprocessor,
                                  emptyResponseCodes: Set<Int> = DataResponseSerializer.defaultEmptyResponseCodes,
                                  emptyRequestMethods: Set<HTTPMethod> = DataResponseSerializer.defaultEmptyRequestMethods) -> DownloadTask<Data> {
          serializingDownload(using: DataResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                            emptyResponseCodes: emptyResponseCodes,
                                                            emptyRequestMethods: emptyRequestMethods),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization of a `Decodable` value.
      ///
      /// - Note: This serializer reads the entire response into memory before parsing.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode from response data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DownloadTask`'s async
      ///                                properties. `false` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - decoder:                   `DataDecoder` to use to decode the response. `JSONDecoder()` by default.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid. `[204, 205]` by
      ///                                default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDecodable<Value: Decodable>(_ type: Value.Type = Value.self,
                                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = false,
                                                         dataPreprocessor: DataPreprocessor = DecodableResponseSerializer<Value>.defaultDataPreprocessor,
                                                         decoder: DataDecoder = JSONDecoder(),
                                                         emptyResponseCodes: Set<Int> = DecodableResponseSerializer<Value>.defaultEmptyResponseCodes,
                                                         emptyRequestMethods: Set<HTTPMethod> = DecodableResponseSerializer<Value>.defaultEmptyRequestMethods) -> DownloadTask<Value> {
          serializingDownload(using: DecodableResponseSerializer<Value>(dataPreprocessor: dataPreprocessor,
                                                                        decoder: decoder,
                                                                        emptyResponseCodes: emptyResponseCodes,
                                                                        emptyRequestMethods: emptyRequestMethods),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization of the downloaded file's `URL` on disk.
      ///
      /// - Parameter shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                        the enclosing async context is cancelled. Only applies to
      ///                                        `DownloadTask`'s async properties. `false` by default, matching the
      ///                                        other serializing methods.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownloadedFileURL(automaticallyCancelling shouldAutomaticallyCancel: Bool = false) -> DownloadTask<URL> {
          serializingDownload(using: URLResponseSerializer(),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization of a `String` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DownloadTask`'s async
      ///                                properties. `false` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - encoding:                  `String.Encoding` to use during serialization. Defaults to `nil`, in which
      ///                                case the encoding will be determined from the server response, falling back to
      ///                                the default HTTP character set, `ISO-8859-1`.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid. `[204, 205]` by
      ///                                default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingString(automaticallyCancelling shouldAutomaticallyCancel: Bool = false,
                                    dataPreprocessor: DataPreprocessor = StringResponseSerializer.defaultDataPreprocessor,
                                    encoding: String.Encoding? = nil,
                                    emptyResponseCodes: Set<Int> = StringResponseSerializer.defaultEmptyResponseCodes,
                                    emptyRequestMethods: Set<HTTPMethod> = StringResponseSerializer.defaultEmptyRequestMethods) -> DownloadTask<String> {
          serializingDownload(using: StringResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                              encoding: encoding,
                                                              emptyResponseCodes: emptyResponseCodes,
                                                              emptyRequestMethods: emptyRequestMethods),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization using the provided `ResponseSerializer` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `ResponseSerializer` responsible for serializing the request, response, and
      ///                                data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DownloadTask`'s async
      ///                                properties. `false` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownload<Serializer: ResponseSerializer>(using serializer: Serializer,
                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = false)
          -> DownloadTask<Serializer.SerializedObject> {
          downloadTask(automaticallyCancelling: shouldAutomaticallyCancel) {
              self.response(queue: .singleEventQueue,
                            responseSerializer: serializer,
                            completionHandler: $0)
          }
      }

      /// Creates a `DownloadTask` to `await` serialization using the provided `DownloadResponseSerializerProtocol`
      /// instance.
      ///
      /// - Parameters:
      ///   - serializer:                `DownloadResponseSerializerProtocol` responsible for serializing the request,
      ///                                response, and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DownloadTask`'s async
      ///                                properties. `false` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownload<Serializer: DownloadResponseSerializerProtocol>(using serializer: Serializer,
                                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = false)
          -> DownloadTask<Serializer.SerializedObject> {
          downloadTask(automaticallyCancelling: shouldAutomaticallyCancel) {
              self.response(queue: .singleEventQueue,
                            responseSerializer: serializer,
                            completionHandler: $0)
          }
      }

      /// Wraps a callback-based response handler in a `Task` and packages it as a `DownloadTask`.
      ///
      /// The `Task` registers the response handler through `onResponse` and resumes once the handler delivers its
      /// `DownloadResponse`. Cancelling the `Task` cancels this request.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Forwarded unchanged to the created `DownloadTask`.
      ///   - onResponse:                Closure which registers the provided completion handler on the request.
      ///
      /// - Returns: The `DownloadTask`.
      private func downloadTask<Value>(automaticallyCancelling shouldAutomaticallyCancel: Bool,
                                       forResponse onResponse: @escaping (@escaping (DownloadResponse<Value, AFError>) -> Void) -> Void)
          -> DownloadTask<Value> {
          let task = Task {
              await withTaskCancellationHandler {
                  self.cancel()
              } operation: {
                  await withCheckedContinuation { continuation in
                      onResponse {
                          continuation.resume(returning: $0)
                      }
                  }
              }
          }

          return DownloadTask<Value>(request: self, task: task, shouldAutomaticallyCancel: shouldAutomaticallyCancel)
      }
  }
  464. // MARK: - DataStreamTask
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DataStreamTask {
      /// Type of created streams.
      public typealias Stream<Success, Failure: Error> = StreamOf<DataStreamRequest.Stream<Success, Failure>>

      /// `DataStreamRequest` from which every stream is created.
      private let request: DataStreamRequest

      fileprivate init(request: DataStreamRequest) {
          self.request = request
      }

      /// Creates a `Stream` of `Data` values from the underlying `DataStreamRequest`.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
      ///                                when observation of the stream stops. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                                by default.
      ///
      /// - Returns: The `Stream`.
      public func streamingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true, bufferingPolicy: Stream<Data, Never>.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { onStream in
              let completionQueue = DispatchQueue.streamCompletionQueue(forRequestID: request.id)
              request.responseStream(on: completionQueue, stream: onStream)
          }
      }

      /// Creates a `Stream` of `UTF-8` `String`s from the underlying `DataStreamRequest`.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
      ///                                when observation of the stream stops. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                                by default.
      ///
      /// - Returns: The `Stream`.
      public func streamingStrings(automaticallyCancelling shouldAutomaticallyCancel: Bool = true, bufferingPolicy: Stream<String, Never>.BufferingPolicy = .unbounded) -> Stream<String, Never> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { onStream in
              let completionQueue = DispatchQueue.streamCompletionQueue(forRequestID: request.id)
              request.responseStreamString(on: completionQueue, stream: onStream)
          }
      }

      /// Creates a `Stream` of `Decodable` values from the underlying `DataStreamRequest`.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to be serialized from stream payloads.
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
      ///                                when observation of the stream stops. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                                by default.
      ///
      /// - Returns: The `Stream`.
      public func streamingDecodables<T>(_ type: T.Type = T.self,
                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                         bufferingPolicy: Stream<T, AFError>.BufferingPolicy = .unbounded)
          -> Stream<T, AFError> where T: Decodable {
          let decodableSerializer = DecodableStreamSerializer<T>()

          return streamingResponses(serializedUsing: decodableSerializer,
                                    automaticallyCancelling: shouldAutomaticallyCancel,
                                    bufferingPolicy: bufferingPolicy)
      }

      /// Creates a `Stream` of values using the provided `DataStreamSerializer` from the underlying `DataStreamRequest`.
      ///
      /// - Parameters:
      ///   - serializer:                `DataStreamSerializer` to use to serialize incoming `Data`.
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
      ///                                when observation of the stream stops. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior. `.unbounded`
      ///                                by default.
      ///
      /// - Returns: The `Stream`.
      public func streamingResponses<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
                                                                       automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                                                       bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.BufferingPolicy = .unbounded)
          -> Stream<Serializer.SerializedObject, AFError> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { onStream in
              let completionQueue = DispatchQueue.streamCompletionQueue(forRequestID: request.id)
              request.responseStream(using: serializer, on: completionQueue, stream: onStream)
          }
      }

      /// Builds a `Stream` which forwards every `DataStreamRequest.Stream` value delivered to the handler registered by
      /// `onResponse`, finishing after a `.complete` event has been forwarded.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Whether the request is cancelled when the stream terminates while the request
      ///                                is still initialized, resumed, or suspended. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` handed to the created stream. `.unbounded` by default.
      ///   - onResponse:                Closure which registers the provided stream handler on the request.
      ///
      /// - Returns: The `Stream`.
      private func createStream<Success, Failure: Error>(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                                         bufferingPolicy: Stream<Success, Failure>.BufferingPolicy = .unbounded,
                                                         forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
          -> Stream<Success, Failure> {
          StreamOf(bufferingPolicy: bufferingPolicy, onTermination: {
              // Only cancel requests that have not already reached another state.
              if shouldAutomaticallyCancel && (request.isInitialized || request.isResumed || request.isSuspended) {
                  cancel()
              }
          }, builder: { continuation in
              onResponse { stream in
                  continuation.yield(stream)

                  // The completion event is the last value; close the stream once it has been forwarded.
                  guard case .complete = stream.event else { return }

                  continuation.finish()
              }
          })
      }

      /// Cancel the underlying `DataStreamRequest`.
      public func cancel() {
          request.cancel()
      }

      /// Resume the underlying `DataStreamRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspend the underlying `DataStreamRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DataStreamRequest {
      /// Wraps this request in a `DataStreamTask`, which is used to `await` streams of serialized values.
      ///
      /// - Returns: A new `DataStreamTask` referencing this request.
      public func streamTask() -> DataStreamTask {
          let task = DataStreamTask(request: self)
          return task
      }
  }
  extension DispatchQueue {
      /// Concurrent queue shared by the async APIs in this file for delivering request events.
      fileprivate static let singleEventQueue: DispatchQueue = .init(label: "org.alamofire.concurrencySingleEventQueue",
                                                                     attributes: .concurrent)

      /// Creates a new queue, labelled with the request's identifier, which targets `singleEventQueue`.
      ///
      /// - Parameter id: Identifier of the request the queue is created for.
      ///
      /// - Returns: The newly created `DispatchQueue`.
      fileprivate static func streamCompletionQueue(forRequestID id: UUID) -> DispatchQueue {
          let label = "org.alamofire.concurrencyStreamCompletionQueue-\(id)"

          return DispatchQueue(label: label, target: singleEventQueue)
      }
  }
  /// An asynchronous sequence generated from an underlying `AsyncStream`. Only produced by Alamofire.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct StreamOf<Element>: AsyncSequence {
      public typealias AsyncIterator = Iterator
      public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
      fileprivate typealias Continuation = AsyncStream<Element>.Continuation

      /// Policy applied to every `AsyncStream` created by `makeAsyncIterator()`.
      private let bufferingPolicy: BufferingPolicy
      /// Closure called when an `Iterator` produced by this sequence is deallocated.
      private let onTermination: (() -> Void)?
      /// Closure which connects an event source to the stream's continuation.
      private let builder: (Continuation) -> Void

      fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
                       onTermination: (() -> Void)? = nil,
                       builder: @escaping (Continuation) -> Void) {
          self.bufferingPolicy = bufferingPolicy
          self.onTermination = onTermination
          self.builder = builder
      }

      /// Creates a fresh underlying `AsyncStream`, runs the builder against its continuation, and returns an
      /// `Iterator` over it.
      ///
      /// - Returns: The `Iterator`. When it is deallocated the stream is finished and `onTermination` is called.
      public func makeAsyncIterator() -> Iterator {
          var continuation: AsyncStream<Element>.Continuation?
          // The requested buffering policy must be forwarded here; otherwise `AsyncStream` falls back to `.unbounded`
          // and the caller's choice is silently ignored.
          let stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { innerContinuation in
              continuation = innerContinuation
              builder(innerContinuation)
          }

          return Iterator(iterator: stream.makeAsyncIterator()) {
              continuation?.finish()
              self.onTermination?()
          }
      }

      /// Iterator over the underlying `AsyncStream` which runs a cancellation closure when it is deallocated.
      public struct Iterator: AsyncIteratorProtocol {
          /// Reference type whose only purpose is to run a closure from `deinit`, giving the value-type `Iterator` a
          /// deallocation hook.
          private final class Token {
              private let onDeinit: () -> Void

              init(onDeinit: @escaping () -> Void) {
                  self.onDeinit = onDeinit
              }

              deinit {
                  onDeinit()
              }
          }

          private var iterator: AsyncStream<Element>.AsyncIterator
          private let token: Token

          init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
              self.iterator = iterator
              token = Token(onDeinit: onCancellation)
          }

          /// Awaits the next element of the underlying `AsyncStream`.
          ///
          /// - Returns: The next element, or `nil` once the stream has finished.
          public mutating func next() async -> Element? {
              await iterator.next()
          }
      }
  }
  628. #endif