Concurrency.swift 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970
  1. //
  2. // Concurrency.swift
  3. //
  4. // Copyright (c) 2021 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. #if canImport(_Concurrency)
  25. import Foundation
  26. // MARK: - Request Event Streams
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension Request {
      /// Streams the upload `Progress` updates reported for this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` applied to the created stream. `.unbounded` by default.
      ///
      /// - Returns: A `StreamOf<Progress>` that is finished from the request's `onFinish` closure.
      public func uploadProgress(bufferingPolicy: StreamOf<Progress>.BufferingPolicy = .unbounded) -> StreamOf<Progress> {
          return stream(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.uploadProgress(queue: self.underlyingQueue) { update in
                  sink.yield(update)
              }
          }
      }

      /// Streams the download `Progress` updates reported for this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` applied to the created stream. `.unbounded` by default.
      ///
      /// - Returns: A `StreamOf<Progress>` that is finished from the request's `onFinish` closure.
      public func downloadProgress(bufferingPolicy: StreamOf<Progress>.BufferingPolicy = .unbounded) -> StreamOf<Progress> {
          return stream(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.downloadProgress(queue: self.underlyingQueue) { update in
                  sink.yield(update)
              }
          }
      }

      /// Streams every `URLRequest` created on behalf of this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` applied to the created stream. `.unbounded` by default.
      ///
      /// - Returns: A `StreamOf<URLRequest>` that is finished from the request's `onFinish` closure.
      public func urlRequests(bufferingPolicy: StreamOf<URLRequest>.BufferingPolicy = .unbounded) -> StreamOf<URLRequest> {
          return stream(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.onURLRequestCreation(on: self.underlyingQueue) { urlRequest in
                  sink.yield(urlRequest)
              }
          }
      }

      /// Streams every `URLSessionTask` created on behalf of this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` applied to the created stream. `.unbounded` by default.
      ///
      /// - Returns: A `StreamOf<URLSessionTask>` that is finished from the request's `onFinish` closure.
      public func urlSessionTasks(bufferingPolicy: StreamOf<URLSessionTask>.BufferingPolicy = .unbounded) -> StreamOf<URLSessionTask> {
          return stream(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.onURLSessionTaskCreation(on: self.underlyingQueue) { sessionTask in
                  sink.yield(sessionTask)
              }
          }
      }

      /// Streams the cURL descriptions produced for this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` applied to the created stream. `.unbounded` by default.
      ///
      /// - Returns: A `StreamOf<String>` that is finished from the request's `onFinish` closure.
      public func cURLDescriptions(bufferingPolicy: StreamOf<String>.BufferingPolicy = .unbounded) -> StreamOf<String> {
          return stream(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.cURLDescription(on: self.underlyingQueue) { text in
                  sink.yield(text)
              }
          }
      }

      /// Shared builder behind the public event streams above.
      ///
      /// - Parameters:
      ///   - type:            Element type of the stream; only used for generic inference.
      ///   - bufferingPolicy: `BufferingPolicy` applied to the created stream. `.unbounded` by default.
      ///   - yielder:         Closure that registers the event callback which feeds the continuation.
      ///
      /// - Returns: The `StreamOf<T>` wired to `yielder` and finished from `onFinish`.
      fileprivate func stream<T>(of type: T.Type = T.self,
                                 bufferingPolicy: StreamOf<T>.BufferingPolicy = .unbounded,
                                 yielder: @escaping (StreamOf<T>.Continuation) -> Void) -> StreamOf<T> {
          return StreamOf<T>(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              // Register the event source first ...
              yielder(sink)
              // ... then the finish hook. Must come after serializers run in order to catch retry progress.
              self.onFinish {
                  sink.finish()
              }
          }
      }
  }
  101. // MARK: - DataTask
  /// Handle used to `await` the `DataResponse` of a `DataRequest`, along with values derived from it.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DataTask<Value> {
      /// `DataResponse` produced by the `DataRequest` and its response handler.
      ///
      /// When the task was created with automatic cancellation enabled, cancellation of the awaiting context
      /// triggers `cancel()`.
      public var response: DataResponse<Value, AFError> {
          get async {
              // Without automatic cancellation, simply wait for the underlying task.
              guard shouldAutomaticallyCancel else {
                  return await task.value
              }

              return await withTaskCancellationHandler {
                  await task.value
              } onCancel: {
                  self.cancel()
              }
          }
      }

      /// `Result` of the response serialization performed for `response`.
      public var result: Result<Value, AFError> {
          get async {
              let received = await response
              return received.result
          }
      }

      /// Successful `Value` of `result`; throws the `AFError` otherwise.
      public var value: Value {
          get async throws {
              switch await result {
              case let .success(serialized):
                  return serialized
              case let .failure(error):
                  throw error
              }
          }
      }

      private let request: DataRequest
      private let task: Task<DataResponse<Value, AFError>, Never>
      private let shouldAutomaticallyCancel: Bool

      fileprivate init(request: DataRequest, task: Task<DataResponse<Value, AFError>, Never>, shouldAutomaticallyCancel: Bool) {
          self.shouldAutomaticallyCancel = shouldAutomaticallyCancel
          self.task = task
          self.request = request
      }

      /// Cancels the `Task` backing this value.
      ///
      /// - Note: NOTE(review): only `task.cancel()` is called here; whether the `DataRequest` is cancelled as well
      ///         depends on the cancellation handling of the `Task` passed to the initializer.
      public func cancel() {
          task.cancel()
      }

      /// Resumes the underlying `DataRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspends the underlying `DataRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DataRequest {
      /// Creates a `StreamOf<HTTPURLResponse>` for the instance's responses.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<HTTPURLResponse>`.
      public func httpResponses(bufferingPolicy: StreamOf<HTTPURLResponse>.BufferingPolicy = .unbounded) -> StreamOf<HTTPURLResponse> {
          stream(bufferingPolicy: bufferingPolicy) { [unowned self] continuation in
              onHTTPResponse(on: underlyingQueue) { response in
                  continuation.yield(response)
              }
          }
      }

      /// Sets an async closure returning a `Request.ResponseDisposition`, called whenever the `DataRequest` produces an
      /// `HTTPURLResponse`.
      ///
      /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
      ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
      ///         where responses after the first will contain the part headers.
      ///
      /// - Parameters:
      ///   - handler: Async closure executed when a new `HTTPURLResponse` is received and returning a
      ///              `ResponseDisposition` value. This value determines whether to continue the request or cancel it as
      ///              if `cancel()` had been called on the instance. Note, this closure is called on an arbitrary thread,
      ///              so any synchronous calls in it will execute in that context.
      ///
      /// - Returns: The instance.
      @_disfavoredOverload
      @discardableResult
      public func onHTTPResponse(
          perform handler: @escaping @Sendable (_ response: HTTPURLResponse) async -> ResponseDisposition
      ) -> Self {
          onHTTPResponse(on: underlyingQueue) { response, completionHandler in
              // Bridge the callback-based API into async code: run the handler in a new `Task` and report its
              // disposition through the completion handler.
              Task {
                  let disposition = await handler(response)
                  completionHandler(disposition)
              }
          }

          return self
      }

      /// Sets an async closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
      ///
      /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
      ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
      ///         where responses after the first will contain the part headers.
      ///
      /// - Parameters:
      ///   - handler: Async closure executed when a new `HTTPURLResponse` is received. Note, this closure is called on an
      ///              arbitrary thread, so any synchronous calls in it will execute in that context.
      ///
      /// - Returns: The instance.
      @discardableResult
      public func onHTTPResponse(perform handler: @escaping @Sendable (_ response: HTTPURLResponse) async -> Void) -> Self {
          // Delegate to the disposition-returning overload, always allowing the request to continue.
          onHTTPResponse { response in
              await handler(response)
              return .allow
          }

          return self
      }

      /// Creates a `DataTask` to `await` a `Data` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before completion.
      ///   - emptyResponseCodes:        HTTP response codes for which empty responses are allowed. `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                  dataPreprocessor: any DataPreprocessor = DataResponseSerializer.defaultDataPreprocessor,
                                  emptyResponseCodes: Set<Int> = DataResponseSerializer.defaultEmptyResponseCodes,
                                  emptyRequestMethods: Set<HTTPMethod> = DataResponseSerializer.defaultEmptyRequestMethods) -> DataTask<Data> {
          serializingResponse(using: DataResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                            emptyResponseCodes: emptyResponseCodes,
                                                            emptyRequestMethods: emptyRequestMethods),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization of a `Decodable` value.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode from response data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the serializer.
      ///                                `PassthroughPreprocessor()` by default.
      ///   - decoder:                   `DataDecoder` to use to decode the response. `JSONDecoder()` by default.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid. `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingDecodable<Value: Decodable>(_ type: Value.Type = Value.self,
                                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                                         dataPreprocessor: any DataPreprocessor = DecodableResponseSerializer<Value>.defaultDataPreprocessor,
                                                         decoder: any DataDecoder = JSONDecoder(),
                                                         emptyResponseCodes: Set<Int> = DecodableResponseSerializer<Value>.defaultEmptyResponseCodes,
                                                         emptyRequestMethods: Set<HTTPMethod> = DecodableResponseSerializer<Value>.defaultEmptyRequestMethods) -> DataTask<Value> {
          serializingResponse(using: DecodableResponseSerializer<Value>(dataPreprocessor: dataPreprocessor,
                                                                        decoder: decoder,
                                                                        emptyResponseCodes: emptyResponseCodes,
                                                                        emptyRequestMethods: emptyRequestMethods),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization of a `String` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the serializer.
      ///                                `PassthroughPreprocessor()` by default.
      ///   - encoding:                  `String.Encoding` to use during serialization. Defaults to `nil`, in which case
      ///                                the encoding will be determined from the server response, falling back to the
      ///                                default HTTP character set, `ISO-8859-1`.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid. `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingString(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                    dataPreprocessor: any DataPreprocessor = StringResponseSerializer.defaultDataPreprocessor,
                                    encoding: String.Encoding? = nil,
                                    emptyResponseCodes: Set<Int> = StringResponseSerializer.defaultEmptyResponseCodes,
                                    emptyRequestMethods: Set<HTTPMethod> = StringResponseSerializer.defaultEmptyRequestMethods) -> DataTask<String> {
          serializingResponse(using: StringResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                              encoding: encoding,
                                                              emptyResponseCodes: emptyResponseCodes,
                                                              emptyRequestMethods: emptyRequestMethods),
                              automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization using the provided `ResponseSerializer` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `ResponseSerializer` responsible for serializing the request, response, and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `true` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingResponse<Serializer: ResponseSerializer>(using serializer: Serializer,
                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DataTask<Serializer.SerializedObject> {
          dataTask(automaticallyCancelling: shouldAutomaticallyCancel) { [self] in
              response(queue: underlyingQueue,
                       responseSerializer: serializer,
                       completionHandler: $0)
          }
      }

      /// Creates a `DataTask` to `await` serialization using the provided `DataResponseSerializerProtocol` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `DataResponseSerializerProtocol` responsible for serializing the request,
      ///                                response, and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when the
      ///                                enclosing async context is cancelled. Only applies to `DataTask`'s async
      ///                                properties. `true` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingResponse<Serializer: DataResponseSerializerProtocol>(using serializer: Serializer,
                                                                                  automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DataTask<Serializer.SerializedObject> {
          dataTask(automaticallyCancelling: shouldAutomaticallyCancel) { [self] in
              response(queue: underlyingQueue,
                       responseSerializer: serializer,
                       completionHandler: $0)
          }
      }

      /// Wraps a completion-handler based response call in a `Task` and returns the `DataTask` that awaits it.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Forwarded to the created `DataTask`.
      ///   - onResponse:                Closure that registers the response handler; it receives the completion
      ///                                closure to call with the final `DataResponse`.
      ///
      /// - Returns: The `DataTask` backed by the created `Task`.
      private func dataTask<Value>(automaticallyCancelling shouldAutomaticallyCancel: Bool,
                                   forResponse onResponse: @escaping (@escaping (DataResponse<Value, AFError>) -> Void) -> Void)
          -> DataTask<Value> {
          let task = Task {
              // Cancelling the `Task` must also cancel the request, otherwise `DataTask.cancel()` and automatic
              // cancellation would leave the request running. Mirrors `DownloadRequest.downloadTask`.
              await withTaskCancellationHandler {
                  await withCheckedContinuation { continuation in
                      onResponse {
                          continuation.resume(returning: $0)
                      }
                  }
              } onCancel: {
                  self.cancel()
              }
          }

          return DataTask<Value>(request: self, task: task, shouldAutomaticallyCancel: shouldAutomaticallyCancel)
      }
  }
  344. // MARK: - DownloadTask
  /// Handle used to `await` the `DownloadResponse` of a `DownloadRequest`, along with values derived from it.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DownloadTask<Value> {
      /// `DownloadResponse` produced by the `DownloadRequest` and its response handler.
      ///
      /// When the task was created with automatic cancellation enabled, cancellation of the awaiting context
      /// triggers `cancel()`.
      public var response: DownloadResponse<Value, AFError> {
          get async {
              // Without automatic cancellation, simply wait for the underlying task.
              guard shouldAutomaticallyCancel else {
                  return await task.value
              }

              return await withTaskCancellationHandler {
                  await task.value
              } onCancel: {
                  self.cancel()
              }
          }
      }

      /// `Result` of the response serialization performed for `response`.
      public var result: Result<Value, AFError> {
          get async {
              let received = await response
              return received.result
          }
      }

      /// Successful `Value` of `result`; throws the `AFError` otherwise.
      public var value: Value {
          get async throws {
              switch await result {
              case let .success(serialized):
                  return serialized
              case let .failure(error):
                  throw error
              }
          }
      }

      private let task: Task<AFDownloadResponse<Value>, Never>
      private let request: DownloadRequest
      private let shouldAutomaticallyCancel: Bool

      fileprivate init(request: DownloadRequest, task: Task<AFDownloadResponse<Value>, Never>, shouldAutomaticallyCancel: Bool) {
          self.shouldAutomaticallyCancel = shouldAutomaticallyCancel
          self.task = task
          self.request = request
      }

      /// Cancels the backing `Task`. Tasks built by `DownloadRequest.downloadTask` cancel the `DownloadRequest`
      /// from their cancellation handler.
      public func cancel() {
          task.cancel()
      }

      /// Resumes the underlying `DownloadRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspends the underlying `DownloadRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DownloadRequest {
      /// Builds a `DownloadTask` that awaits the downloaded content as `Data`.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Whether the request should be cancelled when the enclosing async context is
      ///                                cancelled. Only applies to `DownloadTask`'s async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` applied to the received `Data` before completion.
      ///   - emptyResponseCodes:        HTTP response codes for which empty responses are allowed. `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                  dataPreprocessor: any DataPreprocessor = DataResponseSerializer.defaultDataPreprocessor,
                                  emptyResponseCodes: Set<Int> = DataResponseSerializer.defaultEmptyResponseCodes,
                                  emptyRequestMethods: Set<HTTPMethod> = DataResponseSerializer.defaultEmptyRequestMethods) -> DownloadTask<Data> {
          let dataSerializer = DataResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                      emptyResponseCodes: emptyResponseCodes,
                                                      emptyRequestMethods: emptyRequestMethods)

          return serializingDownload(using: dataSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Builds a `DownloadTask` that awaits a `Decodable` value decoded from the download.
      ///
      /// - Note: This serializer reads the entire response into memory before parsing.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode from response data.
      ///   - shouldAutomaticallyCancel: Whether the request should be cancelled when the enclosing async context is
      ///                                cancelled. Only applies to `DownloadTask`'s async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` applied to the received `Data` before calling the serializer.
      ///                                `PassthroughPreprocessor()` by default.
      ///   - decoder:                   `DataDecoder` used to decode the response. `JSONDecoder()` by default.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid. `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDecodable<Value: Decodable>(_ type: Value.Type = Value.self,
                                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                                         dataPreprocessor: any DataPreprocessor = DecodableResponseSerializer<Value>.defaultDataPreprocessor,
                                                         decoder: any DataDecoder = JSONDecoder(),
                                                         emptyResponseCodes: Set<Int> = DecodableResponseSerializer<Value>.defaultEmptyResponseCodes,
                                                         emptyRequestMethods: Set<HTTPMethod> = DecodableResponseSerializer<Value>.defaultEmptyRequestMethods) -> DownloadTask<Value> {
          let decodableSerializer = DecodableResponseSerializer<Value>(dataPreprocessor: dataPreprocessor,
                                                                       decoder: decoder,
                                                                       emptyResponseCodes: emptyResponseCodes,
                                                                       emptyRequestMethods: emptyRequestMethods)

          return serializingDownload(using: decodableSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Builds a `DownloadTask` that awaits the on-disk `URL` of the downloaded file.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Whether the request should be cancelled when the enclosing async context is
      ///                                cancelled. Only applies to `DownloadTask`'s async properties. `true` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownloadedFileURL(automaticallyCancelling shouldAutomaticallyCancel: Bool = true) -> DownloadTask<URL> {
          let urlSerializer = URLResponseSerializer()

          return serializingDownload(using: urlSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Builds a `DownloadTask` that awaits the downloaded content as a `String`.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Whether the request should be cancelled when the enclosing async context is
      ///                                cancelled. Only applies to `DownloadTask`'s async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` applied to the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - encoding:                  `String.Encoding` to use during serialization. Defaults to `nil`, in which case
      ///                                the encoding will be determined from the server response, falling back to the
      ///                                default HTTP character set, `ISO-8859-1`.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid. `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingString(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                    dataPreprocessor: any DataPreprocessor = StringResponseSerializer.defaultDataPreprocessor,
                                    encoding: String.Encoding? = nil,
                                    emptyResponseCodes: Set<Int> = StringResponseSerializer.defaultEmptyResponseCodes,
                                    emptyRequestMethods: Set<HTTPMethod> = StringResponseSerializer.defaultEmptyRequestMethods) -> DownloadTask<String> {
          let stringSerializer = StringResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                          encoding: encoding,
                                                          emptyResponseCodes: emptyResponseCodes,
                                                          emptyRequestMethods: emptyRequestMethods)

          return serializingDownload(using: stringSerializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Builds a `DownloadTask` that awaits serialization performed by the given `ResponseSerializer`.
      ///
      /// - Parameters:
      ///   - serializer:                `ResponseSerializer` responsible for serializing the request, response, and data.
      ///   - shouldAutomaticallyCancel: Whether the request should be cancelled when the enclosing async context is
      ///                                cancelled. Only applies to `DownloadTask`'s async properties. `true` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownload<Serializer: ResponseSerializer>(using serializer: Serializer,
                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DownloadTask<Serializer.SerializedObject> {
          return downloadTask(automaticallyCancelling: shouldAutomaticallyCancel) { [self] completion in
              self.response(queue: self.underlyingQueue, responseSerializer: serializer, completionHandler: completion)
          }
      }

      /// Builds a `DownloadTask` that awaits serialization performed by the given
      /// `DownloadResponseSerializerProtocol` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `DownloadResponseSerializerProtocol` responsible for serializing the request,
      ///                                response, and data.
      ///   - shouldAutomaticallyCancel: Whether the request should be cancelled when the enclosing async context is
      ///                                cancelled. Only applies to `DownloadTask`'s async properties. `true` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownload<Serializer: DownloadResponseSerializerProtocol>(using serializer: Serializer,
                                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DownloadTask<Serializer.SerializedObject> {
          return downloadTask(automaticallyCancelling: shouldAutomaticallyCancel) { [self] completion in
              self.response(queue: self.underlyingQueue, responseSerializer: serializer, completionHandler: completion)
          }
      }

      /// Wraps a completion-handler based response call in a `Task` and returns the `DownloadTask` that awaits it.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Forwarded to the created `DownloadTask`.
      ///   - onResponse:                Closure that registers the response handler; it receives the completion
      ///                                closure to call with the final `DownloadResponse`.
      ///
      /// - Returns: The `DownloadTask` backed by the created `Task`.
      private func downloadTask<Value>(automaticallyCancelling shouldAutomaticallyCancel: Bool,
                                       forResponse onResponse: @escaping (@escaping (DownloadResponse<Value, AFError>) -> Void) -> Void)
          -> DownloadTask<Value> {
          let backingTask = Task {
              await withTaskCancellationHandler {
                  // Suspend until the response handler delivers the final response.
                  await withCheckedContinuation { continuation in
                      onResponse { finalResponse in
                          continuation.resume(returning: finalResponse)
                      }
                  }
              } onCancel: {
                  // Cancelling the `Task` cancels the request itself.
                  self.cancel()
              }
          }

          return DownloadTask<Value>(request: self, task: backingTask, shouldAutomaticallyCancel: shouldAutomaticallyCancel)
      }
  }
  536. // MARK: - DataStreamTask
  537. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  538. public struct DataStreamTask {
  539. // Type of created streams.
  540. public typealias Stream<Success, Failure: Error> = StreamOf<DataStreamRequest.Stream<Success, Failure>>
  541. private let request: DataStreamRequest
  /// Creates a task wrapping the given `DataStreamRequest`.
  fileprivate init(request: DataStreamRequest) { self.request = request }
  /// Creates a `Stream` of `Data` values from the underlying `DataStreamRequest`.
  ///
  /// - Parameters:
  ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
  ///                                when observation of the stream stops. `true` by default.
  ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior.
  ///                                `.unbounded` by default.
  ///
  /// - Returns: The `Stream`.
  public func streamingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                            bufferingPolicy: Stream<Data, Never>.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
      return createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { handler in
          self.request.responseStream(on: .streamCompletionQueue(forRequestID: self.request.id), stream: handler)
      }
  }
  558. /// Creates a `Stream` of `UTF-8` `String`s from the underlying `DataStreamRequest`.
  559. ///
  560. /// - Parameters:
  561. /// - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
  562. /// which observation of the stream stops. `true` by default.
  563. /// - bufferingPolicy: ` BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
  564. /// - Returns:
  565. public func streamingStrings(automaticallyCancelling shouldAutomaticallyCancel: Bool = true, bufferingPolicy: Stream<String, Never>.BufferingPolicy = .unbounded) -> Stream<String, Never> {
  566. createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { onStream in
  567. request.responseStreamString(on: .streamCompletionQueue(forRequestID: request.id), stream: onStream)
  568. }
  569. }
  570. /// Creates a `Stream` of `Decodable` values from the underlying `DataStreamRequest`.
  571. ///
  572. /// - Parameters:
  573. /// - type: `Decodable` type to be serialized from stream payloads.
  574. /// - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
  575. /// which observation of the stream stops. `true` by default.
  576. /// - bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
  577. ///
  578. /// - Returns: The `Stream`.
  579. public func streamingDecodables<T>(_ type: T.Type = T.self,
  580. automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
  581. bufferingPolicy: Stream<T, AFError>.BufferingPolicy = .unbounded)
  582. -> Stream<T, AFError> where T: Decodable {
  583. streamingResponses(serializedUsing: DecodableStreamSerializer<T>(),
  584. automaticallyCancelling: shouldAutomaticallyCancel,
  585. bufferingPolicy: bufferingPolicy)
  586. }
  587. /// Creates a `Stream` of values using the provided `DataStreamSerializer` from the underlying `DataStreamRequest`.
  588. ///
  589. /// - Parameters:
  590. /// - serializer: `DataStreamSerializer` to use to serialize incoming `Data`.
  591. /// - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
  592. /// which observation of the stream stops. `true` by default.
  593. /// - bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
  594. ///
  595. /// - Returns: The `Stream`.
  596. public func streamingResponses<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
  597. automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
  598. bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.BufferingPolicy = .unbounded)
  599. -> Stream<Serializer.SerializedObject, AFError> {
  600. createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { onStream in
  601. request.responseStream(using: serializer,
  602. on: .streamCompletionQueue(forRequestID: request.id),
  603. stream: onStream)
  604. }
  605. }
  606. private func createStream<Success, Failure: Error>(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
  607. bufferingPolicy: Stream<Success, Failure>.BufferingPolicy = .unbounded,
  608. forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
  609. -> Stream<Success, Failure> {
  610. StreamOf(bufferingPolicy: bufferingPolicy) {
  611. guard shouldAutomaticallyCancel,
  612. request.isInitialized || request.isResumed || request.isSuspended else { return }
  613. cancel()
  614. } builder: { continuation in
  615. onResponse { stream in
  616. continuation.yield(stream)
  617. if case .complete = stream.event {
  618. continuation.finish()
  619. }
  620. }
  621. }
  622. }
  623. /// Cancel the underlying `DataStreamRequest`.
  624. public func cancel() {
  625. request.cancel()
  626. }
  627. /// Resume the underlying `DataStreamRequest`.
  628. public func resume() {
  629. request.resume()
  630. }
  631. /// Suspend the underlying `DataStreamRequest`.
  632. public func suspend() {
  633. request.suspend()
  634. }
  635. }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DataStreamRequest {
      /// Creates a `StreamOf<HTTPURLResponse>` for the instance's responses.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<HTTPURLResponse>`.
      public func httpResponses(bufferingPolicy: StreamOf<HTTPURLResponse>.BufferingPolicy = .unbounded) -> StreamOf<HTTPURLResponse> {
          stream(bufferingPolicy: bufferingPolicy) { [unowned self] continuation in
              // Every response reported on the underlying queue is forwarded into the stream.
              self.onHTTPResponse(on: self.underlyingQueue) { httpResponse in
                  continuation.yield(httpResponse)
              }
          }
      }

      /// Sets an async closure returning a `Request.ResponseDisposition`, called whenever the `DataStreamRequest`
      /// produces an `HTTPURLResponse`.
      ///
      /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
      ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
      ///         where responses after the first will contain the part headers.
      ///
      /// - Parameters:
      ///   - handler: Async closure executed when a new `HTTPURLResponse` is received and returning a
      ///              `ResponseDisposition` value. This value determines whether to continue the request or cancel
      ///              it as if `cancel()` had been called on the instance. Note, this closure is called on an
      ///              arbitrary thread, so any synchronous calls in it will execute in that context.
      ///
      /// - Returns: The instance.
      @_disfavoredOverload
      @discardableResult
      public func onHTTPResponse(perform handler: @escaping @Sendable (HTTPURLResponse) async -> ResponseDisposition) -> Self {
          onHTTPResponse(on: underlyingQueue) { httpResponse, completionHandler in
              // Run the async handler in a new `Task`, then report its decision through the completion handler.
              Task {
                  let chosenDisposition = await handler(httpResponse)
                  completionHandler(chosenDisposition)
              }
          }

          return self
      }

      /// Sets an async closure called whenever the `DataStreamRequest` produces an `HTTPURLResponse`.
      ///
      /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
      ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
      ///         where responses after the first will contain the part headers.
      ///
      /// - Parameters:
      ///   - handler: Async closure executed when a new `HTTPURLResponse` is received. Note, this closure is called
      ///              on an arbitrary thread, so any synchronous calls in it will execute in that context.
      ///
      /// - Returns: The instance.
      @discardableResult
      public func onHTTPResponse(perform handler: @escaping @Sendable (HTTPURLResponse) async -> Void) -> Self {
          // Returning `.allow` selects the disposition-returning overload and always lets the request continue.
          onHTTPResponse(perform: { httpResponse in
              await handler(httpResponse)
              return .allow
          })

          return self
      }

      /// Creates a `DataStreamTask` used to `await` streams of serialized values.
      ///
      /// - Returns: The `DataStreamTask`.
      public func streamTask() -> DataStreamTask {
          return DataStreamTask(request: self)
      }
  }
  701. #if canImport(Darwin) && !canImport(FoundationNetworking) // Only Apple platforms support URLSessionWebSocketTask.
  // MARK: - WebSocketTask
  /// Value used to create `await`able streams of events and messages from an underlying `WebSocketRequest`.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  @_spi(WebSocket) public struct WebSocketTask {
      /// Stream type whose elements are `WebSocketRequest.Event` values.
      public typealias EventStreamOf<Success, Failure: Error> = StreamOf<WebSocketRequest.Event<Success, Failure>>

      private let request: WebSocketRequest

      fileprivate init(request: WebSocketRequest) {
          self.request = request
      }

      /// Creates a stream of `WebSocketRequest.Event`s carrying `URLSessionWebSocketTask.Message` values.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `WebSocketRequest` should be
      ///                                cancelled when the stream terminates. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` of the created stream. `.unbounded` by default.
      ///
      /// - Returns: The event stream. Every event received from the request is yielded.
      public func streamingMessageEvents(
          automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
          bufferingPolicy: EventStreamOf<URLSessionWebSocketTask.Message, Never>.BufferingPolicy = .unbounded
      ) -> EventStreamOf<URLSessionWebSocketTask.Message, Never> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel,
                       bufferingPolicy: bufferingPolicy,
                       transform: { event in event },
                       forResponse: { deliver in
                           request.streamMessageEvents(on: .streamCompletionQueue(forRequestID: request.id), handler: deliver)
                       })
      }

      /// Creates a stream of `URLSessionWebSocketTask.Message` values.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `WebSocketRequest` should be
      ///                                cancelled when the stream terminates. `true` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` of the created stream. `.unbounded` by default.
      ///
      /// - Returns: The message stream. Events whose `message` is `nil` produce no element.
      public func streamingMessages(
          automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
          bufferingPolicy: StreamOf<URLSessionWebSocketTask.Message>.BufferingPolicy = .unbounded
      ) -> StreamOf<URLSessionWebSocketTask.Message> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel,
                       bufferingPolicy: bufferingPolicy,
                       transform: { event in event.message },
                       forResponse: { deliver in
                           request.streamMessageEvents(on: .streamCompletionQueue(forRequestID: request.id), handler: deliver)
                       })
      }

      /// Creates a stream of `WebSocketRequest.Event`s carrying decoded `Value`s.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode. Inferred from context by default.
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `WebSocketRequest` should be
      ///                                cancelled when the stream terminates. `true` by default.
      ///   - decoder:                   `DataDecoder` handed to the request's `streamDecodableEvents`.
      ///                                `JSONDecoder()` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` of the created stream. `.unbounded` by default.
      ///
      /// - Returns: The event stream. Every event received from the request is yielded.
      public func streamingDecodableEvents<Value: Decodable>(
          _ type: Value.Type = Value.self,
          automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
          using decoder: any DataDecoder = JSONDecoder(),
          bufferingPolicy: EventStreamOf<Value, any Error>.BufferingPolicy = .unbounded
      ) -> EventStreamOf<Value, any Error> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel,
                       bufferingPolicy: bufferingPolicy,
                       transform: { event in event },
                       forResponse: { deliver in
                           request.streamDecodableEvents(Value.self,
                                                         on: .streamCompletionQueue(forRequestID: request.id),
                                                         using: decoder,
                                                         handler: deliver)
                       })
      }

      /// Creates a stream of decoded `Value`s.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode. Inferred from context by default.
      ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `WebSocketRequest` should be
      ///                                cancelled when the stream terminates. `true` by default.
      ///   - decoder:                   `DataDecoder` handed to the request's `streamDecodableEvents`.
      ///                                `JSONDecoder()` by default.
      ///   - bufferingPolicy:           `BufferingPolicy` of the created stream. `.unbounded` by default.
      ///
      /// - Returns: The value stream. Events whose `message` is `nil` produce no element.
      public func streamingDecodable<Value: Decodable>(
          _ type: Value.Type = Value.self,
          automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
          using decoder: any DataDecoder = JSONDecoder(),
          bufferingPolicy: StreamOf<Value>.BufferingPolicy = .unbounded
      ) -> StreamOf<Value> {
          createStream(automaticallyCancelling: shouldAutomaticallyCancel,
                       bufferingPolicy: bufferingPolicy,
                       transform: { event in event.message },
                       forResponse: { deliver in
                           request.streamDecodableEvents(Value.self,
                                                         on: .streamCompletionQueue(forRequestID: request.id),
                                                         using: decoder,
                                                         handler: deliver)
                       })
      }

      /// Builds the `StreamOf` shared by all of the public `streaming…` methods.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: When `true`, terminating the stream cancels the request, provided the
      ///                                request is still initialized, resumed, or suspended.
      ///   - bufferingPolicy:           `BufferingPolicy` handed to the created `StreamOf`.
      ///   - transform:                 Maps each received event to the element to yield; `nil` yields nothing.
      ///   - onResponse:                Closure that registers the supplied event handler with the request.
      ///
      /// - Returns: The `StreamOf`, which finishes after an event whose `kind` is `.completed`.
      private func createStream<Success, Value, Failure: Error>(
          automaticallyCancelling shouldAutomaticallyCancel: Bool,
          bufferingPolicy: StreamOf<Value>.BufferingPolicy,
          transform: @escaping (WebSocketRequest.Event<Success, Failure>) -> Value?,
          forResponse onResponse: @escaping (@escaping (WebSocketRequest.Event<Success, Failure>) -> Void) -> Void
      ) -> StreamOf<Value> {
          StreamOf(bufferingPolicy: bufferingPolicy, onTermination: {
              guard shouldAutomaticallyCancel else { return }

              // Only requests in one of these three states are cancelled.
              let isCancellable = request.isInitialized || request.isResumed || request.isSuspended
              if isCancellable { cancel() }
          }, builder: { continuation in
              onResponse { event in
                  if let element = transform(event) {
                      continuation.yield(element)
                  }

                  // The completing event is transformed and yielded (if non-nil) before finishing.
                  guard case .completed = event.kind else { return }
                  continuation.finish()
              }
          })
      }

      /// Send a `URLSessionWebSocketTask.Message`.
      ///
      /// - Parameter message: The `Message`.
      ///
      /// - Throws: The failure contained in the send result reported by the underlying `WebSocketRequest`.
      public func send(_ message: URLSessionWebSocketTask.Message) async throws {
          try await withCheckedThrowingContinuation { continuation in
              request.send(message, queue: .streamCompletionQueue(forRequestID: request.id)) { sendResult in
                  continuation.resume(with: sendResult)
              }
          }
      }

      /// Close the underlying `WebSocketRequest`.
      ///
      /// - Parameters:
      ///   - closeCode: `URLSessionWebSocketTask.CloseCode` forwarded to the request.
      ///   - reason:    Optional `Data` forwarded to the request. `nil` by default.
      public func close(sending closeCode: URLSessionWebSocketTask.CloseCode, reason: Data? = nil) {
          request.close(sending: closeCode, reason: reason)
      }

      /// Cancel the underlying `WebSocketRequest`.
      ///
      /// Cancellation will produce an `AFError.explicitlyCancelled` instance.
      public func cancel() {
          request.cancel()
      }

      /// Resume the underlying `WebSocketRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspend the underlying `WebSocketRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension WebSocketRequest {
      /// Creates a `WebSocketTask` wrapping this instance.
      ///
      /// - Returns: The `WebSocketTask`.
      public func webSocketTask() -> WebSocketTask {
          return WebSocketTask(request: self)
      }
  }
  817. #endif
  extension DispatchQueue {
      /// Concurrent queue used as the target of every queue created by `streamCompletionQueue(forRequestID:)`.
      fileprivate static let singleEventQueue: DispatchQueue = .init(label: "org.alamofire.concurrencySingleEventQueue",
                                                                     attributes: .concurrent)

      /// Creates a new queue for stream callbacks, labeled with the request's identifier.
      ///
      /// - Parameter id: `UUID` of the request, appended to the queue's label.
      ///
      /// - Returns: A newly created `DispatchQueue` targeting `singleEventQueue`. A new queue is made on every call.
      fileprivate static func streamCompletionQueue(forRequestID id: UUID) -> DispatchQueue {
          let queueLabel = "org.alamofire.concurrencyStreamCompletionQueue-\(id)"

          return DispatchQueue(label: queueLabel, target: .singleEventQueue)
      }
  }
  /// An asynchronous sequence generated from an underlying `AsyncStream`. Only produced by Alamofire.
  ///
  /// Each call to `makeAsyncIterator()` creates a fresh `AsyncStream` and runs the stored builder against its
  /// continuation.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct StreamOf<Element>: AsyncSequence {
      public typealias AsyncIterator = Iterator
      public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
      fileprivate typealias Continuation = AsyncStream<Element>.Continuation

      /// Buffering behavior passed to every `AsyncStream` this value creates.
      private let bufferingPolicy: BufferingPolicy
      /// Closure invoked, after the continuation is finished, once an iterator's token is deinitialized.
      private let onTermination: (() -> Void)?
      /// Closure that connects a new continuation to its source of values.
      private let builder: (Continuation) -> Void

      fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
                       onTermination: (() -> Void)? = nil,
                       builder: @escaping (Continuation) -> Void) {
          self.builder = builder
          self.onTermination = onTermination
          self.bufferingPolicy = bufferingPolicy
      }

      /// Creates a new `AsyncStream`, runs the builder with its continuation, and wraps the stream's iterator.
      ///
      /// - Returns: The `Iterator`, whose release finishes the continuation and then calls `onTermination`.
      public func makeAsyncIterator() -> Iterator {
          // Assigned inside the stream's build closure so it can be finished when the iterator goes away.
          var capturedContinuation: AsyncStream<Element>.Continuation?
          let stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { streamContinuation in
              capturedContinuation = streamContinuation
              builder(streamContinuation)
          }

          return Iterator(iterator: stream.makeAsyncIterator(), onCancellation: {
              capturedContinuation?.finish()
              onTermination?()
          })
      }

      /// Iterator over the elements of the underlying `AsyncStream`.
      public struct Iterator: AsyncIteratorProtocol {
          /// Reference-typed marker held by the iterator. Its `deinit` runs the supplied closure, which lets a
          /// value-type iterator react to being released.
          private final class Token {
              private let onDeinit: () -> Void

              init(onDeinit: @escaping () -> Void) {
                  self.onDeinit = onDeinit
              }

              deinit {
                  onDeinit()
              }
          }

          private var iterator: AsyncStream<Element>.AsyncIterator
          private let token: Token

          init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
              token = Token(onDeinit: onCancellation)
              self.iterator = iterator
          }

          /// Awaits the next element of the underlying stream.
          ///
          /// - Returns: The value returned by the wrapped `AsyncStream` iterator's `next()`.
          public mutating func next() async -> Element? {
              return await iterator.next()
          }
      }
  }
  873. #endif