Преглед изворни кода

Add Separate Response Hooks (#3762)

### Issue Link :link:
#3401

### Goals :soccer:
This PR adds `onHTTPResponse` closure hooks to `DataRequest` /
`UploadRequest` and `DataStreamRequest` to enable the cancellation of
requests before data is transferred, requests that need to check
response info for later parsing, or peculiar requests that may trigger
multiple response callbacks, like MJPEG streams.

### Implementation Details :construction:
Like the other value hooks, this API accepts a single closure. The only
unique bit here is that there's second, disfavored, version of the API
that allows the user to return `ResponseDisposition` value to cancel or
end the request without the body.

### Testing Details :mag:
Streaming tests are being updated to check these events always fire
before the other events. More tests are needed around cancellation or
ending behavior to ensure that's really possible.
Jon Shier пре 2 година
родитељ
комит
1da2d91fa6

+ 0 - 1
.swiftformat

@@ -10,7 +10,6 @@
 
 # format options
 
---closingparen same-line
 --commas inline
 --comments indent
 --decimalgrouping 3,5

+ 57 - 2
Documentation/AdvancedUsage.md

@@ -435,9 +435,64 @@ AF.request(...)
     }
 ```
 
-#### Response
+#### `HTTPURLResponse`s
 
-Each `Request` may have an `HTTPURLResponse` value available once the request is complete. This value is only available if the request wasn’t cancelled and didn’t fail to make the network request. Additionally, if the request is retried, only the _last_ response is available. Intermediate responses can be derived from the `URLSessionTask`s in the `tasks` property.
+Alamofire receives `HTTPURLResponse` values from its underlying `URLSession` as the session starts the connection to the server. These values can be received in an `onHTTPResponse` closure and used to determine whether the request should continue or be cancelled. Both `DataRequest` / `UploadRequest` and `DataStreamRequest` provide these hooks. 
+
+In the case of `DataStreamRequest`, this event can be used in conjunction with the stream handlers to parse `HTTPURLResponse`s as part of the stream handling. To guarantee proper event ordering in that case, ensure the same `DispatchQueue` is used for both APIs. By default both APIs use `.main` for the completion handler, so this should only be a concern if you customize the queue used in either case.
+
+In the case of multiple `.onHTTPResponse` calls on a request, only the last one will be called.
+
+In it's simplest form `onHTTPResponse` simply provides the `HTTPURLResponse` value.
+
+```swift
+AF.request(...)
+    .onHTTPResponse { response in
+        print(response)
+    }
+```
+
+If control over the future of the request is desired, a completion handler value is provided which returns a `Request.ResponseDisposition` value.
+
+```swift
+AF.request(...)
+    .onHTTPResponse { response, completionHandler in
+        print(response)
+        completionHandler(.allow)
+    }
+```
+
+> The `completionHandler` MUST be called otherwise the request will hang until it times out.
+
+The `completionHandler` can also be used to cancel the request. This acts much like `cancel()` being called on the request itself.
+
+```swift
+AF.request(...)
+    .onHTTPResponse { response, completionHandler in
+        print(response)
+        completionHandler(.cancel)
+    }
+```
+
+Additionally, there are forms of both versions of `onHTTPResponse` available which provide `async` closures, allowing the use of `await` in the body. These versions are available on Swift 5.7 and above.
+
+```swift
+AF.request(...)
+    .onHTTPResponse { response in
+        await someAsyncMethod()
+        return .allow
+    }
+```
+
+Finally, an async stream of `HTTPURLResponse` values can also be used.
+
+```swift
+let responses = AF.request(...).httpResponses()
+
+for await response in responses {
+    print(response)
+}
+```
 
 #### `URLSessionTaskMetrics`
 

+ 131 - 3
Source/Concurrency.swift

@@ -95,9 +95,9 @@ extension Request {
         }
     }
 
-    private func stream<T>(of type: T.Type = T.self,
-                           bufferingPolicy: StreamOf<T>.BufferingPolicy = .unbounded,
-                           yielder: @escaping (StreamOf<T>.Continuation) -> Void) -> StreamOf<T> {
+    fileprivate func stream<T>(of type: T.Type = T.self,
+                               bufferingPolicy: StreamOf<T>.BufferingPolicy = .unbounded,
+                               yielder: @escaping (StreamOf<T>.Continuation) -> Void) -> StreamOf<T> {
         StreamOf<T>(bufferingPolicy: bufferingPolicy) { [unowned self] continuation in
             yielder(continuation)
             // Must come after serializers run in order to catch retry progress.
@@ -168,6 +168,71 @@ public struct DataTask<Value> {
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 extension DataRequest {
+    /// Creates a `StreamOf<HTTPURLResponse>` for the instance's responses.
+    ///
+    /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
+    ///
+    /// - Returns:                   The `StreamOf<HTTPURLResponse>`.
+    public func httpResponses(bufferingPolicy: StreamOf<HTTPURLResponse>.BufferingPolicy = .unbounded) -> StreamOf<HTTPURLResponse> {
+        stream(bufferingPolicy: bufferingPolicy) { [unowned self] continuation in
+            onHTTPResponse(on: underlyingQueue) { response in
+                continuation.yield(response)
+            }
+        }
+    }
+
+    #if swift(>=5.7)
+    /// Sets an async closure returning a `Request.ResponseDisposition`, called whenever the `DataRequest` produces an
+    /// `HTTPURLResponse`.
+    ///
+    /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
+    ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
+    ///         where responses after the first will contain the part headers.
+    ///
+    /// - Parameters:
+    ///   - handler: Async closure executed when a new `HTTPURLResponse` is received and returning a
+    ///              `ResponseDisposition` value. This value determines whether to continue the request or cancel it as
+    ///              if `cancel()` had been called on the instance. Note, this closure is called on an arbitrary thread,
+    ///              so any synchronous calls in it will execute in that context.
+    ///
+    /// - Returns:   The instance.
+    @_disfavoredOverload
+    @discardableResult
+    public func onHTTPResponse(
+        perform handler: @escaping @Sendable (_ response: HTTPURLResponse) async -> ResponseDisposition
+    ) -> Self {
+        onHTTPResponse(on: underlyingQueue) { response, completionHandler in
+            Task {
+                let disposition = await handler(response)
+                completionHandler(disposition)
+            }
+        }
+
+        return self
+    }
+
+    /// Sets an async closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
+    ///
+    /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
+    ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
+    ///         where responses after the first will contain the part headers.
+    ///
+    /// - Parameters:
+    ///   - handler: Async closure executed when a new `HTTPURLResponse` is received. Note, this closure is called on an
+    ///              arbitrary thread, so any synchronous calls in it will execute in that context.
+    ///
+    /// - Returns:   The instance.
+    @discardableResult
+    public func onHTTPResponse(perform handler: @escaping @Sendable (_ response: HTTPURLResponse) async -> Void) -> Self {
+        onHTTPResponse { response in
+            await handler(response)
+            return .allow
+        }
+
+        return self
+    }
+    #endif
+
     /// Creates a `DataTask` to `await` a `Data` value.
     ///
     /// - Parameters:
@@ -625,6 +690,69 @@ public struct DataStreamTask {
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 extension DataStreamRequest {
+    /// Creates a `StreamOf<HTTPURLResponse>` for the instance's responses.
+    ///
+    /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
+    ///
+    /// - Returns:                   The `StreamOf<HTTPURLResponse>`.
+    public func httpResponses(bufferingPolicy: StreamOf<HTTPURLResponse>.BufferingPolicy = .unbounded) -> StreamOf<HTTPURLResponse> {
+        stream(bufferingPolicy: bufferingPolicy) { [unowned self] continuation in
+            onHTTPResponse(on: underlyingQueue) { response in
+                continuation.yield(response)
+            }
+        }
+    }
+
+    #if swift(>=5.7)
+    /// Sets an async closure returning a `Request.ResponseDisposition`, called whenever the `DataStreamRequest`
+    /// produces an `HTTPURLResponse`.
+    ///
+    /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
+    ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
+    ///         where responses after the first will contain the part headers.
+    ///
+    /// - Parameters:
+    ///   - handler: Async closure executed when a new `HTTPURLResponse` is received and returning a
+    ///              `ResponseDisposition` value. This value determines whether to continue the request or cancel it as
+    ///              if `cancel()` had been called on the instance. Note, this closure is called on an arbitrary thread,
+    ///              so any synchronous calls in it will execute in that context.
+    ///
+    /// - Returns:   The instance.
+    @_disfavoredOverload
+    @discardableResult
+    public func onHTTPResponse(perform handler: @escaping @Sendable (HTTPURLResponse) async -> ResponseDisposition) -> Self {
+        onHTTPResponse(on: underlyingQueue) { response, completionHandler in
+            Task {
+                let disposition = await handler(response)
+                completionHandler(disposition)
+            }
+        }
+
+        return self
+    }
+
+    /// Sets an async closure called whenever the `DataStreamRequest` produces an `HTTPURLResponse`.
+    ///
+    /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
+    ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
+    ///         where responses after the first will contain the part headers.
+    ///
+    /// - Parameters:
+    ///   - handler: Async closure executed when a new `HTTPURLResponse` is received. Note, this closure is called on an
+    ///              arbitrary thread, so any synchronous calls in it will execute in that context.
+    ///
+    /// - Returns:   The instance.
+    @discardableResult
+    public func onHTTPResponse(perform handler: @escaping @Sendable (HTTPURLResponse) async -> Void) -> Self {
+        onHTTPResponse { response in
+            await handler(response)
+            return .allow
+        }
+
+        return self
+    }
+    #endif
+
     /// Creates a `DataStreamTask` used to `await` streams of serialized values.
     ///
     /// - Returns: The `DataStreamTask`.

+ 15 - 0
Source/EventMonitor.swift

@@ -69,6 +69,9 @@ public protocol EventMonitor {
 
     // MARK: URLSessionDataDelegate Events
 
+    /// Event called during `URLSessionDataDelegate`'s `urlSession(_:dataTask:didReceive:completionHandler:)` method.
+    func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse)
+
     /// Event called during `URLSessionDataDelegate`'s `urlSession(_:dataTask:didReceive:)` method.
     func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data)
 
@@ -244,6 +247,7 @@ extension EventMonitor {
                            didFinishCollecting metrics: URLSessionTaskMetrics) {}
     public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {}
     public func urlSession(_ session: URLSession, taskIsWaitingForConnectivity task: URLSessionTask) {}
+    public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse) {}
     public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {}
     public func urlSession(_ session: URLSession,
                            dataTask: URLSessionDataTask,
@@ -380,6 +384,10 @@ public final class CompositeEventMonitor: EventMonitor {
         performEvent { $0.urlSession(session, taskIsWaitingForConnectivity: task) }
     }
 
+    public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse) {
+        performEvent { $0.urlSession(session, dataTask: dataTask, didReceive: response) }
+    }
+
     public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
         performEvent { $0.urlSession(session, dataTask: dataTask, didReceive: data) }
     }
@@ -593,6 +601,9 @@ open class ClosureEventMonitor: EventMonitor {
     /// Closure called on the `urlSession(_:taskIsWaitingForConnectivity:)` event.
     open var taskIsWaitingForConnectivity: ((URLSession, URLSessionTask) -> Void)?
 
+    /// Closure called on the `urlSession(_:dataTask:didReceive:completionHandler:)` event.
+    open var dataTaskDidReceiveResponse: ((URLSession, URLSessionDataTask, URLResponse) -> Void)?
+
     /// Closure that receives the `urlSession(_:dataTask:didReceive:)` event.
     open var dataTaskDidReceiveData: ((URLSession, URLSessionDataTask, Data) -> Void)?
 
@@ -741,6 +752,10 @@ open class ClosureEventMonitor: EventMonitor {
         taskIsWaitingForConnectivity?(session, task)
     }
 
+    open func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse) {
+        dataTaskDidReceiveResponse?(session, dataTask, response)
+    }
+
     open func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
         dataTaskDidReceiveData?(session, dataTask, data)
     }

+ 2 - 1
Source/NetworkReachabilityManager.swift

@@ -194,7 +194,8 @@ open class NetworkReachabilityManager {
                 let description = weakManager.manager?.flags?.readableDescription ?? "nil"
 
                 return Unmanaged.passRetained(description as CFString)
-            })
+            }
+        )
         let callback: SCNetworkReachabilityCallBack = { _, flags, info in
             guard let info = info else { return }
 

+ 172 - 9
Source/Request.swift

@@ -416,7 +416,9 @@ public class Request {
     func didCancel() {
         dispatchPrecondition(condition: .onQueue(underlyingQueue))
 
-        error = error ?? AFError.explicitlyCancelled
+        $mutableState.write { mutableState in
+            mutableState.error = mutableState.error ?? AFError.explicitlyCancelled
+        }
 
         eventMonitor?.requestDidCancel(self)
     }
@@ -939,6 +941,23 @@ public class Request {
     }
 }
 
+extension Request {
+    /// Type indicating how a `DataRequest` or `DataStreamRequest` should proceed after receiving an `HTTPURLResponse`.
+    public enum ResponseDisposition {
+        /// Allow the request to continue normally.
+        case allow
+        /// Cancel the request, similar to calling `cancel()`.
+        case cancel
+
+        var sessionDisposition: URLSession.ResponseDisposition {
+            switch self {
+            case .allow: return .allow
+            case .cancel: return .cancel
+            }
+        }
+    }
+}
+
 // MARK: - Protocol Conformances
 
 extension Request: Equatable {
@@ -1080,11 +1099,17 @@ public class DataRequest: Request {
     /// `URLRequestConvertible` value used to create `URLRequest`s for this instance.
     public let convertible: URLRequestConvertible
     /// `Data` read from the server so far.
-    public var data: Data? { mutableData }
+    public var data: Data? { $dataMutableState.data }
+
+    private struct DataMutableState {
+        var data: Data?
+        var httpResponseHandler: (queue: DispatchQueue,
+                                  handler: (_ response: HTTPURLResponse,
+                                            _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void)?
+    }
 
-    /// Protected storage for the `Data` read by the instance.
     @Protected
-    private var mutableData: Data? = nil
+    private var dataMutableState = DataMutableState()
 
     /// Creates a `DataRequest` using the provided parameters.
     ///
@@ -1117,7 +1142,9 @@ public class DataRequest: Request {
     override func reset() {
         super.reset()
 
-        mutableData = nil
+        $dataMutableState.write { mutableState in
+            mutableState.data = nil
+        }
     }
 
     /// Called when `Data` is received by this instance.
@@ -1126,15 +1153,41 @@ public class DataRequest: Request {
     ///
     /// - Parameter data: The `Data` received.
     func didReceive(data: Data) {
-        if self.data == nil {
-            mutableData = data
-        } else {
-            $mutableData.write { $0?.append(data) }
+        $dataMutableState.write { mutableState in
+            if mutableState.data == nil {
+                mutableState.data = data
+            } else {
+                mutableState.data?.append(data)
+            }
         }
 
         updateDownloadProgress()
     }
 
+    func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) {
+        $dataMutableState.read { dataMutableState in
+            guard let httpResponseHandler = dataMutableState.httpResponseHandler else {
+                underlyingQueue.async { completionHandler(.allow) }
+                return
+            }
+
+            httpResponseHandler.queue.async {
+                httpResponseHandler.handler(response) { disposition in
+                    if disposition == .cancel {
+                        self.$mutableState.write { mutableState in
+                            mutableState.state = .cancelled
+                            mutableState.error = mutableState.error ?? AFError.explicitlyCancelled
+                        }
+                    }
+
+                    self.underlyingQueue.async {
+                        completionHandler(disposition.sessionDisposition)
+                    }
+                }
+            }
+        }
+    }
+
     override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
         let copiedRequest = request
         return session.dataTask(with: copiedRequest)
@@ -1178,6 +1231,47 @@ public class DataRequest: Request {
 
         return self
     }
+
+    /// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse` and providing a completion
+    /// handler to return a `ResponseDisposition` value.
+    ///
+    /// - Parameters:
+    ///   - queue:   `DispatchQueue` on which the closure will be called. `.main` by default.
+    ///   - handler: Closure called when the instance produces an `HTTPURLResponse`. The `completionHandler` provided
+    ///              MUST be called, otherwise the request will never complete.
+    ///
+    /// - Returns:   The instance.
+    @_disfavoredOverload
+    @discardableResult
+    public func onHTTPResponse(
+        on queue: DispatchQueue = .main,
+        perform handler: @escaping (_ response: HTTPURLResponse,
+                                    _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void
+    ) -> Self {
+        $dataMutableState.write { mutableState in
+            mutableState.httpResponseHandler = (queue, handler)
+        }
+
+        return self
+    }
+
+    /// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
+    ///
+    /// - Parameters:
+    ///   - queue:   `DispatchQueue` on which the closure will be called. `.main` by default.
+    ///   - handler: Closure called when the instance produces an `HTTPURLResponse`.
+    ///
+    /// - Returns:   The instance.
+    @discardableResult
+    public func onHTTPResponse(on queue: DispatchQueue = .main,
+                               perform handler: @escaping (HTTPURLResponse) -> Void) -> Self {
+        onHTTPResponse(on: queue) { response, completionHandler in
+            handler(response)
+            completionHandler(.allow)
+        }
+
+        return self
+    }
 }
 
 // MARK: - DataStreamRequest
