Browse Source

Scaffolding for server interceptors (#1024)

Motivation:

We need a bunch of scaffolding for server interceptors. This is it.
Most of it is lifted from client interceptors.

Modifications:

- Rename 'ClientParts.swift' to 'MessageParts.swift', pull out the
  client message metadata so that it may be shared with the server
  parts. Add server parts.
- Add a server interceptor, server interceptor context, server
  interceptor pipeline
- Fix up a few deprecation warnings following a rebase on main

Result:

Scaffolding for server interceptors.
George Barnett 5 years ago
parent
commit
fe52e588f4

+ 1 - 1
Sources/GRPC/Interceptor/ClientInterceptors.swift

@@ -45,7 +45,7 @@ import NIO
 /// Functions on `context` are not thread safe and **must** be called on the `EventLoop` found on
 /// the `context`. Since each interceptor is invoked on the same `EventLoop` this does not usually
 /// require any extra attention. However, if work is done on a `DispatchQueue` or _other_
-/// `EventLoop` then implementers should be ensure that they use `context` from the correct
+/// `EventLoop` then implementers should ensure that they use `context` from the correct
 /// `EventLoop`.
 open class ClientInterceptor<Request, Response> {
   public init() {}

+ 0 - 58
Sources/GRPC/Interceptor/ClientParts.swift

@@ -1,58 +0,0 @@
-/*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import NIOHPACK
-
-public enum ClientRequestPart<Request> {
-  /// User provided metadata sent at the start of the request stream.
-  case metadata(HPACKHeaders)
-
-  /// A message to send to the server.
-  case message(Request, Metadata)
-
-  /// End the request stream.
-  case end
-
-  /// Metadata associated with a request message.
-  public struct Metadata: Equatable {
-    /// Whether the message should be compressed. If compression has not been enabled on the RPC
-    /// then setting is ignored.
-    public var compress: Bool
-
-    /// Whether the underlying transported should be 'flushed' after writing this message. If a batch
-    /// of messages is to be sent then flushing only after the last message may improve
-    /// performance.
-    public var flush: Bool
-
-    public init(compress: Bool, flush: Bool) {
-      self.compress = compress
-      self.flush = flush
-    }
-  }
-}
-
-public enum ClientResponsePart<Response> {
-  /// The initial metadata returned by the server.
-  case metadata(HPACKHeaders)
-
-  /// A response message from the server.
-  case message(Response)
-
-  /// The end of response stream sent by the server.
-  case end(GRPCStatus, HPACKHeaders)
-
-  /// Error.
-  case error(Error)
-}

+ 80 - 0
Sources/GRPC/Interceptor/MessageParts.swift

@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import NIOHPACK
+
+public enum ClientRequestPart<Request> {
+  /// User provided metadata sent at the start of the request stream.
+  case metadata(HPACKHeaders)
+
+  /// A message to send to the server.
+  case message(Request, MessageMetadata)
+
+  /// The end the request stream.
+  case end
+}
+
+public enum ClientResponsePart<Response> {
+  /// The metadata returned by the server at the start of the RPC.
+  case metadata(HPACKHeaders)
+
+  /// A response message from the server.
+  case message(Response)
+
+  /// The end of response stream sent by the server.
+  case end(GRPCStatus, HPACKHeaders)
+
+  /// Error.
+  case error(Error)
+}
+
+public enum ServerRequestPart<Request> {
+  /// Metadata received from the client at the start of the RPC.
+  case metadata(HPACKHeaders)
+
+  /// A request message sent by the client.
+  case message(Request)
+
+  /// The end the request stream.
+  case end
+}
+
+public enum ServerResponsePart<Response> {
+  /// The metadata to send to the client at the start of the response stream.
+  case metadata(HPACKHeaders)
+
+  /// A response message sent by the server.
+  case message(Response, MessageMetadata)
+
+  /// The end of response stream sent by the server.
+  case end(GRPCStatus, HPACKHeaders)
+}
+
+/// Metadata associated with a request or response message.
+public struct MessageMetadata: Equatable {
+  /// Whether the message should be compressed. If compression has not been enabled on the RPC
+  /// then this setting is ignored.
+  public var compress: Bool
+
+  /// Whether the underlying transported should be 'flushed' after writing this message. If a batch
+  /// of messages is to be sent then flushing only after the last message may improve
+  /// performance.
+  public var flush: Bool
+
+  public init(compress: Bool, flush: Bool) {
+    self.compress = compress
+    self.flush = flush
+  }
+}

+ 107 - 0
Sources/GRPC/Interceptor/ServerInterceptorContext.swift

@@ -0,0 +1,107 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIO
+
+public struct ServerInterceptorContext<Request, Response> {
+  /// The interceptor this context is for.
+  internal let interceptor: AnyServerInterceptor<Request, Response>
+
+  /// The pipeline this context is associated with.
+  private let pipeline: ServerInterceptorPipeline<Request, Response>
+
+  /// The index of this context's interceptor within the pipeline.
+  private let index: Int
+
+  // The next context in the inbound direction, if one exists.
+  private var nextInbound: ServerInterceptorContext<Request, Response>? {
+    return self.pipeline.nextInboundContext(forIndex: self.index)
+  }
+
+  // The next context in the outbound direction, if one exists.
+  private var nextOutbound: ServerInterceptorContext<Request, Response>? {
+    return self.pipeline.nextOutboundContext(forIndex: self.index)
+  }
+
+  /// The `EventLoop` this interceptor pipeline is being executed on.
+  public var eventLoop: EventLoop {
+    return self.pipeline.eventLoop
+  }
+
+  /// A logger.
+  public var logger: Logger {
+    return self.pipeline.logger
+  }
+
+  /// The type of the RPC, e.g. "unary".
+  public var type: GRPCCallType {
+    return self.pipeline.type
+  }
+
+  /// The path of the RPC in the format "/Service/Method", e.g. "/echo.Echo/Get".
+  public var path: String {
+    return self.pipeline.path
+  }
+
+  /// Construct a `ServerInterceptorContext` for the interceptor at the given index within the
+  /// interceptor pipeline.
+  internal init(
+    for interceptor: AnyServerInterceptor<Request, Response>,
+    atIndex index: Int,
+    in pipeline: ServerInterceptorPipeline<Request, Response>
+  ) {
+    self.interceptor = interceptor
+    self.pipeline = pipeline
+    self.index = index
+  }
+
+  /// Forwards the request part to the next inbound interceptor in the pipeline, if there is one.
+  ///
+  /// - Parameter part: The request part to forward.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  public func receive(_ part: ServerRequestPart<Request>) {
+    self.nextInbound?.invokeReceive(part)
+  }
+
+  /// Forwards the response part to the next outbound interceptor in the pipeline, if there is one.
+  ///
+  /// - Parameters:
+  ///   - part: The response part to forward.
+  ///   - promise: The promise the complete when the part has been written.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  public func send(
+    _ part: ServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?
+  ) {
+    if let outbound = self.nextOutbound {
+      outbound.invokeSend(part, promise: promise)
+    } else {
+      promise?.fail(GRPCError.AlreadyComplete())
+    }
+  }
+}
+
+extension ServerInterceptorContext {
+  internal func invokeReceive(_ part: ServerRequestPart<Request>) {
+    self.eventLoop.assertInEventLoop()
+    self.interceptor.receive(part, context: self)
+  }
+
+  internal func invokeSend(_ part: ServerResponsePart<Response>, promise: EventLoopPromise<Void>?) {
+    self.eventLoop.assertInEventLoop()
+    self.interceptor.send(part, promise: promise, context: self)
+  }
+}

+ 106 - 0
Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift

@@ -0,0 +1,106 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Logging
+import NIO
+
+internal final class ServerInterceptorPipeline<Request, Response> {
+  /// The `EventLoop` this RPC is being executed on.
+  internal let eventLoop: EventLoop
+
+  /// The path of the RPC in the format "/Service/Method", e.g. "/echo.Echo/Get".
+  internal let path: String
+
+  /// The type of the RPC, e.g. "unary".
+  internal let type: GRPCCallType
+
+  /// A logger.
+  internal let logger: Logger
+
+  /// The contexts associated with the interceptors stored in this pipeline. Contexts will be
+  /// removed once the RPC has completed. Contexts are ordered from inbound to outbound, that is,
+  /// the head is first and the tail is last.
+  private var contexts: [ServerInterceptorContext<Request, Response>]?
+
+  /// Returns the next context in the outbound direction for the context at the given index, if one
+  /// exists.
+  /// - Parameter index: The index of the `ServerInterceptorContext` which is requesting the next
+  ///   outbound context.
+  /// - Returns: The `ServerInterceptorContext` or `nil` if one does not exist.
+  internal func nextOutboundContext(
+    forIndex index: Int
+  ) -> ServerInterceptorContext<Request, Response>? {
+    return self.context(atIndex: index - 1)
+  }
+
+  /// Returns the next context in the inbound direction for the context at the given index, if one
+  /// exists.
+  /// - Parameter index: The index of the `ServerInterceptorContext` which is requesting the next
+  ///   inbound context.
+  /// - Returns: The `ServerInterceptorContext` or `nil` if one does not exist.
+  internal func nextInboundContext(
+    forIndex index: Int
+  ) -> ServerInterceptorContext<Request, Response>? {
+    return self.context(atIndex: index + 1)
+  }
+
+  /// Returns the context for the given index, if one exists.
+  /// - Parameter index: The index of the `ServerInterceptorContext` to return.
+  /// - Returns: The `ServerInterceptorContext` or `nil` if one does not exist for the given index.
+  private func context(atIndex index: Int) -> ServerInterceptorContext<Request, Response>? {
+    return self.contexts?[checked: index]
+  }
+
+  /// The context closest to the `NIO.Channel`, i.e. where inbound events originate. This will be
+  /// `nil` once the RPC has completed.
+  internal var head: ServerInterceptorContext<Request, Response>? {
+    return self.contexts?.first
+  }
+
+  /// The context closest to the application, i.e. where outbound events originate. This will be
+  /// `nil` once the RPC has completed.
+  internal var tail: ServerInterceptorContext<Request, Response>? {
+    return self.contexts?.last
+  }
+
+  internal init() {
+    fatalError("Not implemented yet")
+  }
+
+  /// Emit a request part message into the interceptor pipeline.
+  ///
+  /// - Parameter part: The part to emit into the pipeline.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  internal func receive(_ part: ServerRequestPart<Request>) {
+    self.eventLoop.assertInEventLoop()
+    self.head?.invokeReceive(part)
+  }
+
+  /// Write a response message into the interceptor pipeline.
+  ///
+  /// - Parameters:
+  ///   - part: The response part to sent.
+  ///   - promise: A promise to complete when the response part has been successfully written.
+  /// - Important: This *must* to be called from the `eventLoop`.
+  internal func send(_ part: ServerResponsePart<Response>, promise: EventLoopPromise<Void>?) {
+    self.eventLoop.assertInEventLoop()
+
+    if let tail = self.tail {
+      tail.invokeSend(part, promise: promise)
+    } else {
+      promise?.fail(GRPCError.AlreadyComplete())
+    }
+  }
+}

+ 117 - 0
Sources/GRPC/Interceptor/ServerInterceptors.swift

@@ -0,0 +1,117 @@
+/*
+ * Copyright 2020, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import NIO
+
+/// A base class for server interceptors.
+///
+/// Interceptors allow request and response and response parts to be observed, mutated or dropped
+/// as necessary. The default behaviour for this base class is to forward any events to the next
+/// interceptor.
+///
+/// Interceptors may observe two different types of event:
+/// - receiving request parts with `receive(_:context:)`,
+/// - sending response parts with `send(_:promise:context:)`.
+///
+/// These events flow through a pipeline of interceptors for each RPC. Request parts will enter
+/// the head of the interceptor pipeline once the request router has determined that there is a
+/// service provider which is able to handle the request stream. Response parts from the service
+/// provider enter the tail of the interceptor pipeline and will be sent to the client after
+/// traversing the pipeline through to the head.
+///
+/// Each of the interceptor functions is provided with a `context` which exposes analogous functions
+/// (`receive(_:)` and `send(_:promise:)`) which may be called to forward events to the next
+/// interceptor.
+///
+/// ### Thread Safety
+///
+/// Functions on `context` are not thread safe and **must** be called on the `EventLoop` found on
+/// the `context`. Since each interceptor is invoked on the same `EventLoop` this does not usually
+/// require any extra attention. However, if work is done on a `DispatchQueue` or _other_
+/// `EventLoop` then implementers should ensure that they use `context` from the correct
+/// `EventLoop`.
+open class ServerInterceptor<Request, Response> {
+  public init() {}
+
+  /// Called when the interceptor has received a request part to handle.
+  /// - Parameters:
+  ///   - part: The request part which has been received from the client.
+  ///   - context: An interceptor context which may be used to forward the response part.
+  open func receive(
+    _ part: ServerRequestPart<Request>,
+    context: ServerInterceptorContext<Request, Response>
+  ) {
+    context.receive(part)
+  }
+
+  /// Called when the interceptor has received a response part to handle.
+  /// - Parameters:
+  ///   - part: The request part which should be sent to the client.
+  ///   - promise: A promise which should be completed when the response part has been written.
+  ///   - context: An interceptor context which may be used to forward the request part.
+  open func send(
+    _ part: ServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?,
+    context: ServerInterceptorContext<Request, Response>
+  ) {
+    context.send(part, promise: promise)
+  }
+}
+
+// MARK: - Any Interceptor
+
+/// A wrapping interceptor which delegates to the implementation of an underlying interceptor.
+internal struct AnyServerInterceptor<Request, Response> {
+  internal enum Implementation {
+    case base(ServerInterceptor<Request, Response>)
+  }
+
+  /// The underlying interceptor implementation.
+  internal let _implementation: Implementation
+
+  /// A user provided interceptor.
+  /// - Parameter interceptor: The interceptor to wrap.
+  /// - Returns: An `AnyServerInterceptor` which wraps `interceptor`.
+  internal static func userProvided(
+    _ interceptor: ServerInterceptor<Request, Response>
+  ) -> AnyServerInterceptor<Request, Response> {
+    return .init(.base(interceptor))
+  }
+
+  private init(_ implementation: Implementation) {
+    self._implementation = implementation
+  }
+
+  internal func receive(
+    _ part: ServerRequestPart<Request>,
+    context: ServerInterceptorContext<Request, Response>
+  ) {
+    switch self._implementation {
+    case let .base(handler):
+      handler.receive(part, context: context)
+    }
+  }
+
+  internal func send(
+    _ part: ServerResponsePart<Response>,
+    promise: EventLoopPromise<Void>?,
+    context: ServerInterceptorContext<Request, Response>
+  ) {
+    switch self._implementation {
+    case let .base(handler):
+      handler.send(part, promise: promise, context: context)
+    }
+  }
+}

+ 1 - 1
Tests/GRPCTests/ClientInterceptorPipelineTests.swift

@@ -341,7 +341,7 @@ extension ClientRequestPart {
     }
   }
 
-  var message: (Request, Metadata)? {
+  var message: (Request, MessageMetadata)? {
     switch self {
     case let .message(request, metadata):
       return (request, metadata)

+ 2 - 2
Tests/GRPCTests/InterceptorsTests.swift

@@ -128,13 +128,13 @@ class HelloWorldAuthProvider: Helloworld_GreeterProvider {
     context: StatusOnlyCallContext
   ) -> EventLoopFuture<Helloworld_HelloReply> {
     // TODO: do this in a server interceptor, when we have one.
-    if context.request.headers.first(name: "authorization") == "Magic" {
+    if context.headers.first(name: "authorization") == "Magic" {
       let response = Helloworld_HelloReply.with {
         $0.message = "Hello, \(request.name), you're authorized!"
       }
       return context.eventLoop.makeSucceededFuture(response)
     } else {
-      context.trailingMetadata.add(name: "www-authenticate", value: "Magic")
+      context.trailers.add(name: "www-authenticate", value: "Magic")
       return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unauthenticated, message: nil))
     }
   }