浏览代码

Add a handful of utilities (#1690)

Motivation:

The client rpc executor makes use of a bunch of utilities. Since it will
be a reasonably large PR, in order to make it slightly less large, I'd
like to get some of the utilities reviewed separately. Since most are
too small to be worth reviewing individually this change includes a
few unrelated utilities.

Modifications:

- Add `UnsafeTransfer`
- Adds an optional `cause` error to `RPCError`
- Add extensions to `Metadata` for setting/parsing a few gRPC specific
  metadata fields
- Add extensions to `Result` for working with `async` closures and
  casting errors to a known type
- Add a type-erased closable writer similar to the type-erased writer

Result:

A few handy helpers are in place and the rpc executor PR will be a
little smaller.
George Barnett 2 年之前
父节点
当前提交
959cbd3f82

+ 28 - 0
Sources/GRPCCore/Internal/Concurrency Primitives/UnsafeTransfer.swift

@@ -0,0 +1,28 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@usableFromInline
+struct UnsafeTransfer<Wrapped> {
+  @usableFromInline
+  var wrappedValue: Wrapped
+
+  @inlinable
+  init(_ wrappedValue: Wrapped) {
+    self.wrappedValue = wrappedValue
+  }
+}
+
+extension UnsafeTransfer: @unchecked Sendable {}

+ 86 - 0
Sources/GRPCCore/Internal/Metadata+GRPC.swift

@@ -0,0 +1,86 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+extension Metadata {
+  @inlinable
+  var previousRPCAttempts: Int? {
+    get {
+      self.firstString(forKey: .previousRPCAttempts).flatMap { Int($0) }
+    }
+    set {
+      if let newValue = newValue {
+        self.replaceOrAddString(String(describing: newValue), forKey: .previousRPCAttempts)
+      } else {
+        self.removeAllValues(forKey: .previousRPCAttempts)
+      }
+    }
+  }
+
+  @inlinable
+  var retryPushback: RetryPushback? {
+    return self.firstString(forKey: .retryPushbackMs).map {
+      RetryPushback(milliseconds: $0)
+    }
+  }
+}
+
+extension Metadata {
+  @usableFromInline
+  enum GRPCKey: String, Sendable, Hashable {
+    case retryPushbackMs = "grpc-retry-pushback-ms"
+    case previousRPCAttempts = "grpc-previous-rpc-attempts"
+  }
+
+  @inlinable
+  func firstString(forKey key: GRPCKey) -> String? {
+    self[stringValues: key.rawValue].first(where: { _ in true })
+  }
+
+  @inlinable
+  mutating func replaceOrAddString(_ value: String, forKey key: GRPCKey) {
+    self.replaceOrAddString(value, forKey: key.rawValue)
+  }
+
+  @inlinable
+  mutating func removeAllValues(forKey key: GRPCKey) {
+    self.removeAllValues(forKey: key.rawValue)
+  }
+}
+
+extension Metadata {
+  @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
+  @usableFromInline
+  enum RetryPushback: Hashable, Sendable {
+    case retryAfter(Duration)
+    case stopRetrying
+
+    @inlinable
+    init(milliseconds value: String) {
+      if let milliseconds = Int64(value), milliseconds >= 0 {
+        let (seconds, remainingMilliseconds) = milliseconds.quotientAndRemainder(dividingBy: 1000)
+        // 1e18 attoseconds per second
+        // 1e15 attoseconds per millisecond.
+        let attoseconds = Int64(remainingMilliseconds) * 1_000_000_000_000_000
+        self = .retryAfter(Duration(secondsComponent: seconds, attosecondsComponent: attoseconds))
+      } else {
+        // Negative or not parseable means stop trying.
+        // Source: https://github.com/grpc/proposal/blob/master/A6-client-retries.md
+        self = .stopRetrying
+      }
+    }
+  }
+}

+ 46 - 0
Sources/GRPCCore/Internal/Result+Catching.swift

@@ -0,0 +1,46 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+extension Result where Failure == any Error {
+  /// Like `Result(catching:)`, but `async`.
+  ///
+  /// - Parameter body: An `async` closure to catch the result of.
+  @inlinable
+  init(catching body: () async throws -> Success) async {
+    do {
+      self = .success(try await body())
+    } catch {
+      self = .failure(error)
+    }
+  }
+
+  /// Attempts to map the error to the given error type.
+  ///
+  /// If the cast fails then the provided closure is used to create an error of the given type.
+  ///
+  /// - Parameters:
+  ///   - errorType: The type of error to cast to.
+  ///   - buildError: A closure which constructs the desired error if the cast fails.
+  @inlinable
+  func castError<NewError: Error>(
+    to errorType: NewError.Type = NewError.self,
+    or buildError: (any Error) -> NewError
+  ) -> Result<Success, NewError> {
+    return self.mapError { error in
+      return (error as? NewError) ?? buildError(error)
+    }
+  }
+}

+ 21 - 7
Sources/GRPCCore/RPCError.swift

@@ -59,24 +59,36 @@ public struct RPCError: @unchecked Sendable, Hashable, Error {
     }
   }
 