@@ -1254,6 +1348,10 @@ public final class DataStreamRequest: Request {
         var numberOfExecutingStreams = 0
         /// Completion calls enqueued while streams are still executing.
         var enqueuedCompletionEvents: [() -> Void] = []
+        /// Handler for any `HTTPURLResponse`s received.
+        var httpResponseHandler: (queue: DispatchQueue,
+                                  handler: (_ response: HTTPURLResponse,
+                                            _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void)?
     }
 
     @Protected
@@ -1324,6 +1422,30 @@ public final class DataStreamRequest: Request {
         }
     }
 
+    func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) {
+        $streamMutableState.read { dataMutableState in
+            guard let httpResponseHandler = dataMutableState.httpResponseHandler else {
+                underlyingQueue.async { completionHandler(.allow) }
+                return
+            }
+
+            httpResponseHandler.queue.async {
+                httpResponseHandler.handler(response) { disposition in
+                    if disposition == .cancel {
+                        self.$mutableState.write { mutableState in
+                            mutableState.state = .cancelled
+                            mutableState.error = mutableState.error ?? AFError.explicitlyCancelled
+                        }
+                    }
+
+                    self.underlyingQueue.async {
+                        completionHandler(disposition.sessionDisposition)
+                    }
+                }
+            }
+        }
+    }
+
     /// Validates the `URLRequest` and `HTTPURLResponse` received for the instance using the provided `Validation` closure.
     ///
     /// - Parameter validation: `Validation` closure used to validate the request and response.
