Browse Source

Server handler interceptor state machine (#1394)

Motivation:

The existing async server handler has a state machine which deals with
the interceptor pipeline and the lifecycle of user handler. These are
really two separate entities each with its own state machine.

This PR is the first in a series which will eventually update the async
server handler to make use of the two state machines.

Modifications:

- Add a 'ServerInterceptorStateMachine' and tests. This acts as a filter
  for inbound and outbound request and response messages, both as they
  enter and exit the interceptor pipeline. Message parts delievered in the
  wrong order lead to the RPC being cancelled, parts delievered after
  the RPC has finished lead to them being dropped. Cancelling the RPC
  informs the caller to nil out the associated interceptor pipeline.
- Tests
- This is not used anywhere (yet).

Result:

Interceptor state machine is in place.
George Barnett 3 years ago
parent
commit
49c3af7b47

+ 61 - 0
Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/ServerInterceptorStateMachine+Actions.swift

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+extension ServerInterceptorStateMachine {
+  enum InterceptAction: Hashable {
+    /// Forward the message to the interceptor pipeline.
+    case intercept
+    /// Cancel the call.
+    case cancel
+    /// Drop the message.
+    case drop
+
+    init(from streamFilter: ServerInterceptorStateMachine.StreamFilter) {
+      switch streamFilter {
+      case .accept:
+        self = .intercept
+      case .reject:
+        self = .cancel
+      }
+    }
+  }
+
+  enum InterceptedAction: Hashable {
+    /// Forward the message to the network or user handler.
+    case forward
+    /// Cancel the call.
+    case cancel
+    /// Drop the message.
+    case drop
+
+    init(from streamFilter: ServerInterceptorStateMachine.StreamFilter) {
+      switch streamFilter {
+      case .accept:
+        self = .forward
+      case .reject:
+        self = .cancel
+      }
+    }
+  }
+
+  enum CancelAction: Hashable {
+    /// Nil out the interceptor pipeline.
+    case nilOutInterceptorPipeline
+    /// Do nothing.
+    case none
+  }
+}
+#endif // compiler(>=5.6)

+ 80 - 0
Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/ServerInterceptorStateMachine+Finished.swift

@@ -0,0 +1,80 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+extension ServerInterceptorStateMachine {
+  /// The 'Finished' state is, as the name suggests, a terminal state. Nothing can happen in this
+  /// state.
+  @usableFromInline
+  struct Finished {
+    typealias NextStateAndOutput<Output> =
+      ServerInterceptorStateMachine.NextStateAndOutput<Self.NextState, Output>
+
+    init(from state: ServerInterceptorStateMachine.Intercepting) {}
+
+    mutating func interceptRequestMetadata() -> Self.NextStateAndOutput<InterceptAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptRequestMessage() -> Self.NextStateAndOutput<InterceptAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptRequestEnd() -> Self.NextStateAndOutput<InterceptAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptedRequestMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptedRequestMessage() -> Self.NextStateAndOutput<InterceptedAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptedRequestEnd() -> Self.NextStateAndOutput<InterceptedAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptResponseMetadata() -> Self.NextStateAndOutput<InterceptAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptResponseMessage() -> Self.NextStateAndOutput<InterceptAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptResponseStatus() -> Self.NextStateAndOutput<InterceptAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptedResponseMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptedResponseMessage() -> Self.NextStateAndOutput<InterceptedAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func interceptedResponseStatus() -> Self.NextStateAndOutput<InterceptedAction> {
+      return .init(nextState: .finished(self), output: .drop)
+    }
+
+    mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
+      return .init(nextState: .finished(self), output: .nilOutInterceptorPipeline)
+    }
+  }
+}
+#endif // compiler(>=5.6)

+ 111 - 0
Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/ServerInterceptorStateMachine+Intercepting.swift

