Browse Source

Add more concurrency support, cleanup.

Jon Shier 2 years ago
parent
commit
7fcbc23e74
3 changed files with 107 additions and 20 deletions
  1. 70 15
      Source/Concurrency.swift
  2. 4 5
      Source/Request.swift
  3. 33 0
      Tests/ConcurrencyTests.swift

+ 70 - 15
Source/Concurrency.swift

@@ -772,23 +772,68 @@ public struct WebSocketTask {
         self.request = request
     }
 
-    public typealias Stream<Success, Failure: Error> = StreamOf<WebSocketRequest.Event<Success, Failure>>
+    public typealias EventStreamOf<Success, Failure: Error> = StreamOf<WebSocketRequest.Event<Success, Failure>>
 
-    public func streamingEvents(
+    public func streamingMessageEvents(
         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
-        bufferingPolicy: StreamOf<WebSocketRequest.Event<URLSessionWebSocketTask.Message, Never>>.BufferingPolicy = .unbounded
-    ) -> StreamOf<WebSocketRequest.Event<URLSessionWebSocketTask.Message, Never>> {
+        bufferingPolicy: EventStreamOf<URLSessionWebSocketTask.Message, Never>.BufferingPolicy = .unbounded
+    ) -> EventStreamOf<URLSessionWebSocketTask.Message, Never> {
         createStream(automaticallyCancelling: shouldAutomaticallyCancel,
-                     bufferingPolicy: bufferingPolicy) { onEvent in
+                     bufferingPolicy: bufferingPolicy,
+                     transform: { $0 }) { onEvent in
             request.streamMessageEvents(on: .streamCompletionQueue(forRequestID: request.id), handler: onEvent)
         }
     }
 
-    private func createStream(
+    public func streamingMessages(
         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
-        bufferingPolicy: StreamOf<WebSocketRequest.Event<URLSessionWebSocketTask.Message, Never>>.BufferingPolicy = .unbounded,
-        forResponse onResponse: @escaping (@escaping (WebSocketRequest.Event<URLSessionWebSocketTask.Message, Never>) -> Void) -> Void
-    ) -> StreamOf<WebSocketRequest.Event<URLSessionWebSocketTask.Message, Never>> {
+        bufferingPolicy: StreamOf<URLSessionWebSocketTask.Message>.BufferingPolicy = .unbounded
+    ) -> StreamOf<URLSessionWebSocketTask.Message> {
+        createStream(automaticallyCancelling: shouldAutomaticallyCancel,
+                     bufferingPolicy: bufferingPolicy,
+                     transform: { $0.message }) { onEvent in
+            request.streamMessageEvents(on: .streamCompletionQueue(forRequestID: request.id), handler: onEvent)
+        }
+    }
+
+    public func streamingDecodableEvents<Value: Decodable>(
+        _ type: Value.Type = Value.self,
+        automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
+        using decoder: DataDecoder = JSONDecoder(),
+        bufferingPolicy: EventStreamOf<Value, Error>.BufferingPolicy = .unbounded
+    ) -> EventStreamOf<Value, Error> {
+        createStream(automaticallyCancelling: shouldAutomaticallyCancel,
+                     bufferingPolicy: bufferingPolicy,
+                     transform: { $0 }) { onEvent in
+            request.streamDecodableEvents(Value.self,
+                                          on: .streamCompletionQueue(forRequestID: request.id),
+                                          using: decoder,
+                                          handler: onEvent)
+        }
+    }
+
+    public func streamingDecodable<Value: Decodable>(
+        _ type: Value.Type = Value.self,
+        automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
+        using decoder: DataDecoder = JSONDecoder(),
+        bufferingPolicy: StreamOf<Value>.BufferingPolicy = .unbounded
+    ) -> StreamOf<Value> {
+        createStream(automaticallyCancelling: shouldAutomaticallyCancel,
+                     bufferingPolicy: bufferingPolicy,
+                     transform: { $0.message }) { onEvent in
+            request.streamDecodableEvents(Value.self,
+                                          on: .streamCompletionQueue(forRequestID: request.id),
+                                          using: decoder,
+                                          handler: onEvent)
+        }
+    }
+
+    private func createStream<Success, Value, Failure: Error>(
+        automaticallyCancelling shouldAutomaticallyCancel: Bool,
+        bufferingPolicy: StreamOf<Value>.BufferingPolicy,
+        transform: @escaping (WebSocketRequest.Event<Success, Failure>) -> Value?,
+        forResponse onResponse: @escaping (@escaping (WebSocketRequest.Event<Success, Failure>) -> Void) -> Void
+    ) -> StreamOf<Value> {
         StreamOf(bufferingPolicy: bufferingPolicy) {
             guard shouldAutomaticallyCancel,
                   request.isInitialized || request.isResumed || request.isSuspended else { return }
@@ -796,7 +841,10 @@ public struct WebSocketTask {
             cancel()
         } builder: { continuation in
             onResponse { event in
-                continuation.yield(event)
+                if let value = transform(event) {
+                    continuation.yield(value)
+                }
+
                 if case .completed = event.kind {
                     continuation.finish()
                 }
@@ -804,19 +852,26 @@ public struct WebSocketTask {
         }
     }
 
-    public func send(_ message: URLSessionWebSocketTask.Message) async -> Result<Void, Error> {
-        await withCheckedContinuation { continuation in
-            request.send(message) { result in
-                continuation.resume(returning: result)
+    /// Send a `URLSessionWebSocketTask.Message`.
+    ///
+    /// - Parameter message: The `Message`.
+    ///
+    public func send(_ message: URLSessionWebSocketTask.Message) async throws {
+        try await withCheckedThrowingContinuation { continuation in
+            request.send(message, queue: .streamCompletionQueue(forRequestID: request.id)) { result in
+                continuation.resume(with: result)
             }
         }
     }
 
-    /// Cancel the underlying `WebSocketRequest`.
+    /// Close the underlying `WebSocketRequest`.
     public func close(sending closeCode: URLSessionWebSocketTask.CloseCode, reason: Data? = nil) {
         request.close(sending: closeCode, reason: reason)
     }
 
+    /// Cancel the underlying `WebSocketRequest`.
+    ///
+    /// Cancellation will produce an `AFError.explicitlyCancelled` instance.
     public func cancel() {
         request.cancel()
     }

+ 4 - 5
Source/Request.swift

@@ -1796,15 +1796,12 @@ public final class WebSocketRequest: Request {
             if case let .sessionTaskFailed(error) = mutableState.error, (error as? URLError)?.code == .cancelled {
                 mutableState.error = nil
             }
-//            mutableState.error = mutableState.error ?? AFError.explicitlyCancelled
         }
 
         // TODO: Still issue this event?
         eventMonitor?.requestDidCancel(self)
     }
 
-    // TODO: Distinguish between cancellation and close behavior?
-    // TODO: Reexamine cancellation behavior.
     @discardableResult
     public func close(sending closeCode: URLSessionWebSocketTask.CloseCode, reason: Data? = nil) -> Self {
         cancelTimedPing()
@@ -1842,6 +1839,7 @@ public final class WebSocketRequest: Request {
         dispatchPrecondition(condition: .onQueue(underlyingQueue))
 
         socketMutableState.read { state in
+            // TODO: Capture HTTPURLResponse here too?
             state.handlers.forEach { handler in
                 // Saved handler calls out to serializationQueue immediately, then to handler's queue.
                 handler.handler(.connected(protocol: `protocol`))
@@ -1935,8 +1933,9 @@ public final class WebSocketRequest: Request {
 
                 self.listen(to: task)
             case .failure:
+                // It doesn't seem like any relevant errors are received here, just incorrect garbage, like errors when
+                // the socket disconnects.
                 break
-//                NSLog("Receive for task: \(task), didFailWithError: \(error)")
             }
         }
     }
@@ -2075,7 +2074,7 @@ public final class WebSocketRequest: Request {
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 public protocol WebSocketMessageSerializer {
     associatedtype Output
-    associatedtype Failure = Error
+    associatedtype Failure: Error = Error
 
     func decode(_ message: URLSessionWebSocketTask.Message) throws -> Output
 }

+ 33 - 0
Tests/ConcurrencyTests.swift

@@ -746,6 +746,39 @@ final class UploadConcurrencyTests: BaseTestCase {
 }
 #endif
 
+#if canImport(Darwin) && !canImport(FoundationNetworking) && swift(>=5.8)
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+final class WebSocketConcurrencyTests: BaseTestCase {
+    func testThatMessageEventsCanBeStreamed() async throws {
+        // Given
+        let session = stored(Session())
+        let receivedEvent = expectation(description: "receivedEvent")
+        receivedEvent.expectedFulfillmentCount = 4
+
+        // When
+        for await _ in session.websocketRequest(.websocket()).webSocketTask().streamingMessageEvents() {
+            receivedEvent.fulfill()
+        }
+
+        await fulfillment(of: [receivedEvent])
+
+        // Then
+    }
+
+    func testThatMessagesCanBeStreamed() async throws {
+        // Given
+        let session = stored(Session())
+
+        // When
+        let messages = await session.websocketRequest(.websocket()).webSocketTask().streamingMessages().collect()
+
+        // Then
+        XCTAssertTrue(messages.count == 1)
+    }
+}
+
+#endif
+
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 final class ClosureAPIConcurrencyTests: BaseTestCase {
     func testThatDownloadProgressStreamReturnsProgress() async {