//
// DataStreamRequest.swift
//
// Copyright (c) 2014-2024 Alamofire Software Foundation (http://alamofire.org/)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
  24. import Foundation
  25. /// `Request` subclass which streams HTTP response `Data` through a `Handler` closure.
  26. public final class DataStreamRequest: Request, @unchecked Sendable {
  27. /// Closure type handling `DataStreamRequest.Stream` values.
  28. public typealias Handler<Success, Failure: Error> = @Sendable (Stream<Success, Failure>) throws -> Void
  29. /// Type encapsulating an `Event` as it flows through the stream, as well as a `CancellationToken` which can be used
  30. /// to stop the stream at any time.
  31. public struct Stream<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable {
  32. /// Latest `Event` from the stream.
  33. public let event: Event<Success, Failure>
  34. /// Token used to cancel the stream.
  35. public let token: CancellationToken
  36. /// Cancel the ongoing stream by canceling the underlying `DataStreamRequest`.
  37. public func cancel() {
  38. token.cancel()
  39. }
  40. }
  41. /// Type representing an event flowing through the stream. Contains either the `Result` of processing streamed
  42. /// `Data` or the completion of the stream.
  43. public enum Event<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable {
  44. /// Output produced every time the instance receives additional `Data`. The associated value contains the
  45. /// `Result` of processing the incoming `Data`.
  46. case stream(Result<Success, Failure>)
  47. /// Output produced when the instance has completed, whether due to stream end, cancellation, or an error.
  48. /// Associated `Completion` value contains the final state.
  49. case complete(Completion)
  50. }
  51. /// Value containing the state of a `DataStreamRequest` when the stream was completed.
  52. public struct Completion: Sendable {
  53. /// Last `URLRequest` issued by the instance.
  54. public let request: URLRequest?
  55. /// Last `HTTPURLResponse` received by the instance.
  56. public let response: HTTPURLResponse?
  57. /// Last `URLSessionTaskMetrics` produced for the instance.
  58. public let metrics: URLSessionTaskMetrics?
  59. /// `AFError` produced for the instance, if any.
  60. public let error: AFError?
  61. }
  62. /// Type used to cancel an ongoing stream.
  63. public struct CancellationToken: Sendable {
  64. weak var request: DataStreamRequest?
  65. init(_ request: DataStreamRequest) {
  66. self.request = request
  67. }
  68. /// Cancel the ongoing stream by canceling the underlying `DataStreamRequest`.
  69. public func cancel() {
  70. request?.cancel()
  71. }
  72. }
  73. /// `URLRequestConvertible` value used to create `URLRequest`s for this instance.
  74. public let convertible: any URLRequestConvertible
  75. /// Whether or not the instance will be cancelled if stream parsing encounters an error.
  76. public let automaticallyCancelOnStreamError: Bool
  77. /// Internal mutable state specific to this type.
  78. struct StreamMutableState {
  79. /// `OutputStream` bound to the `InputStream` produced by `asInputStream`, if it has been called.
  80. var outputStream: OutputStream?
  81. /// Stream closures called as `Data` is received.
  82. var streams: [@Sendable (_ data: Data) -> Void] = []
  83. /// Number of currently executing streams. Used to ensure completions are only fired after all streams are
  84. /// enqueued.
  85. var numberOfExecutingStreams = 0
  86. /// Completion calls enqueued while streams are still executing.
  87. var enqueuedCompletionEvents: [@Sendable () -> Void] = []
  88. /// Handler for any `HTTPURLResponse`s received.
  89. var httpResponseHandler: (queue: DispatchQueue,
  90. handler: @Sendable (_ response: HTTPURLResponse,
  91. _ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void)?
  92. }
  93. let streamMutableState = Protected(StreamMutableState())
  94. /// Creates a `DataStreamRequest` using the provided parameters.
  95. ///
  96. /// - Parameters:
  97. /// - id: `UUID` used for the `Hashable` and `Equatable` implementations. `UUID()`
  98. /// by default.
  99. /// - convertible: `URLRequestConvertible` value used to create `URLRequest`s for this
  100. /// instance.
  101. /// - automaticallyCancelOnStreamError: `Bool` indicating whether the instance will be cancelled when an `Error`
  102. /// is thrown while serializing stream `Data`.
  103. /// - underlyingQueue: `DispatchQueue` on which all internal `Request` work is performed.
  104. /// - serializationQueue: `DispatchQueue` on which all serialization work is performed. By default
  105. /// targets
  106. /// `underlyingQueue`, but can be passed another queue from a `Session`.
  107. /// - eventMonitor: `EventMonitor` called for event callbacks from internal `Request` actions.
  108. /// - interceptor: `RequestInterceptor` used throughout the request lifecycle.
  109. /// - shouldAutomaticallyResume: Whether the instance should resume after the first response handler is added.
  110. /// - delegate: `RequestDelegate` that provides an interface to actions not performed by
  111. /// the `Request`.
  112. init(id: UUID = UUID(),
  113. convertible: any URLRequestConvertible,
  114. automaticallyCancelOnStreamError: Bool,
  115. underlyingQueue: DispatchQueue,
  116. serializationQueue: DispatchQueue,
  117. eventMonitor: (any EventMonitor)?,
  118. interceptor: (any RequestInterceptor)?,
  119. shouldAutomaticallyResume: Bool?,
  120. delegate: any RequestDelegate) {
  121. self.convertible = convertible
  122. self.automaticallyCancelOnStreamError = automaticallyCancelOnStreamError
  123. super.init(id: id,
  124. underlyingQueue: underlyingQueue,
  125. serializationQueue: serializationQueue,
  126. eventMonitor: eventMonitor,
  127. interceptor: interceptor,
  128. shouldAutomaticallyResume: shouldAutomaticallyResume,
  129. delegate: delegate)
  130. }
  131. override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
  132. let copiedRequest = request
  133. return session.dataTask(with: copiedRequest)
  134. }
  135. override func finish(error: AFError? = nil) {
  136. streamMutableState.write { state in
  137. state.outputStream?.close()
  138. }
  139. super.finish(error: error)
  140. }
  141. func didReceive(data: Data) {
  142. streamMutableState.write { state in
  143. #if !canImport(FoundationNetworking) // If we not using swift-corelibs-foundation.
  144. if let stream = state.outputStream {
  145. underlyingQueue.async {
  146. var bytes = Array(data)
  147. stream.write(&bytes, maxLength: bytes.count)
  148. }
  149. }
  150. #endif
  151. state.numberOfExecutingStreams += state.streams.count
  152. underlyingQueue.async { [streams = state.streams] in streams.forEach { $0(data) } }
  153. }
  154. }
  155. func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping @Sendable (URLSession.ResponseDisposition) -> Void) {
  156. streamMutableState.read { dataMutableState in
  157. guard let httpResponseHandler = dataMutableState.httpResponseHandler else {
  158. underlyingQueue.async { completionHandler(.allow) }
  159. return
  160. }
  161. httpResponseHandler.queue.async {
  162. httpResponseHandler.handler(response) { disposition in
  163. if disposition == .cancel {
  164. self.mutableState.write { mutableState in
  165. mutableState.state = .cancelled
  166. mutableState.error = mutableState.error ?? AFError.explicitlyCancelled
  167. }
  168. }
  169. self.underlyingQueue.async {
  170. completionHandler(disposition.sessionDisposition)
  171. }
  172. }
  173. }
  174. }
  175. }
  176. /// Validates the `URLRequest` and `HTTPURLResponse` received for the instance using the provided `Validation` closure.
  177. ///
  178. /// - Parameter validation: `Validation` closure used to validate the request and response.
  179. ///
  180. /// - Returns: The `DataStreamRequest`.
  181. @discardableResult
  182. public func validate(_ validation: @escaping Validation) -> Self {
  183. let validator: @Sendable () -> Void = { [unowned self] in
  184. guard error == nil, let response else { return }
  185. let result = validation(request, response)
  186. if case let .failure(error) = result {
  187. self.error = error.asAFError(or: .responseValidationFailed(reason: .customValidationFailed(error: error)))
  188. }
  189. eventMonitor?.request(self,
  190. didValidateRequest: request,
  191. response: response,
  192. withResult: result)
  193. }
  194. validators.write { $0.append(validator) }
  195. return self
  196. }
  197. #if !canImport(FoundationNetworking) // If we not using swift-corelibs-foundation.
  198. /// Produces an `InputStream` that receives the `Data` received by the instance.
  199. ///
  200. /// - Note: The `InputStream` produced by this method must have `open()` called before being able to read `Data`.
  201. /// Additionally, this method will automatically call `resume()` on the instance, regardless of whether or
  202. /// not the creating session has `startRequestsImmediately` set to `true`.
  203. ///
  204. /// - Parameter bufferSize: Size, in bytes, of the buffer between the `OutputStream` and `InputStream`.
  205. ///
  206. /// - Returns: The `InputStream` bound to the internal `OutboundStream`.
  207. public func asInputStream(bufferSize: Int = 1024) -> InputStream? {
  208. defer { resume() }
  209. var inputStream: InputStream?
  210. streamMutableState.write { state in
  211. Foundation.Stream.getBoundStreams(withBufferSize: bufferSize,
  212. inputStream: &inputStream,
  213. outputStream: &state.outputStream)
  214. state.outputStream?.open()
  215. }
  216. return inputStream
  217. }
  218. #endif
  219. /// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse` and providing a completion
  220. /// handler to return a `ResponseDisposition` value.
  221. ///
  222. /// - Parameters:
  223. /// - queue: `DispatchQueue` on which the closure will be called. `.main` by default.
  224. /// - handler: Closure called when the instance produces an `HTTPURLResponse`. The `completionHandler` provided
  225. /// MUST be called, otherwise the request will never complete.
  226. ///
  227. /// - Returns: The instance.
  228. @_disfavoredOverload
  229. @preconcurrency
  230. @discardableResult
  231. public func onHTTPResponse(
  232. on queue: DispatchQueue = .main,
  233. perform handler: @escaping @Sendable (_ response: HTTPURLResponse,
  234. _ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void
  235. ) -> Self {
  236. streamMutableState.write { mutableState in
  237. mutableState.httpResponseHandler = (queue, handler)
  238. }
  239. return self
  240. }
  241. /// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
  242. ///
  243. /// - Parameters:
  244. /// - queue: `DispatchQueue` on which the closure will be called. `.main` by default.
  245. /// - handler: Closure called when the instance produces an `HTTPURLResponse`.
  246. ///
  247. /// - Returns: The instance.
  248. @preconcurrency
  249. @discardableResult
  250. public func onHTTPResponse(on queue: DispatchQueue = .main,
  251. perform handler: @escaping @Sendable (HTTPURLResponse) -> Void) -> Self {
  252. onHTTPResponse(on: queue) { response, completionHandler in
  253. handler(response)
  254. completionHandler(.allow)
  255. }
  256. return self
  257. }
  258. func capturingError(from closure: () throws -> Void) {
  259. do {
  260. try closure()
  261. } catch {
  262. self.error = error.asAFError(or: .responseSerializationFailed(reason: .customSerializationFailed(error: error)))
  263. cancel()
  264. }
  265. }
  266. func appendStreamCompletion<Success, Failure>(on queue: DispatchQueue,
  267. stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable {
  268. appendResponseSerializer {
  269. self.underlyingQueue.async {
  270. self.responseSerializerDidComplete {
  271. self.streamMutableState.write { state in
  272. guard state.numberOfExecutingStreams == 0 else {
  273. state.enqueuedCompletionEvents.append {
  274. self.enqueueCompletion(on: queue, stream: stream)
  275. }
  276. return
  277. }
  278. self.enqueueCompletion(on: queue, stream: stream)
  279. }
  280. }
  281. }
  282. }
  283. }
  284. func enqueueCompletion<Success, Failure>(on queue: DispatchQueue,
  285. stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable {
  286. queue.async {
  287. do {
  288. let completion = Completion(request: self.request,
  289. response: self.response,
  290. metrics: self.metrics,
  291. error: self.error)
  292. try stream(.init(event: .complete(completion), token: .init(self)))
  293. } catch {
  294. // Ignore error, as errors on Completion can't be handled anyway.
  295. }
  296. }
  297. }
  298. // MARK: Response Serialization
  299. /// Adds a `StreamHandler` which performs no parsing on incoming `Data`.
  300. ///
  301. /// - Parameters:
  302. /// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
  303. /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
  304. ///
  305. /// - Returns: The `DataStreamRequest`.
  306. @preconcurrency
  307. @discardableResult
  308. public func responseStream(on queue: DispatchQueue = .main, stream: @escaping Handler<Data, Never>) -> Self {
  309. let parser = { @Sendable [unowned self] (data: Data) in
  310. queue.async {
  311. self.capturingError {
  312. try stream(.init(event: .stream(.success(data)), token: .init(self)))
  313. }
  314. self.updateAndCompleteIfPossible()
  315. }
  316. }
  317. streamMutableState.write { $0.streams.append(parser) }
  318. appendStreamCompletion(on: queue, stream: stream)
  319. return self
  320. }
  321. /// Adds a `StreamHandler` which uses the provided `DataStreamSerializer` to process incoming `Data`.
  322. ///
  323. /// - Parameters:
  324. /// - serializer: `DataStreamSerializer` used to process incoming `Data`. Its work is done on the `serializationQueue`.
  325. /// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
  326. /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
  327. ///
  328. /// - Returns: The `DataStreamRequest`.
  329. @preconcurrency
  330. @discardableResult
  331. public func responseStream<Serializer: DataStreamSerializer>(using serializer: Serializer,
  332. on queue: DispatchQueue = .main,
  333. stream: @escaping Handler<Serializer.SerializedObject, AFError>) -> Self {
  334. let parser = { @Sendable [unowned self] (data: Data) in
  335. serializationQueue.async {
  336. // Start work on serialization queue.
  337. let result = Result { try serializer.serialize(data) }
  338. .mapError { $0.asAFError(or: .responseSerializationFailed(reason: .customSerializationFailed(error: $0))) }
  339. // End work on serialization queue.
  340. self.underlyingQueue.async {
  341. self.eventMonitor?.request(self, didParseStream: result)
  342. if result.isFailure, self.automaticallyCancelOnStreamError {
  343. self.cancel()
  344. }
  345. queue.async {
  346. self.capturingError {
  347. try stream(.init(event: .stream(result), token: .init(self)))
  348. }
  349. self.updateAndCompleteIfPossible()
  350. }
  351. }
  352. }
  353. }
  354. streamMutableState.write { $0.streams.append(parser) }
  355. appendStreamCompletion(on: queue, stream: stream)
  356. return self
  357. }
  358. /// Adds a `StreamHandler` which parses incoming `Data` as a UTF8 `String`.
  359. ///
  360. /// - Parameters:
  361. /// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
  362. /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
  363. ///
  364. /// - Returns: The `DataStreamRequest`.
  365. @preconcurrency
  366. @discardableResult
  367. public func responseStreamString(on queue: DispatchQueue = .main,
  368. stream: @escaping Handler<String, Never>) -> Self {
  369. let parser = { @Sendable [unowned self] (data: Data) in
  370. serializationQueue.async {
  371. // Start work on serialization queue.
  372. let string = String(decoding: data, as: UTF8.self)
  373. // End work on serialization queue.
  374. self.underlyingQueue.async {
  375. self.eventMonitor?.request(self, didParseStream: .success(string))
  376. queue.async {
  377. self.capturingError {
  378. try stream(.init(event: .stream(.success(string)), token: .init(self)))
  379. }
  380. self.updateAndCompleteIfPossible()
  381. }
  382. }
  383. }
  384. }
  385. streamMutableState.write { $0.streams.append(parser) }
  386. appendStreamCompletion(on: queue, stream: stream)
  387. return self
  388. }
  389. private func updateAndCompleteIfPossible() {
  390. streamMutableState.write { state in
  391. state.numberOfExecutingStreams -= 1
  392. guard state.numberOfExecutingStreams == 0, !state.enqueuedCompletionEvents.isEmpty else { return }
  393. let completionEvents = state.enqueuedCompletionEvents
  394. self.underlyingQueue.async { completionEvents.forEach { $0() } }
  395. state.enqueuedCompletionEvents.removeAll()
  396. }
  397. }
  398. /// Adds a `StreamHandler` which parses incoming `Data` using the provided `DataDecoder`.
  399. ///
  400. /// - Parameters:
  401. /// - type: `Decodable` type to parse incoming `Data` into.
  402. /// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
  403. /// - decoder: `DataDecoder` used to decode the incoming `Data`.
  404. /// - preprocessor: `DataPreprocessor` used to process the incoming `Data` before it's passed to the `decoder`.
  405. /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
  406. ///
  407. /// - Returns: The `DataStreamRequest`.
  408. @preconcurrency
  409. @discardableResult
  410. public func responseStreamDecodable<T: Decodable>(of type: T.Type = T.self,
  411. on queue: DispatchQueue = .main,
  412. using decoder: any DataDecoder = JSONDecoder(),
  413. preprocessor: any DataPreprocessor = PassthroughPreprocessor(),
  414. stream: @escaping Handler<T, AFError>) -> Self where T: Sendable {
  415. responseStream(using: DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: preprocessor),
  416. on: queue,
  417. stream: stream)
  418. }
  419. }
  420. extension DataStreamRequest.Stream {
  421. /// Incoming `Result` values from `Event.stream`.
  422. public var result: Result<Success, Failure>? {
  423. guard case let .stream(result) = event else { return nil }
  424. return result
  425. }
  426. /// `Success` value of the instance, if any.
  427. public var value: Success? {
  428. guard case let .success(value) = result else { return nil }
  429. return value
  430. }
  431. /// `Failure` value of the instance, if any.
  432. public var error: Failure? {
  433. guard case let .failure(error) = result else { return nil }
  434. return error
  435. }
  436. /// `Completion` value of the instance, if any.
  437. public var completion: DataStreamRequest.Completion? {
  438. guard case let .complete(completion) = event else { return nil }
  439. return completion
  440. }
  441. }
  442. // MARK: - Serialization
  443. /// A type which can serialize incoming `Data`.
  444. public protocol DataStreamSerializer: Sendable {
  445. /// Type produced from the serialized `Data`.
  446. associatedtype SerializedObject: Sendable
  447. /// Serializes incoming `Data` into a `SerializedObject` value.
  448. ///
  449. /// - Parameter data: `Data` to be serialized.
  450. ///
  451. /// - Throws: Any error produced during serialization.
  452. func serialize(_ data: Data) throws -> SerializedObject
  453. }
  454. /// `DataStreamSerializer` which uses the provided `DataPreprocessor` and `DataDecoder` to serialize the incoming `Data`.
  455. public struct DecodableStreamSerializer<T: Decodable>: DataStreamSerializer where T: Sendable {
  456. /// `DataDecoder` used to decode incoming `Data`.
  457. public let decoder: any DataDecoder
  458. /// `DataPreprocessor` incoming `Data` is passed through before being passed to the `DataDecoder`.
  459. public let dataPreprocessor: any DataPreprocessor
  460. /// Creates an instance with the provided `DataDecoder` and `DataPreprocessor`.
  461. /// - Parameters:
  462. /// - decoder: ` DataDecoder` used to decode incoming `Data`. `JSONDecoder()` by default.
  463. /// - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the
  464. /// `decoder`. `PassthroughPreprocessor()` by default.
  465. public init(decoder: any DataDecoder = JSONDecoder(), dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) {
  466. self.decoder = decoder
  467. self.dataPreprocessor = dataPreprocessor
  468. }
  469. public func serialize(_ data: Data) throws -> T {
  470. let processedData = try dataPreprocessor.preprocess(data)
  471. do {
  472. return try decoder.decode(T.self, from: processedData)
  473. } catch {
  474. throw AFError.responseSerializationFailed(reason: .decodingFailed(error: error))
  475. }
  476. }
  477. }
  478. /// `DataStreamSerializer` which performs no serialization on incoming `Data`.
  479. public struct PassthroughStreamSerializer: DataStreamSerializer {
  480. /// Creates an instance.
  481. public init() {}
  482. public func serialize(_ data: Data) throws -> Data { data }
  483. }
  484. /// `DataStreamSerializer` which serializes incoming stream `Data` into `UTF8`-decoded `String` values.
  485. public struct StringStreamSerializer: DataStreamSerializer {
  486. /// Creates an instance.
  487. public init() {}
  488. public func serialize(_ data: Data) throws -> String {
  489. String(decoding: data, as: UTF8.self)
  490. }
  491. }
  492. extension DataStreamSerializer {
  493. /// Creates a `DecodableStreamSerializer` instance with the provided `DataDecoder` and `DataPreprocessor`.
  494. ///
  495. /// - Parameters:
  496. /// - type: `Decodable` type to decode from stream data.
  497. /// - decoder: ` DataDecoder` used to decode incoming `Data`. `JSONDecoder()` by default.
  498. /// - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the
  499. /// `decoder`. `PassthroughPreprocessor()` by default.
  500. public static func decodable<T: Decodable>(of type: T.Type,
  501. decoder: any DataDecoder = JSONDecoder(),
  502. dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) -> Self where Self == DecodableStreamSerializer<T> {
  503. DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: dataPreprocessor)
  504. }
  505. }
  506. extension DataStreamSerializer where Self == PassthroughStreamSerializer {
  507. /// Provides a `PassthroughStreamSerializer` instance.
  508. public static var passthrough: PassthroughStreamSerializer { PassthroughStreamSerializer() }
  509. }
  510. extension DataStreamSerializer where Self == StringStreamSerializer {
  511. /// Provides a `StringStreamSerializer` instance.
  512. public static var string: StringStreamSerializer { StringStreamSerializer() }
  513. }