Browse Source

Add Sendable conformance to core async/await API (#1378)

Motivation:

The core async/await API should have appropriate 'Sendable' conformance
before we publish it to main. This change adds it to the core parts of
the new API.

There are still numerous types which can be made 'Sendable' but are
either less significant (e.g. configuration) or require depdendencies to
adopt 'Sendable' first (and would otherwise require `@preconcurrency`)

Modifications:

- Add `GRPCSendable` to ease adoption
- Make some plain-old-data types `Sendable` including `GRPCStatus`
- Require `Request` and `Response` to be `Sendable` in the client and
  server async APIs
- Make the async server context `@unchecked Sendable`
- Make the async writer and stream reader `Sendable`

Result:

Core async API is `Sendable`.
George Barnett 3 years ago
parent
commit
b341dbb881

+ 5 - 5
Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

@@ -27,7 +27,7 @@ import NIOCore
 /// may suspend if the writer has been paused.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 @usableFromInline
-internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
+internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
   @usableFromInline
   internal typealias Element = Delegate.Element
 
@@ -36,7 +36,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
 
   /// A value pending a write.
   @usableFromInline
-  internal struct _Pending<Value> {
+  internal struct _Pending<Value: Sendable>: Sendable {
     @usableFromInline
     var value: Value
 
@@ -323,9 +323,9 @@ public struct GRPCAsyncWriterError: Error, Hashable {
 }
 
 @usableFromInline
-internal protocol AsyncWriterDelegate: AnyObject {
-  associatedtype Element
-  associatedtype End
+internal protocol AsyncWriterDelegate: AnyObject, Sendable {
+  associatedtype Element: Sendable
+  associatedtype End: Sendable
 
   @inlinable
   func write(_ element: Element)

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

@@ -19,7 +19,7 @@ import NIOHPACK
 
 /// Async-await variant of BidirectionalStreamingCall.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
+public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable> {
   private let call: Call<Request, Response>
   private let responseParts: StreamingResponseParts<Response>
   private let responseSource: PassthroughMessageSource<Response, Error>

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift

@@ -19,7 +19,7 @@ import NIOHPACK
 
 /// Async-await variant of `ClientStreamingCall`.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncClientStreamingCall<Request, Response> {
+public struct GRPCAsyncClientStreamingCall<Request: Sendable, Response: Sendable> {
   private let call: Call<Request, Response>
   private let responseParts: UnaryResponseParts<Response>
 

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift

@@ -19,7 +19,7 @@
 /// This is currently a wrapper around AsyncThrowingStream because we want to be
 /// able to swap out the implementation for something else in the future.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
+public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
   @usableFromInline
   internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
 

+ 6 - 6
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift

@@ -31,7 +31,7 @@
 /// try await stream.finish()
 /// ```
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncRequestStreamWriter<Request> {
+public struct GRPCAsyncRequestStreamWriter<Request: Sendable> {
   @usableFromInline
   internal let asyncWriter: AsyncWriter<Delegate<Request>>
 
@@ -78,7 +78,7 @@ public struct GRPCAsyncRequestStreamWriter<Request> {
 extension GRPCAsyncRequestStreamWriter {
   /// A delegate for the writer which writes messages to an underlying receiver.`
   @usableFromInline
-  internal final class Delegate<Request>: AsyncWriterDelegate {
+  internal final class Delegate<Request: Sendable>: AsyncWriterDelegate, Sendable {
     @usableFromInline
     internal typealias Element = (Request, Compression)
 
@@ -89,16 +89,16 @@ extension GRPCAsyncRequestStreamWriter {
     internal let _compressionEnabled: Bool
 
     @usableFromInline
-    internal let _send: (Request, MessageMetadata) -> Void
+    internal let _send: @Sendable(Request, MessageMetadata) -> Void
 
     @usableFromInline
-    internal let _finish: () -> Void
+    internal let _finish: @Sendable() -> Void
 
     @inlinable
     internal init(
       compressionEnabled: Bool,
-      send: @escaping (Request, MessageMetadata) -> Void,
-      finish: @escaping () -> Void
+      send: @Sendable @escaping (Request, MessageMetadata) -> Void,
+      finish: @Sendable @escaping () -> Void
     ) {
       self._compressionEnabled = compressionEnabled
       self._send = send

+ 6 - 6
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift

@@ -18,7 +18,7 @@
 
 /// Writer for server-streaming RPC handlers to provide responses.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncResponseStreamWriter<Response> {
+public struct GRPCAsyncResponseStreamWriter<Response: Sendable> {
   @usableFromInline
   internal typealias Element = (Response, Compression)
 
@@ -44,7 +44,7 @@ public struct GRPCAsyncResponseStreamWriter<Response> {
 
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 @usableFromInline
-internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
+internal final class AsyncResponseStreamWriterDelegate<Response: Sendable>: AsyncWriterDelegate {
   @usableFromInline
   internal typealias Element = (Response, Compression)
 
@@ -55,10 +55,10 @@ internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDel
   internal let _context: GRPCAsyncServerCallContext
 
   @usableFromInline
-  internal let _send: (Response, MessageMetadata) -> Void
+  internal let _send: @Sendable(Response, MessageMetadata) -> Void
 
   @usableFromInline
-  internal let _finish: (GRPCStatus) -> Void
+  internal let _finish: @Sendable(GRPCStatus) -> Void
 
   @usableFromInline
   internal let _compressionEnabledOnServer: Bool
@@ -70,8 +70,8 @@ internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDel
   internal init(
     context: GRPCAsyncServerCallContext,
     compressionIsEnabled: Bool,
-    send: @escaping (Response, MessageMetadata) -> Void,
-    finish: @escaping (GRPCStatus) -> Void
+    send: @escaping @Sendable(Response, MessageMetadata) -> Void,
+    finish: @escaping @Sendable(GRPCStatus) -> Void
   ) {
     self._context = context
     self._compressionEnabledOnServer = compressionIsEnabled

+ 4 - 2
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift

@@ -28,10 +28,12 @@ import NIOHPACK
 // make for a surprising API.
 //
 // We also considered an `actor` but that felt clunky at the point of use since adopters would need
-// to `await` the retrieval of a logger or the updating of the trailers and each would requrie a
+// to `await` the retrieval of a logger or the updating of the trailers and each would require a
 // promise to glue the NIO and async-await paradigms in the handler.
+//
+// Note: this is `@unchecked Sendable`; all mutable state is protected by a lock.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public final class GRPCAsyncServerCallContext {
+public final class GRPCAsyncServerCallContext: @unchecked Sendable {
   private let lock = Lock()
 
   /// Metadata for this request.

+ 12 - 11
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift

@@ -21,10 +21,12 @@ import NIOHPACK
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
 public struct GRPCAsyncServerHandler<
   Serializer: MessageSerializer,
-  Deserializer: MessageDeserializer
->: GRPCServerHandlerProtocol {
+  Deserializer: MessageDeserializer,
+  Request: Sendable,
+  Response: Sendable
+>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
   @usableFromInline
-  internal let _handler: AsyncServerHandler<Serializer, Deserializer>
+  internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>
 
   public func receiveMetadata(_ metadata: HPACKHeaders) {
     self._handler.receiveMetadata(metadata)
@@ -153,13 +155,10 @@ extension GRPCAsyncServerHandler {
 @usableFromInline
 internal final class AsyncServerHandler<
   Serializer: MessageSerializer,
-  Deserializer: MessageDeserializer
->: GRPCServerHandlerProtocol {
-  @usableFromInline
-  internal typealias Request = Deserializer.Output
-  @usableFromInline
-  internal typealias Response = Serializer.Input
-
+  Deserializer: MessageDeserializer,
+  Request: Sendable,
+  Response: Sendable
+>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
   /// A response serializer.
   @usableFromInline
   internal let serializer: Serializer
@@ -182,7 +181,7 @@ internal final class AsyncServerHandler<
 
   /// The user provided function to execute.
   @usableFromInline
-  internal let userHandler: (
+  internal let userHandler: @Sendable(
     GRPCAsyncRequestStream<Request>,
     GRPCAsyncResponseStreamWriter<Response>,
     GRPCAsyncServerCallContext
@@ -531,6 +530,7 @@ internal final class AsyncServerHandler<
     }
   }
 
+  @Sendable
   @inlinable
   internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
     if self.context.eventLoop.inEventLoop {
@@ -587,6 +587,7 @@ internal final class AsyncServerHandler<
     }
   }
 
+  @Sendable
   @inlinable
   internal func responseStreamDrained(_ status: GRPCStatus) {
     if self.context.eventLoop.inEventLoop {

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift

@@ -19,7 +19,7 @@ import NIOHPACK
 
 /// Async-await variant of `ServerStreamingCall`.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncServerStreamingCall<Request, Response> {
+public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
   private let call: Call<Request, Response>
   private let responseParts: StreamingResponseParts<Response>
   private let responseSource: PassthroughMessageSource<Response, Error>

+ 1 - 1
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift

@@ -22,7 +22,7 @@ import NIOHPACK
 /// Note: while this object is a `struct`, its implementation delegates to `Call`. It therefore
 /// has reference semantics.
 @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
-public struct GRPCAsyncUnaryCall<Request, Response> {
+public struct GRPCAsyncUnaryCall<Request: Sendable, Response: Sendable> {
   private let call: Call<Request, Response>
   private let responseParts: UnaryResponseParts<Response>
 

+ 32 - 8
Sources/GRPC/AsyncAwaitSupport/GRPCChannel+AsyncAwaitSupport.swift

@@ -26,7 +26,10 @@ extension GRPCChannel {
   ///   - request: The request to send.
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncUnaryCall<Request: Message, Response: Message>(
+  internal func makeAsyncUnaryCall<
+    Request: Message & Sendable,
+    Response: Message & Sendable
+  >(
     path: String,
     request: Request,
     callOptions: CallOptions,
@@ -50,7 +53,10 @@ extension GRPCChannel {
   ///   - request: The request to send.
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeAsyncUnaryCall<
+    Request: GRPCPayload & Sendable,
+    Response: GRPCPayload & Sendable
+  >(
     path: String,
     request: Request,
     callOptions: CallOptions,
@@ -73,7 +79,10 @@ extension GRPCChannel {
   ///   - path: Path of the RPC, e.g. "/echo.Echo/Get"
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncClientStreamingCall<Request: Message, Response: Message>(
+  internal func makeAsyncClientStreamingCall<
+    Request: Message & Sendable,
+    Response: Message & Sendable
+  >(
     path: String,
     callOptions: CallOptions,
     interceptors: [ClientInterceptor<Request, Response>] = []
@@ -94,7 +103,10 @@ extension GRPCChannel {
   ///   - path: Path of the RPC, e.g. "/echo.Echo/Get"
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeAsyncClientStreamingCall<
+    Request: GRPCPayload & Sendable,
+    Response: GRPCPayload & Sendable
+  >(
     path: String,
     callOptions: CallOptions,
     interceptors: [ClientInterceptor<Request, Response>] = []
@@ -116,7 +128,10 @@ extension GRPCChannel {
   ///   - request: The request to send.
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncServerStreamingCall<Request: Message, Response: Message>(
+  internal func makeAsyncServerStreamingCall<
+    Request: Message & Sendable,
+    Response: Message & Sendable
+  >(
     path: String,
     request: Request,
     callOptions: CallOptions,
@@ -140,7 +155,10 @@ extension GRPCChannel {
   ///   - request: The request to send.
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeAsyncServerStreamingCall<
+    Request: GRPCPayload & Sendable,
+    Response: GRPCPayload & Sendable
+  >(
     path: String,
     request: Request,
     callOptions: CallOptions,
@@ -163,7 +181,10 @@ extension GRPCChannel {
   ///   - path: Path of the RPC, e.g. "/echo.Echo/Get"
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncBidirectionalStreamingCall<Request: Message, Response: Message>(
+  internal func makeAsyncBidirectionalStreamingCall<
+    Request: Message & Sendable,
+    Response: Message & Sendable
+  >(
     path: String,
     callOptions: CallOptions,
     interceptors: [ClientInterceptor<Request, Response>] = []
@@ -184,7 +205,10 @@ extension GRPCChannel {
   ///   - path: Path of the RPC, e.g. "/echo.Echo/Get"
   ///   - callOptions: Options for the RPC.
   ///   - interceptors: A list of interceptors to intercept the request and response stream with.
-  internal func makeAsyncBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeAsyncBidirectionalStreamingCall<
+    Request: GRPCPayload & Sendable,
+    Response: GRPCPayload & Sendable
+  >(
     path: String,
     callOptions: CallOptions,
     interceptors: [ClientInterceptor<Request, Response>] = []

+ 21 - 0
Sources/GRPC/AsyncAwaitSupport/GRPCSendable.swift

@@ -0,0 +1,21 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if compiler(>=5.6)
+@preconcurrency public typealias GRPCSendable = Swift.Sendable
+#else
+public typealias GRPCSendable = Any
+#endif // compiler(>=5.6)

+ 2 - 2
Sources/GRPC/Compression/MessageEncoding.swift

@@ -15,9 +15,9 @@
  */
 
 /// Whether compression should be enabled for the message.
-public struct Compression: Hashable {
+public struct Compression: Hashable, GRPCSendable {
   @usableFromInline
-  internal enum _Wrapped: Hashable {
+  internal enum _Wrapped: Hashable, GRPCSendable {
     case enabled
     case disabled
     case deferToCallDefault

+ 9 - 4
Sources/GRPC/GRPCStatus.swift

@@ -13,13 +13,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import Foundation
 import NIOCore
 import NIOHTTP1
 import NIOHTTP2
 
 /// Encapsulates the result of a gRPC call.
-public struct GRPCStatus: Error {
+public struct GRPCStatus: Error, GRPCSendable {
   /// Storage for message/cause. In the happy case ('ok') there will not be a message or cause
   /// and this will reference a static storage containing nil values. Making it optional makes the
   /// setters for message and cause a little messy.
@@ -58,7 +57,7 @@ public struct GRPCStatus: Error {
   }
 
   // Backing storage for 'message' and 'cause'.
-  private final class Storage {
+  fileprivate final class Storage {
     // On many happy paths there will be no message or cause, so we'll use this shared reference
     // instead of allocating a new storage each time.
     //
@@ -146,7 +145,7 @@ extension GRPCStatus {
 extension GRPCStatus {
   /// Status codes for gRPC operations (replicated from `status_code_enum.h` in the
   /// [gRPC core library](https://github.com/grpc/grpc)).
-  public struct Code: Hashable, CustomStringConvertible {
+  public struct Code: Hashable, CustomStringConvertible, GRPCSendable {
     // `rawValue` must be an `Int` for API reasons and we don't need (or want) to store anything so
     // wide, a `UInt8` is fine.
     private let _rawValue: UInt8
@@ -317,6 +316,12 @@ extension GRPCStatus {
   }
 }
 
+#if compiler(>=5.6)
+// `GRPCStatus` has CoW semantics so it is inherently `Sendable`. Rather than marking `GRPCStatus`
+// as `@unchecked Sendable` we only mark `Storage` as such.
+extension GRPCStatus.Storage: @unchecked Sendable {}
+#endif // compiler(>=5.6)
+
 /// This protocol serves as a customisation point for error types so that gRPC calls may be
 /// terminated with an appropriate status.
 public protocol GRPCStatusTransformable: Error {

+ 1 - 1
Sources/GRPC/Interceptor/MessageParts.swift

@@ -60,7 +60,7 @@ public enum GRPCServerResponsePart<Response> {
 }
 
 /// Metadata associated with a request or response message.
-public struct MessageMetadata: Equatable {
+public struct MessageMetadata: Equatable, GRPCSendable {
   /// Whether the message should be compressed. If compression has not been enabled on the RPC
   /// then this setting is ignored.
   public var compress: Bool

+ 4 - 1
Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift

@@ -245,7 +245,10 @@ internal class AsyncWriterTests: GRPCTestCase {
   }
 }
 
-fileprivate final class CollectingDelegate<Element, End>: AsyncWriterDelegate {
+fileprivate final class CollectingDelegate<
+  Element: Sendable,
+  End: Sendable
+>: AsyncWriterDelegate, @unchecked Sendable {
   private let lock = Lock()
   private var _elements: [Element] = []
   private var _end: End?

+ 1 - 1
Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift

@@ -30,7 +30,7 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase {
       GRPCAsyncResponseStreamWriter<String>,
       GRPCAsyncServerCallContext
     ) async throws -> Void
-  ) -> AsyncServerHandler<StringSerializer, StringDeserializer> {
+  ) -> AsyncServerHandler<StringSerializer, StringDeserializer, String, String> {
     return AsyncServerHandler(
       context: self.makeCallHandlerContext(encoding: encoding),
       requestDeserializer: StringDeserializer(),