TaskGroup+CancellableTask.swift 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. extension TaskGroup {
  17. /// Adds a child task to the group which is individually cancellable.
  18. ///
  19. /// - Parameter operation: The task to add to the group.
  20. /// - Returns: A handle which can be used to cancel the task without cancelling the rest of
  21. /// the group.
  22. @inlinable
  23. mutating func addCancellableTask(
  24. _ operation: @Sendable @escaping () async -> ChildTaskResult
  25. ) -> CancellableTaskHandle {
  26. let signal = AsyncStream.makeStream(of: Void.self)
  27. self.addTask {
  28. return await withTaskGroup(
  29. of: _ResultOrCancelled.self,
  30. returning: ChildTaskResult.self
  31. ) { group in
  32. group.addTask {
  33. let childTaskResult = await operation()
  34. return .result(childTaskResult)
  35. }
  36. group.addTask {
  37. for await _ in signal.stream {}
  38. return .cancelled
  39. }
  40. let first = await group.next()!
  41. group.cancelAll()
  42. let second = await group.next()!
  43. switch (first, second) {
  44. case (.result(let result), .cancelled), (.cancelled, .result(let result)):
  45. return result
  46. default:
  47. fatalError("Internal inconsistency")
  48. }
  49. }
  50. }
  51. return CancellableTaskHandle(continuation: signal.continuation)
  52. }
  53. @usableFromInline
  54. enum _ResultOrCancelled: Sendable {
  55. case result(ChildTaskResult)
  56. case cancelled
  57. }
  58. }
  59. @usableFromInline
  60. struct CancellableTaskHandle: Sendable {
  61. @usableFromInline
  62. private(set) var continuation: AsyncStream<Void>.Continuation
  63. @inlinable
  64. init(continuation: AsyncStream<Void>.Continuation) {
  65. self.continuation = continuation
  66. }
  67. @inlinable
  68. func cancel() {
  69. self.continuation.finish()
  70. }
  71. }