@@ -0,0 +1,111 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+extension ServerInterceptorStateMachine {
+  /// The 'Intercepting' state is responsible for validating that appropriate message parts are
+  /// forwarded to the interceptor pipeline and that messages parts which have been emitted from the
+  /// interceptors are valid to forward to either the network or the user handler (as interceptors
+  /// may emit new message parts).
+  ///
+  /// We only transition to the next state on `cancel` (which happens at the end of every RPC).
+  @usableFromInline
+  struct Intercepting {
+    typealias NextStateAndOutput<Output> =
+      ServerInterceptorStateMachine.NextStateAndOutput<Self.NextState, Output>
+
+    /// From the network into the interceptors.
+    private var requestStreamIn: InboundStreamState
+    /// From the interceptors out to the handler.
+    private var requestStreamOut: InboundStreamState
+
+    /// From the handler into the interceptors.
+    private var responseStreamIn: OutboundStreamState
+    /// From the interceptors out to the network.
+    private var responseStreamOut: OutboundStreamState
+
+    init() {
+      self.requestStreamIn = .idle
+      self.requestStreamOut = .idle
+      self.responseStreamIn = .idle
+      self.responseStreamOut = .idle
+    }
+
+    mutating func interceptRequestMetadata() -> Self.NextStateAndOutput<InterceptAction> {
+      let filter = self.requestStreamIn.receiveMetadata()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptRequestMessage() -> Self.NextStateAndOutput<InterceptAction> {
+      let filter = self.requestStreamIn.receiveMessage()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptRequestEnd() -> Self.NextStateAndOutput<InterceptAction> {
+      let filter = self.requestStreamIn.receiveEnd()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptedRequestMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
+      let filter = self.requestStreamOut.receiveMetadata()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptedRequestMessage() -> Self.NextStateAndOutput<InterceptedAction> {
+      let filter = self.requestStreamOut.receiveMessage()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptedRequestEnd() -> Self.NextStateAndOutput<InterceptedAction> {
+      let filter = self.requestStreamOut.receiveEnd()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptResponseMetadata() -> Self.NextStateAndOutput<InterceptAction> {
+      let filter = self.responseStreamIn.sendMetadata()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptResponseMessage() -> Self.NextStateAndOutput<InterceptAction> {
+      let filter = self.responseStreamIn.sendMessage()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptResponseStatus() -> Self.NextStateAndOutput<InterceptAction> {
+      let filter = self.responseStreamIn.sendEnd()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptedResponseMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
+      let filter = self.responseStreamOut.sendMetadata()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptedResponseMessage() -> Self.NextStateAndOutput<InterceptedAction> {
+      let filter = self.responseStreamOut.sendMessage()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func interceptedResponseStatus() -> Self.NextStateAndOutput<InterceptedAction> {
+      let filter = self.responseStreamOut.sendEnd()
+      return .init(nextState: .intercepting(self), output: .init(from: filter))
+    }
+
+    mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
+      return .init(nextState: .finished(from: self), output: .nilOutInterceptorPipeline)
+    }
+  }
+}
+#endif // compiler(>=5.6)

+ 256 - 0
Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/ServerInterceptorStateMachine.swift

@@ -0,0 +1,256 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+internal struct ServerInterceptorStateMachine {
+  private var state: Self.State
+
+  init() {
+    self.state = .intercepting(.init())
+  }
+
+  mutating func interceptRequestMetadata() -> InterceptAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptRequestMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptRequestMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptRequestMessage() -> InterceptAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptRequestMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptRequestMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptRequestEnd() -> InterceptAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptRequestEnd()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptRequestEnd()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptedRequestMetadata() -> InterceptedAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptedRequestMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptedRequestMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptedRequestMessage() -> InterceptedAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptedRequestMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptedRequestMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptedRequestEnd() -> InterceptedAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptedRequestEnd()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptedRequestEnd()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptResponseMetadata() -> InterceptAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptResponseMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptResponseMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptResponseMessage() -> InterceptAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptResponseMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptResponseMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptResponseStatus() -> InterceptAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptResponseStatus()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptResponseStatus()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptedResponseMetadata() -> InterceptedAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptedResponseMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptedResponseMetadata()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptedResponseMessage() -> InterceptedAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptedResponseMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptedResponseMessage()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func interceptedResponseStatus() -> InterceptedAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.interceptedResponseStatus()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.interceptedResponseStatus()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+
+  mutating func cancel() -> CancelAction {
+    switch self.state {
+    case var .intercepting(intercepting):
+      let nextStateAndOutput = intercepting.cancel()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    case var .finished(finished):
+      let nextStateAndOutput = finished.cancel()
+      self.state = nextStateAndOutput.nextState.state
+      return nextStateAndOutput.output
+    }
+  }
+}
+
+extension ServerInterceptorStateMachine {
+  /// The possible states the state machine may be in.
+  fileprivate enum State {
+    case intercepting(ServerInterceptorStateMachine.Intercepting)
+    case finished(ServerInterceptorStateMachine.Finished)
+  }
+}
+
+extension ServerInterceptorStateMachine {
+  /// The next state to transition to and any output which may be produced as a
+  /// result of a substate handling an action.
+  internal struct NextStateAndOutput<NextState, Output> {
+    internal var nextState: NextState
+    internal var output: Output
+
+    internal init(nextState: NextState, output: Output) {
+      self.nextState = nextState
+      self.output = output
+    }
+  }
+}
+
+extension ServerInterceptorStateMachine.NextStateAndOutput where Output == Void {
+  internal init(nextState: NextState) {
+    self.nextState = nextState
+    self.output = ()
+  }
+}
+
+extension ServerInterceptorStateMachine.Intercepting {
+  /// States which can be reached directly from 'Intercepting'.
+  internal struct NextState {
+    fileprivate let state: ServerInterceptorStateMachine.State
+
+    private init(_ state: ServerInterceptorStateMachine.State) {
+      self.state = state
+    }
+
+    internal static func intercepting(_ state: ServerInterceptorStateMachine.Intercepting) -> Self {
+      return Self(.intercepting(state))
+    }
+
+    internal static func finished(from: ServerInterceptorStateMachine.Intercepting) -> Self {
+      return Self(.finished(.init(from: from)))
+    }
+  }
+}
+
+extension ServerInterceptorStateMachine.Finished {
+  /// States which can be reached directly from 'Finished'.
+  internal struct NextState {
+    fileprivate let state: ServerInterceptorStateMachine.State
+
+    private init(_ state: ServerInterceptorStateMachine.State) {
+      self.state = state
+    }
+
+    internal static func finished(_ state: ServerInterceptorStateMachine.Finished) -> Self {
+      return Self(.finished(state))
+    }
+  }
+}
+#endif // compiler(>=5.6)

+ 93 - 0
Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/StreamState.swift

@@ -0,0 +1,93 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+extension ServerInterceptorStateMachine {
+  internal enum StreamFilter: Hashable {
+    case accept
+    case reject
+  }
+
+  internal enum InboundStreamState: Hashable {
+    case idle
+    case receivingMessages
+    case done
+
+    mutating func receiveMetadata() -> StreamFilter {
+      switch self {
+      case .idle:
+        self = .receivingMessages
+        return .accept
+      case .receivingMessages, .done:
+        return .reject
+      }
+    }
+
+    func receiveMessage() -> StreamFilter {
+      switch self {
+      case .receivingMessages:
+        return .accept
+      case .idle, .done:
+        return .reject
+      }
+    }
+
+    mutating func receiveEnd() -> StreamFilter {
+      switch self {
+      case .idle, .receivingMessages:
+        self = .done
+        return .accept
+      case .done:
+        return .reject
+      }
+    }
+  }
+
+  internal enum OutboundStreamState: Hashable {
+    case idle
+    case writingMessages
+    case done
+
+    mutating func sendMetadata() -> StreamFilter {
+      switch self {
+      case .idle:
+        self = .writingMessages
+        return .accept
+      case .writingMessages, .done:
+        return .reject
+      }
+    }
+
+    func sendMessage() -> StreamFilter {
+      switch self {
+      case .writingMessages:
+        return .accept
+      case .idle, .done:
+        return .reject
+      }
+    }
+
+    mutating func sendEnd() -> StreamFilter {
+      switch self {
+      case .idle, .writingMessages:
+        self = .done
+        return .accept
+      case .done:
+        return .reject
+      }
+    }
+  }
+}
+#endif // compiler(>=5.6)

+ 130 - 0
Tests/GRPCTests/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/ServerInterceptorStateMachineStreamStateTests.swift

@@ -0,0 +1,130 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+@testable import GRPC
+import XCTest
+
+internal final class ServerInterceptorStateMachineStreamStateTests: GRPCTestCase {
+  func testInboundStreamState_receiveMetadataWhileIdle() {
+    var state = ServerInterceptorStateMachine.InboundStreamState.idle
+    XCTAssertEqual(state.receiveMetadata(), .accept)
+    XCTAssertEqual(state, .receivingMessages)
+  }
+
+  func testInboundStreamState_receiveMessageWhileIdle() {
+    let state = ServerInterceptorStateMachine.InboundStreamState.idle
+    XCTAssertEqual(state.receiveMessage(), .reject)
+    XCTAssertEqual(state, .idle)
+  }
+
+  func testInboundStreamState_receiveEndWhileIdle() {
+    var state = ServerInterceptorStateMachine.InboundStreamState.idle
+    XCTAssertEqual(state.receiveEnd(), .accept)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testInboundStreamState_receiveMetadataWhileReceivingMessages() {
+    var state = ServerInterceptorStateMachine.InboundStreamState.receivingMessages
+    XCTAssertEqual(state.receiveMetadata(), .reject)
+    XCTAssertEqual(state, .receivingMessages)
+  }
+
+  func testInboundStreamState_receiveMessageWhileReceivingMessages() {
+    let state = ServerInterceptorStateMachine.InboundStreamState.receivingMessages
+    XCTAssertEqual(state.receiveMessage(), .accept)
+    XCTAssertEqual(state, .receivingMessages)
+  }
+
+  func testInboundStreamState_receiveEndWhileReceivingMessages() {
+    var state = ServerInterceptorStateMachine.InboundStreamState.receivingMessages
+    XCTAssertEqual(state.receiveEnd(), .accept)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testInboundStreamState_receiveMetadataWhileDone() {
+    var state = ServerInterceptorStateMachine.InboundStreamState.done
+    XCTAssertEqual(state.receiveMetadata(), .reject)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testInboundStreamState_receiveMessageWhileDone() {
+    let state = ServerInterceptorStateMachine.InboundStreamState.done
+    XCTAssertEqual(state.receiveMessage(), .reject)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testInboundStreamState_receiveEndWhileDone() {
+    var state = ServerInterceptorStateMachine.InboundStreamState.done
+    XCTAssertEqual(state.receiveEnd(), .reject)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testOutboundStreamState_sendMetadataWhileIdle() {
+    var state = ServerInterceptorStateMachine.OutboundStreamState.idle
+    XCTAssertEqual(state.sendMetadata(), .accept)
+    XCTAssertEqual(state, .writingMessages)
+  }
+
+  func testOutboundStreamState_sendMessageWhileIdle() {
+    let state = ServerInterceptorStateMachine.OutboundStreamState.idle
+    XCTAssertEqual(state.sendMessage(), .reject)
+    XCTAssertEqual(state, .idle)
+  }
+
+  func testOutboundStreamState_sendEndWhileIdle() {
+    var state = ServerInterceptorStateMachine.OutboundStreamState.idle
+    XCTAssertEqual(state.sendEnd(), .accept)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testOutboundStreamState_sendMetadataWhileReceivingMessages() {
+    var state = ServerInterceptorStateMachine.OutboundStreamState.writingMessages
+    XCTAssertEqual(state.sendMetadata(), .reject)
+    XCTAssertEqual(state, .writingMessages)
+  }
+
+  func testOutboundStreamState_sendMessageWhileReceivingMessages() {
+    let state = ServerInterceptorStateMachine.OutboundStreamState.writingMessages
+    XCTAssertEqual(state.sendMessage(), .accept)
+    XCTAssertEqual(state, .writingMessages)
+  }
+
+  func testOutboundStreamState_sendEndWhileReceivingMessages() {
+    var state = ServerInterceptorStateMachine.OutboundStreamState.writingMessages
+    XCTAssertEqual(state.sendEnd(), .accept)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testOutboundStreamState_sendMetadataWhileDone() {
+    var state = ServerInterceptorStateMachine.OutboundStreamState.done
+    XCTAssertEqual(state.sendMetadata(), .reject)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testOutboundStreamState_sendMessageWhileDone() {
+    let state = ServerInterceptorStateMachine.OutboundStreamState.done
+    XCTAssertEqual(state.sendMessage(), .reject)
+    XCTAssertEqual(state, .done)
+  }
+
+  func testOutboundStreamState_sendEndWhileDone() {
+    var state = ServerInterceptorStateMachine.OutboundStreamState.done
+    XCTAssertEqual(state.sendEnd(), .reject)
+    XCTAssertEqual(state, .done)
+  }
+}
+
+#endif // compiler(>=5.6)

+ 185 - 0
Tests/GRPCTests/AsyncAwaitSupport/AsyncServerHandler/ServerInterceptorStateMachine/ServerInterceptorStateMachineTests.swift

@@ -0,0 +1,185 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#if compiler(>=5.6)
+@testable import GRPC
+import NIOEmbedded
+import XCTest
+
+final class ServerInterceptorStateMachineTests: GRPCTestCase {
+  func testInterceptRequestMetadataWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptRequestMetadata().assertCancel() // Can't receive metadata twice.
+  }
+
+  func testInterceptRequestMessageWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMessage().assertCancel()
+  }
+
+  func testInterceptRequestEndWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestEnd().assertIntercept()
+    stateMachine.interceptRequestEnd().assertCancel() // Can't receive end twice.
+  }
+
+  func testInterceptedRequestMetadataWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+    stateMachine.interceptedRequestMetadata().assertCancel() // Can't intercept metadata twice.
+  }
+
+  func testInterceptedRequestMessageWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+    for _ in 0 ..< 100 {
+      stateMachine.interceptRequestMessage().assertIntercept()
+      stateMachine.interceptedRequestMessage().assertForward()
+    }
+  }
+
+  func testInterceptedRequestEndWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+    stateMachine.interceptRequestEnd().assertIntercept()
+    stateMachine.interceptedRequestEnd().assertForward()
+    stateMachine.interceptedRequestEnd().assertCancel() // Can't intercept end twice.
+  }
+
+  func testInterceptResponseMetadataWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+
+    stateMachine.interceptResponseMetadata().assertIntercept()
+    stateMachine.interceptResponseMetadata().assertCancel()
+  }
+
+  func testInterceptedResponseMetadataWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+
+    stateMachine.interceptResponseMetadata().assertIntercept()
+    stateMachine.interceptedResponseMetadata().assertForward()
+    stateMachine.interceptedResponseMetadata().assertCancel()
+  }
+
+  func testInterceptResponseMessageWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+
+    stateMachine.interceptResponseMetadata().assertIntercept()
+    stateMachine.interceptResponseMessage().assertIntercept()
+  }
+
+  func testInterceptedResponseMessageWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+
+    stateMachine.interceptResponseMetadata().assertIntercept()
+    stateMachine.interceptedResponseMetadata().assertForward()
+    stateMachine.interceptResponseMessage().assertIntercept()
+    stateMachine.interceptedResponseMessage().assertForward()
+    // Still fine: interceptor could insert extra message.
+    stateMachine.interceptedResponseMessage().assertForward()
+  }
+
+  func testInterceptResponseStatusWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+
+    stateMachine.interceptResponseMetadata().assertIntercept()
+    stateMachine.interceptResponseMessage().assertIntercept()
+    stateMachine.interceptResponseStatus().assertIntercept()
+
+    stateMachine.interceptResponseMessage().assertCancel()
+    stateMachine.interceptResponseStatus().assertCancel()
+  }
+
+  func testInterceptedResponseStatusWhenIntercepting() {
+    var stateMachine = ServerInterceptorStateMachine()
+    stateMachine.interceptRequestMetadata().assertIntercept()
+    stateMachine.interceptedRequestMetadata().assertForward()
+
+    stateMachine.interceptResponseMetadata().assertIntercept()
+    stateMachine.interceptedResponseMetadata().assertForward()
+    stateMachine.interceptResponseStatus().assertIntercept()
+    stateMachine.interceptedResponseStatus().assertForward()
+  }
+
+  func testAllOperationsDropWhenFinished() {
+    var stateMachine = ServerInterceptorStateMachine()
+    // Get to the finished state.
+    stateMachine.cancel().assertNilOutInterceptorPipeline()
+
+    stateMachine.interceptRequestMetadata().assertDrop()
+    stateMachine.interceptedRequestMetadata().assertDrop()
+    stateMachine.interceptRequestMessage().assertDrop()
+    stateMachine.interceptedRequestMessage().assertDrop()
+    stateMachine.interceptRequestEnd().assertDrop()
+    stateMachine.interceptedRequestEnd().assertDrop()
+
+    stateMachine.interceptResponseMetadata().assertDrop()
+    stateMachine.interceptedResponseMetadata().assertDrop()
+    stateMachine.interceptResponseMessage().assertDrop()
+    stateMachine.interceptedResponseMessage().assertDrop()
+    stateMachine.interceptResponseStatus().assertDrop()
+    stateMachine.interceptedResponseStatus().assertDrop()
+  }
+}
+
+extension ServerInterceptorStateMachine.InterceptAction {
+  func assertIntercept() {
+    XCTAssertEqual(self, .intercept)
+  }
+
+  func assertCancel() {
+    XCTAssertEqual(self, .cancel)
+  }
+
+  func assertDrop() {
+    XCTAssertEqual(self, .drop)
+  }
+}
+
+extension ServerInterceptorStateMachine.InterceptedAction {
+  func assertForward() {
+    XCTAssertEqual(self, .forward)
+  }
+
+  func assertCancel() {
+    XCTAssertEqual(self, .cancel)
+  }
+
+  func assertDrop() {
+    XCTAssertEqual(self, .drop)
+  }
+}
+
+extension ServerInterceptorStateMachine.CancelAction {
+  func assertNilOutInterceptorPipeline() {
+    XCTAssertEqual(self, .nilOutInterceptorPipeline)
+  }
+}
+
+#endif // compiler(>=5.6)