+  /// The original error which led to this error being thrown.
+  public var cause: Error? {
+    get { self.storage.cause }
+    set {
+      self.ensureStorageIsUnique()
+      self.storage.cause = newValue
+    }
+  }
+
   /// Create a new RPC error.
   ///
   /// - Parameters:
   ///   - code: The status code.
   ///   - message: A message providing additional context about the code.
   ///   - metadata: Any metadata to attach to the error.
-  public init(code: Code, message: String, metadata: Metadata = [:]) {
-    self.storage = Storage(code: code, message: message, metadata: metadata)
+  ///   - cause: An underlying error which led to this error being thrown.
+  public init(code: Code, message: String, metadata: Metadata = [:], cause: Error? = nil) {
+    self.storage = Storage(code: code, message: message, metadata: metadata, cause: cause)
   }
 
   /// Create a new RPC error from the provided ``Status``.
   ///
   /// Returns `nil` if the provided ``Status`` has code ``Status/Code-swift.struct/ok``.
   ///
-  /// - Parameter status: The status to convert.
-  public init?(status: Status) {
+  /// - Parameters:
+  ///   - status: The status to convert.
+  ///   - metadata: Any metadata to attach to the error.
+  public init?(status: Status, metadata: Metadata = [:]) {
     guard let code = Code(status.code) else { return nil }
-    self.init(code: code, message: status.message, metadata: [:])
+    self.init(code: code, message: status.message, metadata: metadata)
   }
 }
 
@@ -91,15 +103,17 @@ extension RPCError {
     var code: RPCError.Code
     var message: String
     var metadata: Metadata
+    var cause: Error?
 
-    init(code: RPCError.Code, message: String, metadata: Metadata) {
+    init(code: RPCError.Code, message: String, metadata: Metadata, cause: Error?) {
       self.code = code
       self.message = message
       self.metadata = metadata
+      self.cause = cause
     }
 
     func copy() -> Self {
-      Self(code: self.code, message: self.message, metadata: self.metadata)
+      Self(code: self.code, message: self.message, metadata: self.metadata, cause: self.cause)
     }
 
     func hash(into hasher: inout Hasher) {

+ 61 - 0
Sources/GRPCCore/Streaming/Internal/RPCWriter+Closable.swift

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
+extension RPCWriter {
+  @usableFromInline
+  struct Closable: ClosableRPCWriterProtocol {
+    @usableFromInline
+    let writer: any ClosableRPCWriterProtocol<Element>
+
+    /// Creates an ``RPCWriter`` by wrapping the `other` writer.
+    ///
+    /// - Parameter other: The writer to wrap.
+    @inlinable
+    init(wrapping other: some ClosableRPCWriterProtocol<Element>) {
+      self.writer = other
+    }
+
+    /// Writes a sequence of elements.
+    ///
+    /// This function suspends until the elements have been accepted. Implements can use this
+    /// to exert backpressure on callers.
+    ///
+    /// - Parameter elements: The elements to write.
+    @inlinable
+    func write(contentsOf elements: some Sequence<Element>) async throws {
+      try await self.writer.write(contentsOf: elements)
+    }
+
+    /// Indicate to the writer that no more writes are to be accepted.
+    ///
+    /// All writes after ``finish()`` has been called should result in an error
+    /// being thrown.
+    @inlinable
+    func finish() {
+      self.writer.finish()
+    }
+
+    /// Indicate to the writer that no more writes are to be accepted because an error occurred.
+    ///
+    /// All writes after ``finish(throwing:)`` has been called should result in an error
+    /// being thrown.
+    @inlinable
+    func finish(throwing error: Error) {
+      self.writer.finish(throwing: error)
+    }
+  }
+}

+ 86 - 0
Tests/GRPCCoreTests/Internal/Metadata+GRPCTests.swift

@@ -0,0 +1,86 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import XCTest
+
+@testable import GRPCCore
+
+final class MetadataGRPCTests: XCTestCase {
+  func testPreviousRPCAttemptsValidValues() {
+    let testData = [("0", 0), ("1", 1), ("-1", -1)]
+    for (value, expected) in testData {
+      let metadata: Metadata = ["grpc-previous-rpc-attempts": "\(value)"]
+      XCTAssertEqual(metadata.previousRPCAttempts, expected)
+    }
+  }
+
+  func testPreviousRPCAttemptsInvalidValues() {
+    let values = ["foo", "42.0"]
+    for value in values {
+      let metadata: Metadata = ["grpc-previous-rpc-attempts": "\(value)"]
+      XCTAssertNil(metadata.previousRPCAttempts)
+    }
+  }
+
+  func testSetPreviousRPCAttemptsToValue() {
+    var metadata: Metadata = [:]
+
+    metadata.previousRPCAttempts = 42
+    XCTAssertEqual(metadata, ["grpc-previous-rpc-attempts": "42"])
+
+    metadata.previousRPCAttempts = nil
+    XCTAssertEqual(metadata, [:])
+
+    for i in 0 ..< 5 {
+      metadata.addString("\(i)", forKey: "grpc-previous-rpc-attempts")
+    }
+    XCTAssertEqual(metadata.count, 5)
+
+    // Should remove old values.
+    metadata.previousRPCAttempts = 42
+    XCTAssertEqual(metadata, ["grpc-previous-rpc-attempts": "42"])
+  }
+
+  func testRetryPushbackValidDelay() {
+    let testData: [(String, Duration)] = [
+      ("0", .zero),
+      ("1", Duration(secondsComponent: 0, attosecondsComponent: 1_000_000_000_000_000)),
+      ("999", Duration(secondsComponent: 0, attosecondsComponent: 999_000_000_000_000_000)),
+      ("1000", Duration(secondsComponent: 1, attosecondsComponent: 0)),
+      ("1001", Duration(secondsComponent: 1, attosecondsComponent: 1_000_000_000_000_000)),
+      ("1999", Duration(secondsComponent: 1, attosecondsComponent: 999_000_000_000_000_000)),
+    ]
+
+    for (value, expectedDuration) in testData {
+      let metadata: Metadata = ["grpc-retry-pushback-ms": "\(value)"]
+      XCTAssertEqual(metadata.retryPushback, .retryAfter(expectedDuration))
+    }
+  }
+
+  func testRetryPushbackInvalidDelay() {
+    let testData: [String] = ["-1", "-inf", "not-a-number", "42.0"]
+
+    for value in testData {
+      let metadata: Metadata = ["grpc-retry-pushback-ms": "\(value)"]
+      XCTAssertEqual(metadata.retryPushback, .stopRetrying)
+    }
+  }
+
+  func testRetryPushbackNoValuePresent() {
+    let metadata: Metadata = [:]
+    XCTAssertNil(metadata.retryPushback)
+  }
+}

+ 65 - 0
Tests/GRPCCoreTests/Internal/Result+CatchingTests.swift

@@ -0,0 +1,65 @@
+/*
+ * Copyright 2023, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import XCTest
+
+@testable import GRPCCore
+
+final class ResultCatchingTests: XCTestCase {
+  func testResultCatching() async {
+    let result = await Result {
+      try? await Task.sleep(nanoseconds: 1)
+      throw RPCError(code: .unknown, message: "foo")
+    }
+
+    switch result {
+    case .success:
+      XCTFail()
+    case .failure(let error):
+      XCTAssertEqual(error as? RPCError, RPCError(code: .unknown, message: "foo"))
+    }
+  }
+
+  func testCastToErrorOfCorrectType() async {
+    let result = Result<Void, any Error>.failure(RPCError(code: .unknown, message: "foo"))
+    let typedFailure = result.castError(to: RPCError.self) { _ in
+      XCTFail("buildError(_:) was called")
+      return RPCError(code: .failedPrecondition, message: "shouldn't happen")
+    }
+
+    switch typedFailure {
+    case .success:
+      XCTFail()
+    case .failure(let error):
+      XCTAssertEqual(error, RPCError(code: .unknown, message: "foo"))
+    }
+  }
+
+  func testCastToErrorOfIncorrectType() async {
+    struct WrongError: Error {}
+    let result = Result<Void, any Error>.failure(WrongError())
+    let typedFailure = result.castError(to: RPCError.self) { _ in
+      return RPCError(code: .invalidArgument, message: "fallback")
+    }
+
+    switch typedFailure {
+    case .success:
+      XCTFail()
+    case .failure(let error):
+      XCTAssertEqual(error, RPCError(code: .invalidArgument, message: "fallback"))
+    }
+  }
+}