@@ -1376,6 +1498,47 @@ public final class DataStreamRequest: Request {
     }
     #endif
 
+    /// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse` and providing a completion
+    /// handler to return a `ResponseDisposition` value.
+    ///
+    /// - Parameters:
+    ///   - queue:   `DispatchQueue` on which the closure will be called. `.main` by default.
+    ///   - handler: Closure called when the instance produces an `HTTPURLResponse`. The `completionHandler` provided
+    ///              MUST be called, otherwise the request will never complete.
+    ///
+    /// - Returns:   The instance.
+    @_disfavoredOverload
+    @discardableResult
+    public func onHTTPResponse(
+        on queue: DispatchQueue = .main,
+        perform handler: @escaping (_ response: HTTPURLResponse,
+                                    _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void
+    ) -> Self {
+        $streamMutableState.write { mutableState in
+            mutableState.httpResponseHandler = (queue, handler)
+        }
+
+        return self
+    }
+
+    /// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
+    ///
+    /// - Parameters:
+    ///   - queue:   `DispatchQueue` on which the closure will be called. `.main` by default.
+    ///   - handler: Closure called when the instance produces an `HTTPURLResponse`.
+    ///
+    /// - Returns:   The instance.
+    @discardableResult
+    public func onHTTPResponse(on queue: DispatchQueue = .main,
+                               perform handler: @escaping (HTTPURLResponse) -> Void) -> Self {
+        onHTTPResponse(on: queue) { response, completionHandler in
+            handler(response)
+            completionHandler(.allow)
+        }
+
+        return self
+    }
+
     func capturingError(from closure: () throws -> Void) {
         do {
             try closure()

+ 20 - 1
Source/SessionDelegate.swift

@@ -230,6 +230,25 @@ extension SessionDelegate: URLSessionTaskDelegate {
 // MARK: URLSessionDataDelegate
 
 extension SessionDelegate: URLSessionDataDelegate {
+    open func urlSession(_ session: URLSession,
+                         dataTask: URLSessionDataTask,
+                         didReceive response: URLResponse,
+                         completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) {
+        eventMonitor?.urlSession(session, dataTask: dataTask, didReceive: response)
+
+        guard let response = response as? HTTPURLResponse else { completionHandler(.allow); return }
+
+        if let request = request(for: dataTask, as: DataRequest.self) {
+            request.didReceiveResponse(response, completionHandler: completionHandler)
+        } else if let request = request(for: dataTask, as: DataStreamRequest.self) {
+            request.didReceiveResponse(response, completionHandler: completionHandler)
+        } else {
+            assertionFailure("dataTask did not find DataRequest or DataStreamRequest in didReceive response")
+            completionHandler(.allow)
+            return
+        }
+    }
+
     open func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
         eventMonitor?.urlSession(session, dataTask: dataTask, didReceive: data)
 
@@ -238,7 +257,7 @@ extension SessionDelegate: URLSessionDataDelegate {
         } else if let request = request(for: dataTask, as: DataStreamRequest.self) {
             request.didReceive(data: data)
         } else {
-            assertionFailure("dataTask did not find DataRequest or DataStreamRequest in didReceive")
+            assertionFailure("dataTask did not find DataRequest or DataStreamRequest in didReceive data")
             return
         }
     }

+ 95 - 3
Tests/ConcurrencyTests.swift

@@ -433,6 +433,95 @@ final class DataStreamConcurrencyTests: BaseTestCase {
         XCTAssertEqual(datas.count, 2)
     }
 
+    #if swift(>=5.8) && canImport(Darwin)
+    func testThatDataStreamHasAsyncOnHTTPResponse() async {
+        // Given
+        let session = stored(Session())
+        let functionCalled = expectation(description: "doNothing called")
+        @Sendable @MainActor func fulfill() async {
+            functionCalled.fulfill()
+        }
+
+        // When
+        let task = session.streamRequest(.payloads(2))
+            .onHTTPResponse { _ in
+                await fulfill()
+            }
+            .streamTask()
+        var datas: [Data] = []
+
+        for await data in task.streamingData().compactMap(\.value) {
+            datas.append(data)
+        }
+
+        await fulfillment(of: [functionCalled], timeout: timeout)
+
+        // Then
+        XCTAssertEqual(datas.count, 2)
+    }
+
+    func testThatDataOnHTTPResponseCanAllow() async {
+        // Given
+        let session = stored(Session())
+        let functionCalled = expectation(description: "doNothing called")
+        @Sendable @MainActor func fulfill() async {
+            functionCalled.fulfill()
+        }
+
+        // When
+        let task = session.streamRequest(.payloads(2))
+            .onHTTPResponse { _ in
+                await fulfill()
+                return .allow
+            }
+            .streamTask()
+        var datas: [Data] = []
+
+        for await data in task.streamingData().compactMap(\.value) {
+            datas.append(data)
+        }
+
+        await fulfillment(of: [functionCalled], timeout: timeout)
+
+        // Then
+        XCTAssertEqual(datas.count, 2)
+    }
+
+    func testThatDataOnHTTPResponseCanCancel() async {
+        // Given
+        let session = stored(Session())
+        var receivedCompletion: DataStreamRequest.Completion?
+        let functionCalled = expectation(description: "doNothing called")
+        @Sendable @MainActor func fulfill() async {
+            functionCalled.fulfill()
+        }
+
+        // When
+        let request = session.streamRequest(.payloads(2))
+            .onHTTPResponse { _ in
+                await fulfill()
+                return .cancel
+            }
+        let task = request.streamTask()
+
+        for await stream in task.streamingResponses(serializedUsing: .passthrough) {
+            switch stream.event {
+            case .stream:
+                XCTFail("cancelled stream should receive no data")
+            case let .complete(completion):
+                receivedCompletion = completion
+            }
+        }
+
+        await fulfillment(of: [functionCalled], timeout: timeout)
+
+        // Then
+        XCTAssertEqual(receivedCompletion?.response?.statusCode, 200)
+        XCTAssertTrue(request.isCancelled, "onHTTPResponse cancelled request isCancelled should be true")
+        XCTAssertTrue(request.error?.isExplicitlyCancelledError == true, "onHTTPResponse cancelled request error should be explicitly cancelled")
+    }
+    #endif
+
     func testThatDataStreamTaskCanStreamStrings() async {
         // Given
         let session = stored(Session())
@@ -665,6 +754,7 @@ final class ClosureAPIConcurrencyTests: BaseTestCase {
 
         // When
         let request = session.request(.get)
+        async let httpResponses = request.httpResponses().collect()
         async let uploadProgress = request.uploadProgress().collect()
         async let downloadProgress = request.downloadProgress().collect()
         async let requests = request.urlRequests().collect()
@@ -672,19 +762,21 @@ final class ClosureAPIConcurrencyTests: BaseTestCase {
         async let descriptions = request.cURLDescriptions().collect()
         async let response = request.serializingDecodable(TestResponse.self).response
 
-        let values: (uploadProgresses: [Progress],
+        let values: (httpResponses: [HTTPURLResponse],
+                     uploadProgresses: [Progress],
                      downloadProgresses: [Progress],
                      requests: [URLRequest],
                      tasks: [URLSessionTask],
                      descriptions: [String],
                      response: AFDataResponse<TestResponse>)
         #if swift(>=5.10)
-        values = try! await (uploadProgress, downloadProgress, requests, tasks, descriptions, response)
+        values = try! await (httpResponses, uploadProgress, downloadProgress, requests, tasks, descriptions, response)
         #else
-        values = await (uploadProgress, downloadProgress, requests, tasks, descriptions, response)
+        values = await (httpResponses, uploadProgress, downloadProgress, requests, tasks, descriptions, response)
         #endif
 
         // Then
+        XCTAssertTrue(values.httpResponses.count == 1, "httpResponses should have one response")
         XCTAssertTrue(values.uploadProgresses.isEmpty, "uploadProgresses should be empty")
         XCTAssertNotNil(values.downloadProgresses.last, "downloadProgresses should not be empty")
         XCTAssertTrue(values.downloadProgresses.last?.isFinished == true, "last download progression should be finished")

+ 188 - 60
Tests/DataStreamTests.swift

@@ -28,82 +28,95 @@ import XCTest
 final class DataStreamTests: BaseTestCase {
     func testThatDataCanBeStreamedOnMainQueue() {
         // Given
-        let expectedSize = 10
+        let expectedSize = 5
         var accumulatedData = Data()
+        var initialResponse: HTTPURLResponse?
         var response: HTTPURLResponse?
         var streamOnMain = false
         var completeOnMain = false
+        let didReceiveResponse = expectation(description: "stream should receive response once")
         let didReceive = expectation(description: "stream should receive once")
         let didComplete = expectation(description: "stream should complete")
 
         // When
-        AF.streamRequest(.bytes(expectedSize)).responseStream { stream in
-            switch stream.event {
-            case let .stream(result):
-                streamOnMain = Thread.isMainThread
-                switch result {
-                case let .success(data):
-                    accumulatedData.append(data)
+        AF.streamRequest(.bytes(expectedSize))
+            .onHTTPResponse { response in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+            }
+            .responseStream { stream in
+                switch stream.event {
+                case let .stream(result):
+                    streamOnMain = Thread.isMainThread
+                    switch result {
+                    case let .success(data):
+                        accumulatedData.append(data)
+                    }
+                    didReceive.fulfill()
+                case let .complete(completion):
+                    completeOnMain = Thread.isMainThread
+                    response = completion.response
+                    didComplete.fulfill()
                 }
-                didReceive.fulfill()
-            case let .complete(completion):
-                completeOnMain = Thread.isMainThread
-                response = completion.response
-                didComplete.fulfill()
             }
-        }
 
-        wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
+        wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
 
         // Then
         XCTAssertEqual(response?.statusCode, 200)
+        XCTAssertEqual(initialResponse, response)
         XCTAssertEqual(accumulatedData.count, expectedSize)
         XCTAssertTrue(streamOnMain)
         XCTAssertTrue(completeOnMain)
     }
 
-    func testThatDataCanBeStreamedByByte() {
+    func testThatDataCanBeStreamedByByte() throws {
+        guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
+            throw XCTSkip("Older OSes don't return individual bytes.")
+        }
+
         // Given
-        let expectedSize = 10
+        let expectedSize = 5
         var accumulatedData = Data()
+        var initialResponse: HTTPURLResponse?
         var response: HTTPURLResponse?
         var streamOnMain = false
         var completeOnMain = false
         var streamCalled = 0
+        let didReceiveResponse = expectation(description: "stream should receive response once")
         let didReceive = expectation(description: "stream should receive once")
-        if #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) {
-            didReceive.expectedFulfillmentCount = expectedSize
-        }
+        didReceive.expectedFulfillmentCount = expectedSize
         let didComplete = expectation(description: "stream should complete")
 
         // When
-        AF.streamRequest(.chunked(expectedSize)).responseStream { stream in
-            switch stream.event {
-            case let .stream(result):
-                streamOnMain = Thread.isMainThread
-                switch result {
-                case let .success(data):
-                    accumulatedData.append(data)
+        AF.streamRequest(.chunked(expectedSize))
+            .onHTTPResponse { response in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+            }
+            .responseStream { stream in
+                switch stream.event {
+                case let .stream(result):
+                    streamOnMain = Thread.isMainThread
+                    switch result {
+                    case let .success(data):
+                        accumulatedData.append(data)
+                    }
+                    streamCalled += 1
+                    didReceive.fulfill()
+                case let .complete(completion):
+                    completeOnMain = Thread.isMainThread
+                    response = completion.response
+                    didComplete.fulfill()
                 }
-                streamCalled += 1
-                didReceive.fulfill()
-            case let .complete(completion):
-                completeOnMain = Thread.isMainThread
-                response = completion.response
-                didComplete.fulfill()
             }
-        }
 
-        wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
+        wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
 
         // Then
         XCTAssertEqual(response?.statusCode, 200)
-        // Older OSes don't return individual bytes.
-        if #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) {
-            XCTAssertEqual(streamCalled, expectedSize)
-        } else {
-            XCTAssertEqual(streamCalled, 1)
-        }
+        XCTAssertEqual(streamCalled, expectedSize)
+        XCTAssertEqual(initialResponse, response)
         XCTAssertEqual(accumulatedData.count, expectedSize)
         XCTAssertTrue(streamOnMain)
         XCTAssertTrue(completeOnMain)
@@ -115,18 +128,24 @@ final class DataStreamTests: BaseTestCase {
         }
 
         // Given
-        let expectedSize = 10
+        let expectedSize = 5
         var responses: [TestResponse] = []
+        var initialResponse: HTTPURLResponse?
         var response: HTTPURLResponse?
         var streamOnMain = false
         var completeOnMain = false
         var streamCalled = 0
+        let didReceiveResponse = expectation(description: "stream should receive response once")
         let didReceive = expectation(description: "stream should receive once")
         didReceive.expectedFulfillmentCount = expectedSize
         let didComplete = expectation(description: "stream should complete")
 
         // When
         AF.streamRequest(.payloads(expectedSize))
+            .onHTTPResponse { response in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+            }
             .responseStreamDecodable(of: TestResponse.self) { stream in
                 switch stream.event {
                 case let .stream(result):
@@ -146,12 +165,13 @@ final class DataStreamTests: BaseTestCase {
                 }
             }
 
-        wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
+        wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
 
         // Then
         XCTAssertEqual(response?.statusCode, 200)
         XCTAssertEqual(streamCalled, expectedSize)
         XCTAssertEqual(responses.count, expectedSize)
+        XCTAssertEqual(initialResponse, response)
         XCTAssertTrue(streamOnMain)
         XCTAssertTrue(completeOnMain)
     }
@@ -160,34 +180,42 @@ final class DataStreamTests: BaseTestCase {
         // Given
         let expectedSize = 1
         var accumulatedData = Data()
+        var initialResponse: HTTPURLResponse?
         var response: HTTPURLResponse?
         var streamOnMain = false
         var completeOnMain = false
+        let didReceiveResponse = expectation(description: "stream should receive response once")
         let didReceive = expectation(description: "stream should receive")
         let didComplete = expectation(description: "stream should complete")
 
         // When
-        AF.streamRequest(.bytes(expectedSize)).responseStream { stream in
-            switch stream.event {
-            case let .stream(result):
-                streamOnMain = Thread.isMainThread
-                switch result {
-                case let .success(data):
-                    accumulatedData.append(data)
+        AF.streamRequest(.bytes(expectedSize))
+            .onHTTPResponse { response in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+            }
+            .responseStream { stream in
+                switch stream.event {
+                case let .stream(result):
+                    streamOnMain = Thread.isMainThread
+                    switch result {
+                    case let .success(data):
+                        accumulatedData.append(data)
+                    }
+                    didReceive.fulfill()
+                case let .complete(completion):
+                    completeOnMain = Thread.isMainThread
+                    response = completion.response
+                    didComplete.fulfill()
                 }
-                didReceive.fulfill()
-            case let .complete(completion):
-                completeOnMain = Thread.isMainThread
-                response = completion.response
-                didComplete.fulfill()
             }
-        }
 
-        wait(for: [didReceive, didComplete], timeout: timeout, enforceOrder: true)
+        wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
 
         // Then
         XCTAssertEqual(response?.statusCode, 200)
         XCTAssertEqual(accumulatedData.count, expectedSize)
+        XCTAssertEqual(initialResponse, response)
         XCTAssertTrue(streamOnMain)
         XCTAssertTrue(completeOnMain)
     }
@@ -195,6 +223,8 @@ final class DataStreamTests: BaseTestCase {
     func testThatDataCanBeStreamedManyTimes() {
         // Given
         let expectedSize = 1
+        var initialResponse: HTTPURLResponse?
+        let onHTTPResponse = expectation(description: "onHTTPResponse should be called")
         var firstAccumulatedData = Data()
         var firstResponse: HTTPURLResponse?
         var firstStreamOnMain = false
@@ -210,6 +240,10 @@ final class DataStreamTests: BaseTestCase {
 
         // When
         AF.streamRequest(.bytes(expectedSize))
+            .onHTTPResponse { response in
+                initialResponse = response
+                onHTTPResponse.fulfill()
+            }
             .responseStream { stream in
                 switch stream.event {
                 case let .stream(result):
@@ -241,10 +275,12 @@ final class DataStreamTests: BaseTestCase {
                 }
             }
 
-        wait(for: [firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
+        wait(for: [onHTTPResponse, firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
         wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)
 
         // Then
+        XCTAssertEqual(initialResponse, firstResponse)
+        XCTAssertEqual(initialResponse, secondResponse)
         XCTAssertTrue(firstStreamOnMain)
         XCTAssertTrue(firstCompleteOnMain)
         XCTAssertEqual(firstResponse?.statusCode, 200)
@@ -257,6 +293,8 @@ final class DataStreamTests: BaseTestCase {
 
     func testThatDataCanBeStreamedAndDecodedAtTheSameTime() {
         // Given
+        var initialResponse: HTTPURLResponse?
+        let onHTTPResponse = expectation(description: "onHTTPResponse should be called")
         var firstAccumulatedData = Data()
         var firstResponse: HTTPURLResponse?
         var firstStreamOnMain = false
@@ -273,6 +311,10 @@ final class DataStreamTests: BaseTestCase {
 
         // When
         AF.streamRequest(.stream(1))
+            .onHTTPResponse { response in
+                initialResponse = response
+                onHTTPResponse.fulfill()
+            }
             .responseStream { stream in
                 switch stream.event {
                 case let .stream(result):
@@ -306,10 +348,12 @@ final class DataStreamTests: BaseTestCase {
                 }
             }
 
-        wait(for: [firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
+        wait(for: [onHTTPResponse, firstReceive, firstCompletion], timeout: timeout, enforceOrder: true)
         wait(for: [secondReceive, secondCompletion], timeout: timeout, enforceOrder: true)
 
         // Then
+        XCTAssertEqual(initialResponse, firstResponse)
+        XCTAssertEqual(initialResponse, secondResponse)
         XCTAssertTrue(firstStreamOnMain)
         XCTAssertTrue(firstCompleteOnMain)
         XCTAssertEqual(firstResponse?.statusCode, 200)
@@ -334,7 +378,8 @@ final class DataStreamTests: BaseTestCase {
                     expect.fulfill()
                 default: break
                 }
-            }.asInputStream()
+            }
+            .asInputStream()
 
         waitForExpectations(timeout: timeout)
 
@@ -491,6 +536,89 @@ final class DataStreamTests: BaseTestCase {
                       response is: \(completion?.response?.description ?? "none").
                       """)
     }
+
+    func testThatOnHTTPResponseCanContinueStream() {
+        // Given
+        let expectedSize = 5
+        var accumulatedData = Data()
+        var initialResponse: HTTPURLResponse?
+        var response: HTTPURLResponse?
+        var streamOnMain = false
+        var completeOnMain = false
+        let didReceiveResponse = expectation(description: "stream should receive response once")
+        let didReceive = expectation(description: "stream should receive once")
+        let didComplete = expectation(description: "stream should complete")
+
+        // When
+        AF.streamRequest(.bytes(expectedSize))
+            .onHTTPResponse { response, completionHandler in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+                completionHandler(.allow)
+            }
+            .responseStream { stream in
+                switch stream.event {
+                case let .stream(result):
+                    streamOnMain = Thread.isMainThread
+                    switch result {
+                    case let .success(data):
+                        accumulatedData.append(data)
+                    }
+                    didReceive.fulfill()
+                case let .complete(completion):
+                    completeOnMain = Thread.isMainThread
+                    response = completion.response
+                    didComplete.fulfill()
+                }
+            }
+
+        wait(for: [didReceiveResponse, didReceive, didComplete], timeout: timeout, enforceOrder: true)
+
+        // Then
+        XCTAssertEqual(response?.statusCode, 200)
+        XCTAssertEqual(initialResponse, response)
+        XCTAssertEqual(accumulatedData.count, expectedSize)
+        XCTAssertTrue(streamOnMain)
+        XCTAssertTrue(completeOnMain)
+    }
+
+    func testThatOnHTTPResponseCanCancelStream() {
+        // Given
+        let expectedSize = 5
+        var initialResponse: HTTPURLResponse?
+        var response: HTTPURLResponse?
+        var completion: DataStreamRequest.Completion?
+        var didCompleteOnMain = false
+        let didReceiveResponse = expectation(description: "stream should receive response once")
+        let didComplete = expectation(description: "stream should complete")
+
+        // When
+        AF.streamRequest(.bytes(expectedSize))
+            .onHTTPResponse { response, completionHandler in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+                completionHandler(.cancel)
+            }
+            .responseStream { stream in
+                switch stream.event {
+                case .stream:
+                    XCTFail("should never receive stream in a cancelled request")
+                case let .complete(comp):
+                    didCompleteOnMain = Thread.isMainThread
+                    completion = comp
+                    response = comp.response
+                    didComplete.fulfill()
+                }
+            }
+
+        wait(for: [didReceiveResponse, didComplete], timeout: timeout, enforceOrder: true)
+
+        // Then
+        XCTAssertEqual(response?.statusCode, 200)
+        XCTAssertEqual(initialResponse, response)
+        XCTAssertTrue(didCompleteOnMain)
+        XCTAssertTrue(completion?.error?.isExplicitlyCancelledError == true, "onHTTPResponse cancelled stream should be explicitly cancelled")
+    }
 }
 
 // MARK: - Serialization Tests

+ 90 - 0
Tests/RequestTests.swift

@@ -49,6 +49,96 @@ final class RequestResponseTestCase: BaseTestCase {
         XCTAssertNil(response?.error)
     }
 
+    func testThatDataRequestReceivesInitialResponse() {
+        // Given
+        let url = Endpoint.get.url
+        var initialResponse: HTTPURLResponse?
+        let didReceiveResponse = expectation(description: "didReceiveResponse")
+        let didComplete = expectation(description: "GET request should succeed: \(url)")
+        var response: DataResponse<Data?, AFError>?
+
+        // When
+        AF.request(url, parameters: ["foo": "bar"])
+            .onHTTPResponse { response in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+            }
+            .response { resp in
+                response = resp
+                didComplete.fulfill()
+            }
+
+        wait(for: [didReceiveResponse, didComplete], timeout: timeout, enforceOrder: true)
+
+        // Then
+        XCTAssertEqual(initialResponse, response?.response)
+        XCTAssertNotNil(response?.request)
+        XCTAssertNotNil(response?.response)
+        XCTAssertNotNil(response?.data)
+        XCTAssertNil(response?.error)
+    }
+
+    func testThatDataRequestOnHTTPResponseCanAllow() {
+        // Given
+        let url = Endpoint.get.url
+        var initialResponse: HTTPURLResponse?
+        let didReceiveResponse = expectation(description: "didReceiveResponse")
+        let didComplete = expectation(description: "GET request should succeed: \(url)")
+        var response: DataResponse<Data?, AFError>?
+
+        // When
+        AF.request(url, parameters: ["foo": "bar"])
+            .onHTTPResponse { response, completionHandler in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+                completionHandler(.allow)
+            }
+            .response { resp in
+                response = resp
+                didComplete.fulfill()
+            }
+
+        wait(for: [didReceiveResponse, didComplete], timeout: timeout, enforceOrder: true)
+
+        // Then
+        XCTAssertEqual(initialResponse, response?.response)
+        XCTAssertNotNil(response?.request)
+        XCTAssertNotNil(response?.response)
+        XCTAssertNotNil(response?.data)
+        XCTAssertNil(response?.error)
+    }
+
+    func testThatDataRequestOnHTTPResponseCanCancel() {
+        // Given
+        let url = Endpoint.get.url
+        var initialResponse: HTTPURLResponse?
+        let didReceiveResponse = expectation(description: "didReceiveResponse")
+        let didComplete = expectation(description: "GET request should succeed: \(url)")
+        var response: DataResponse<Data?, AFError>?
+
+        // When
+        let request = AF.request(url, parameters: ["foo": "bar"])
+            .onHTTPResponse { response, completionHandler in
+                initialResponse = response
+                didReceiveResponse.fulfill()
+                completionHandler(.cancel)
+            }
+            .response { resp in
+                response = resp
+                didComplete.fulfill()
+            }
+
+        wait(for: [didReceiveResponse, didComplete], timeout: timeout, enforceOrder: true)
+
+        // Then
+        XCTAssertEqual(initialResponse, response?.response)
+        XCTAssertNotNil(response?.request)
+        XCTAssertNotNil(response?.response)
+        XCTAssertNil(response?.data)
+        XCTAssertTrue(request.isCancelled, "onHTTPResponse cancelled request should have isCancelled == true")
+        XCTAssertTrue(response?.error?.isExplicitlyCancelledError == true, "onHTTPResponse cancelled request should be explicitly cancelled")
+    }
+
     func testRequestResponseWithProgress() {
         // Given
         let byteCount = 50 * 1024