فهرست منبع

Rename the client interceptor methods (#1019)

Motivation:

The gRPC API usually refers to sending and receiving messages. The
interceptor API uses 'write' and 'read' which are a little closer to the
NIO API. We should bias to being more consistent with terms already used
in the API.

Modifications:

- Rename 'write' to 'send', 'read' to 'receive'
- Make the 'ClientInterceptor' methods 'open'
- 'ClientInterceptorContext' methods delegate to a private
  implementation, this was done as the public methods ensured we were on
  the 'EventLoop' first. This is no longer the case so the indirection
  has been removed.

Result:

- Better naming
George Barnett 5 سال پیش
والد
کامیت
c200c23f4d

+ 16 - 31
Sources/GRPC/Interceptor/ClientInterceptorContext.swift

@@ -73,8 +73,9 @@ public struct ClientInterceptorContext<Request, Response> {
   ///
   /// - Parameter part: The response part to forward.
   /// - Important: This *must* to be called from the `eventLoop`.
-  public func read(_ part: ClientResponsePart<Response>) {
-    self._read(part)
+  public func receive(_ part: ClientResponsePart<Response>) {
+    self.eventLoop.assertInEventLoop()
+    self.nextInbound?.invokeReceive(part)
   }
 
   /// Forwards the request part to the next outbound interceptor in the pipeline, if there is one.
@@ -83,42 +84,24 @@ public struct ClientInterceptorContext<Request, Response> {
   ///   - part: The request part to forward.
   ///   - promise: The promise the complete when the part has been written.
   /// - Important: This *must* to be called from the `eventLoop`.
-  public func write(
-    _ part: ClientRequestPart<Request>,
-    promise: EventLoopPromise<Void>?
-  ) {
-    self._write(part, promise: promise)
-  }
-
-  /// Forwards a request to cancel the RPC to the next outbound interceptor in the pipeline.
-  ///
-  /// - Parameter promise: The promise to complete with the outcome of the cancellation request.
-  /// - Important: This *must* to be called from the `eventLoop`.
-  public func cancel(promise: EventLoopPromise<Void>?) {
-    self._cancel(promise: promise)
-  }
-}
-
-extension ClientInterceptorContext {
-  private func _read(_ part: ClientResponsePart<Response>) {
-    self.eventLoop.assertInEventLoop()
-    self.nextInbound?.invokeRead(part)
-  }
-
-  private func _write(
+  public func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?
   ) {
     self.eventLoop.assertInEventLoop()
 
     if let outbound = self.nextOutbound {
-      outbound.invokeWrite(part, promise: promise)
+      outbound.invokeSend(part, promise: promise)
     } else {
       promise?.fail(GRPCStatus(code: .unavailable, message: "The RPC has already completed"))
     }
   }
 
-  private func _cancel(promise: EventLoopPromise<Void>?) {
+  /// Forwards a request to cancel the RPC to the next outbound interceptor in the pipeline.
+  ///
+  /// - Parameter promise: The promise to complete with the outcome of the cancellation request.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  public func cancel(promise: EventLoopPromise<Void>?) {
     self.eventLoop.assertInEventLoop()
 
     if let outbound = self.nextOutbound {
@@ -128,16 +111,18 @@ extension ClientInterceptorContext {
       promise?.succeed(())
     }
   }
+}
 
-  internal func invokeRead(_ part: ClientResponsePart<Response>) {
+extension ClientInterceptorContext {
+  internal func invokeReceive(_ part: ClientResponsePart<Response>) {
     self.eventLoop.assertInEventLoop()
-    self.interceptor.read(part, context: self)
+    self.interceptor.receive(part, context: self)
   }
 
   @inlinable
-  internal func invokeWrite(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
+  internal func invokeSend(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
     self.eventLoop.assertInEventLoop()
-    self.interceptor.write(part, promise: promise, context: self)
+    self.interceptor.send(part, promise: promise, context: self)
   }
 
   internal func invokeCancel(promise: EventLoopPromise<Void>?) {

+ 8 - 8
Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

@@ -29,7 +29,7 @@ import NIOHTTP2
 /// ┌───────────────────────────────────────────────────────────────────┐
 /// │                                Call                               │
 /// └────────────────────────────────────────────────────────┬──────────┘
-///                                                          │ write(_:promise) /
+///                                                          │ send(_:promise) /
 ///                                                          │ cancel(promise:)
 /// ┌────────────────────────────────────────────────────────▼──────────┐
 /// │                         InterceptorPipeline            ╎          │
@@ -49,9 +49,9 @@ import NIOHTTP2
 /// │ ┌────────┴─────────────────────────────────────────────▼────────┐ │
 /// │ │          Head Interceptor (interacts with transport)          │ │
 /// │ └────────▲─────────────────────────────────────────────┬────────┘ │
-/// │  read(_:)╎                                             │          │
+/// │           receive(_:)                                 │          │
 /// └──────────▲─────────────────────────────────────────────┼──────────┘
-///    read(_:)│                                             │ write(_:promise:) /
+///            │ receive(_:)                                 │ send(_:promise:) /
 ///            │                                             │ cancel(promise:)
 /// ┌──────────┴─────────────────────────────────────────────▼──────────┐
 /// │                           ClientTransport                         │
@@ -177,9 +177,9 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ///
   /// - Parameter part: The part to emit into the pipeline.
   /// - Important: This *must* to be called from the `eventLoop`.
-  internal func read(_ part: ClientResponsePart<Response>) {
+  internal func receive(_ part: ClientResponsePart<Response>) {
     self.eventLoop.assertInEventLoop()
-    self._head?.invokeRead(part)
+    self._head?.invokeReceive(part)
   }
 
   /// Writes a request message into the interceptor pipeline.
@@ -191,11 +191,11 @@ internal final class ClientInterceptorPipeline<Request, Response> {
   ///   - promise: A promise to complete when the request part has been successfully written.
   /// - Important: This *must* to be called from the `eventLoop`.
   @inlinable
-  internal func write(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
+  internal func send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
     self.eventLoop.assertInEventLoop()
 
     if let tail = self._tail {
-      tail.invokeWrite(part, promise: promise)
+      tail.invokeSend(part, promise: promise)
     } else {
       promise?.fail(GRPCError.AlreadyComplete())
     }
@@ -268,7 +268,7 @@ extension ClientInterceptorPipeline {
     self.scheduledClose = self.eventLoop.scheduleTask(deadline: deadline) {
       // When the error hits the tail we'll call 'close()', this will cancel the transport if
       // necessary.
-      self.read(.error(GRPCError.RPCTimedOut(timeLimit)))
+      self.receive(.error(GRPCError.RPCTimedOut(timeLimit)))
     }
   }
 }

+ 2 - 2
Sources/GRPC/Interceptor/ClientInterceptorProtocol.swift

@@ -20,13 +20,13 @@ internal protocol ClientInterceptorProtocol {
   associatedtype Response
 
   /// Called when the interceptor has received a response part to handle.
-  func read(
+  func receive(
     _ part: ClientResponsePart<Response>,
     context: ClientInterceptorContext<Request, Response>
   )
 
   /// Called when the interceptor has received a request part to handle.
-  func write(
+  func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<Request, Response>

+ 29 - 26
Sources/GRPC/Interceptor/ClientInterceptors.swift

@@ -22,22 +22,23 @@ import NIO
 /// interceptor.
 ///
 /// Interceptors may observe three different types of event:
-/// - reading response parts with `read(_:context:)`,
-/// - writing request parts with `write(_:promise:context:)`, and
+/// - receiving response parts with `receive(_:context:)`,
+/// - sending request parts with `send(_:promise:context:)`, and
 /// - RPC cancellation with `cancel(context:)`.
 ///
 /// These events flow through a pipeline of interceptors for each RPC. Request parts sent from the
 /// call object (such as `UnaryCall` and `BidirectionalStreamingCall`) will traverse the pipeline
-/// from its tail via `write(_:context:)` eventually reaching the head of the pipeline where it will
+/// from its tail via `send(_:context:)` eventually reaching the head of the pipeline where it will
 /// be sent sent to the server.
 ///
 /// Response parts, or errors, received from the transport fill be fired back through the
-/// interceptor pipeline via `read(_:context:)`. Note that the `end` and `error` response parts are
-/// terminal: the pipeline will be torn down once these parts reach the the tail of the pipeline.
+/// interceptor pipeline via `receive(_:context:)`. Note that the `end` and `error` response parts
+/// are terminal: the pipeline will be torn down once these parts reach the the tail of the
+/// pipeline.
 ///
-/// Each of interceptor functions is provided with a `context` which exposes analogous functions
-/// (`read(_:)`, `write(_:promise:)`, and `cancel(promise:)`) which may be called to forward events
-/// to the next interceptor.
+/// Each of the interceptor functions is provided with a `context` which exposes analogous functions
+/// (`receive(_:)`, `send(_:promise:)`, and `cancel(promise:)`) which may be called to forward
+/// events to the next interceptor.
 ///
 /// ### Thread Safety
 ///
@@ -47,15 +48,17 @@ import NIO
 /// `EventLoop` then implementers should be ensure that they use `context` from the correct
 /// `EventLoop`.
 open class ClientInterceptor<Request, Response> {
+  public init() {}
+
   /// Called when the interceptor has received a response part to handle.
   /// - Parameters:
   ///   - part: The response part which has been received from the server.
   ///   - context: An interceptor context which may be used to forward the response part.
-  public func read(
+  open func receive(
     _ part: ClientResponsePart<Response>,
     context: ClientInterceptorContext<Request, Response>
   ) {
-    context.read(part)
+    context.receive(part)
   }
 
   /// Called when the interceptor has received a request part to handle.
@@ -63,12 +66,12 @@ open class ClientInterceptor<Request, Response> {
   ///   - part: The request part which should be sent to the server.
   ///   - promise: A promise which should be completed when the response part has been handled.
   ///   - context: An interceptor context which may be used to forward the request part.
-  public func write(
+  open func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<Request, Response>
   ) {
-    context.write(part, promise: promise)
+    context.send(part, promise: promise)
   }
 
   /// Called when the interceptor has received a request to cancel the RPC.
@@ -105,7 +108,7 @@ internal struct HeadClientInterceptor<Request, Response>: ClientInterceptorProto
   }
 
   @inlinable
-  internal func write(
+  internal func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<Request, Response>
@@ -120,11 +123,11 @@ internal struct HeadClientInterceptor<Request, Response>: ClientInterceptorProto
     self.onCancel(promise)
   }
 
-  internal func read(
+  internal func receive(
     _ part: ClientResponsePart<Response>,
     context: ClientInterceptorContext<Request, Response>
   ) {
-    context.read(part)
+    context.receive(part)
   }
 }
 
@@ -153,7 +156,7 @@ internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProto
     self.onResponsePart = onResponsePart
   }
 
-  internal func read(
+  internal func receive(
     _ part: ClientResponsePart<Response>,
     context: ClientInterceptorContext<Request, Response>
   ) {
@@ -196,12 +199,12 @@ internal struct TailClientInterceptor<Request, Response>: ClientInterceptorProto
   }
 
   @inlinable
-  internal func write(
+  internal func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<Request, Response>
   ) {
-    context.write(part, promise: promise)
+    context.send(part, promise: promise)
   }
 
   internal func cancel(
@@ -265,33 +268,33 @@ internal struct AnyClientInterceptor<Request, Response>: ClientInterceptorProtoc
     self._implementation = implementation
   }
 
-  internal func read(
+  internal func receive(
     _ part: ClientResponsePart<Response>,
     context: ClientInterceptorContext<Request, Response>
   ) {
     switch self._implementation {
     case let .head(handler):
-      handler.read(part, context: context)
+      handler.receive(part, context: context)
     case let .tail(handler):
-      handler.read(part, context: context)
+      handler.receive(part, context: context)
     case let .base(handler):
-      handler.read(part, context: context)
+      handler.receive(part, context: context)
     }
   }
 
   @inlinable
-  internal func write(
+  internal func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<Request, Response>
   ) {
     switch self._implementation {
     case let .head(handler):
-      handler.write(part, promise: promise, context: context)
+      handler.send(part, promise: promise, context: context)
     case let .tail(handler):
-      handler.write(part, promise: promise, context: context)
+      handler.send(part, promise: promise, context: context)
     case let .base(handler):
-      handler.write(part, promise: promise, context: context)
+      handler.send(part, promise: promise, context: context)
     }
   }
 

+ 5 - 5
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -134,7 +134,7 @@ internal final class ClientTransport<Request, Response> {
   internal func send(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
     self.eventLoop.assertInEventLoop()
     if let pipeline = self._pipeline {
-      pipeline.write(part, promise: promise)
+      pipeline.send(part, promise: promise)
     } else {
       promise?.fail(GRPCError.AlreadyComplete())
     }
@@ -779,10 +779,10 @@ extension ClientTransport {
   private func forwardToInterceptors(_ part: _GRPCClientResponsePart<Response>) {
     switch part {
     case let .initialMetadata(metadata):
-      self._pipeline?.read(.metadata(metadata))
+      self._pipeline?.receive(.metadata(metadata))
 
     case let .message(context):
-      self._pipeline?.read(.message(context.message))
+      self._pipeline?.receive(.message(context.message))
 
     case let .trailingMetadata(trailers):
       // The `Channel` delivers trailers and `GRPCStatus`, we want to emit them together in the
@@ -792,14 +792,14 @@ extension ClientTransport {
     case let .status(status):
       let trailers = self.trailers ?? [:]
       self.trailers = nil
-      self._pipeline?.read(.end(status, trailers))
+      self._pipeline?.receive(.end(status, trailers))
     }
   }
 
   /// Forward the error to the interceptor pipeline.
   /// - Parameter error: The error to forward.
   private func forwardErrorToInterceptors(_ error: Error) {
-    self._pipeline?.read(.error(error))
+    self._pipeline?.receive(.error(error))
   }
 }
 

+ 21 - 21
Tests/GRPCTests/ClientInterceptorPipelineTests.swift

@@ -73,9 +73,9 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     )
 
     // Write some request parts.
-    pipeline.write(.metadata([:]), promise: nil)
-    pipeline.write(.message("foo", .init(compress: false, flush: false)), promise: nil)
-    pipeline.write(.end, promise: nil)
+    pipeline.send(.metadata([:]), promise: nil)
+    pipeline.send(.message("foo", .init(compress: false, flush: false)), promise: nil)
+    pipeline.send(.end, promise: nil)
 
     XCTAssertEqual(requestParts.count, 3)
     XCTAssertEqual(requestParts[0].metadata, [:])
@@ -85,9 +85,9 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     XCTAssertTrue(requestParts[2].isEnd)
 
     // Write some responses parts.
-    pipeline.read(.metadata([:]))
-    pipeline.read(.message("bar"))
-    pipeline.read(.end(.ok, [:]))
+    pipeline.receive(.metadata([:]))
+    pipeline.receive(.message("bar"))
+    pipeline.receive(.end(.ok, [:]))
 
     XCTAssertEqual(responseParts.count, 3)
     XCTAssertEqual(responseParts[0].metadata, [:])
@@ -111,11 +111,11 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
 
     // Fire an error; this should close the pipeline.
     struct DummyError: Error {}
-    pipeline.read(.error(DummyError()))
+    pipeline.receive(.error(DummyError()))
 
     // We're closed, writes should fail.
     let writePromise = pipeline.eventLoop.makePromise(of: Void.self)
-    pipeline.write(.end, promise: writePromise)
+    pipeline.send(.end, promise: writePromise)
     XCTAssertThrowsError(try writePromise.futureResult.wait())
 
     // As should cancellation.
@@ -124,7 +124,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     XCTAssertThrowsError(try cancelPromise.futureResult.wait())
 
     // And reads should be ignored. (We only expect errors in the response handler.)
-    pipeline.read(.metadata([:]))
+    pipeline.receive(.metadata([:]))
   }
 
   func testPipelineWithTimeout() throws {
@@ -173,7 +173,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
 
     // Pipeline should be torn down. Writes and cancellation should fail.
     let p1 = pipeline.eventLoop.makePromise(of: Void.self)
-    pipeline.write(.end, promise: p1)
+    pipeline.send(.end, promise: p1)
     assertThat(try p1.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
 
     let p2 = pipeline.eventLoop.makePromise(of: Void.self)
@@ -181,7 +181,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     assertThat(try p2.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
 
     // Reads should be ignored too. (We'll fail in `onRequestPart` if this goes through.)
-    pipeline.read(.metadata([:]))
+    pipeline.receive(.metadata([:]))
   }
 
   func testTimeoutIsCancelledOnCompletion() throws {
@@ -208,7 +208,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
     )
 
     // Read the end part.
-    pipeline.read(.end(.ok, [:]))
+    pipeline.receive(.end(.ok, [:]))
     // Just a single cancellation.
     assertThat(cancellations, .is(1))
 
@@ -228,7 +228,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
       onResponsePart: { _ in }
     )
 
-    pipeline.write(.message("foo", .init(compress: false, flush: false)), promise: nil)
+    pipeline.send(.message("foo", .init(compress: false, flush: false)), promise: nil)
     XCTAssertEqual(recorder.requestParts.count, 1)
     let (message, _) = try assertNotNil(recorder.requestParts[0].message)
     XCTAssertEqual(message, "oof")
@@ -270,7 +270,7 @@ class ClientInterceptorPipelineTests: GRPCTestCase {
         onRequestPart: { _, _ in },
         onResponsePart: { _ in }
       )
-      pipeline.read(.error(error))
+      pipeline.receive(.error(error))
     }
 
     let invalidState = GRPCError.InvalidState("invalid state")
@@ -295,36 +295,36 @@ class RecordingInterceptor<Request, Response>: ClientInterceptor<Request, Respon
   var requestParts: [ClientRequestPart<Request>] = []
   var responseParts: [ClientResponsePart<Response>] = []
 
-  override func write(
+  override func send(
     _ part: ClientRequestPart<Request>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<Request, Response>
   ) {
     self.requestParts.append(part)
-    context.write(part, promise: promise)
+    context.send(part, promise: promise)
   }
 
-  override func read(
+  override func receive(
     _ part: ClientResponsePart<Response>,
     context: ClientInterceptorContext<Request, Response>
   ) {
     self.responseParts.append(part)
-    context.read(part)
+    context.receive(part)
   }
 }
 
 /// An interceptor which reverses string request messages.
 class StringRequestReverser: ClientInterceptor<String, String> {
-  override func write(
+  override func send(
     _ part: ClientRequestPart<String>,
     promise: EventLoopPromise<Void>?,
     context: ClientInterceptorContext<String, String>
   ) {
     switch part {
     case let .message(value, metadata):
-      context.write(.message(String(value.reversed()), metadata), promise: promise)
+      context.send(.message(String(value.reversed()), metadata), promise: promise)
     default:
-      context.write(part, promise: promise)
+      context.send(part, promise: promise)
     }
   }
 }