Browse Source

Simplify client transport actions and state representation (#1080)

Motivation:

The representation of state and any outcomes as a result of changing
state in 'ClientTransport' is generic over the request and response
types. This turns out to be a non-trivial performance hit.

Modifications:

- Split up state actions into per-function actions
- Move state out of 'ClientTransport'

Result:

- A 5% decrease in instruction for the 'unary_10k_small_requests'
  benchmark (note that this change is client only and yet the benchmark
  includes instructions for client and server).
George Barnett 5 years ago
parent
commit
ea9c7c5ce7
1 changed files with 268 additions and 263 deletions
  1. 268 263
      Sources/GRPC/Interceptor/ClientTransport.swift

+ 268 - 263
Sources/GRPC/Interceptor/ClientTransport.swift

@@ -41,7 +41,7 @@ internal final class ClientTransport<Request, Response> {
   internal let eventLoop: EventLoop
 
   /// The current state of the transport.
-  private var state: State = .idle
+  private var state: ClientTransportState = .idle
 
   /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
   /// and fail it when we transition to `closed`.
@@ -86,6 +86,9 @@ internal final class ClientTransport<Request, Response> {
   @usableFromInline
   internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
 
+  /// The 'ChannelHandlerContext'.
+  private var context: ChannelHandlerContext?
+
   /// Our current state as logging metadata.
   private var stateForLogging: Logger.MetadataValue {
     if self.state.mayBuffer {
@@ -124,7 +127,9 @@ internal final class ClientTransport<Request, Response> {
   /// - Important: This *must* to be called from the `eventLoop`.
   internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
     self.eventLoop.assertInEventLoop()
-    self.act(on: self.state.configureTransport(with: configurator))
+    if self.state.configureTransport() {
+      self.configure(using: configurator)
+    }
   }
 
   /// Send a request part – via the interceptor pipeline – to the server.
@@ -167,7 +172,18 @@ internal final class ClientTransport<Request, Response> {
       self.channelPromise = promise
 
       // Ask the state machine if we can have it.
-      self.act(on: self.state.getChannel())
+      switch self.state.getChannel() {
+      case .succeed:
+        if let channel = self.context?.channel {
+          promise.succeed(channel)
+        }
+
+      case .fail:
+        promise.fail(GRPCError.AlreadyComplete())
+
+      case .doNothing:
+        ()
+      }
 
       return promise.futureResult
     }
@@ -187,7 +203,14 @@ extension ClientTransport {
     promise: EventLoopPromise<Void>?
   ) {
     self.eventLoop.assertInEventLoop()
-    self.act(on: self.state.send(part, promise: promise))
+    switch self.state.send() {
+    case .writeToBuffer:
+      self.buffer(part, promise: promise)
+    case .writeToChannel:
+      self.write(part, promise: promise, flush: self.shouldFlush(after: part))
+    case .alreadyComplete:
+      promise?.fail(GRPCError.AlreadyComplete())
+    }
   }
 
   /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
@@ -195,7 +218,17 @@ extension ClientTransport {
   /// - Important: This *must* to be called from the `eventLoop`.
   private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
     self.eventLoop.assertInEventLoop()
-    self.act(on: self.state.cancel(promise: promise))
+
+    if self.state.cancel() {
+      let error = GRPCError.RPCCancelledByClient()
+      self.forwardErrorToInterceptors(error)
+      self.failBufferedWrites(with: error)
+      self.context?.channel.close(mode: .all, promise: nil)
+      self.channelPromise?.fail(error)
+      promise?.succeed(())
+    } else {
+      promise?.fail(GRPCError.AlreadyComplete())
+    }
   }
 }
 
@@ -208,16 +241,34 @@ extension ClientTransport: ChannelInboundHandler {
   @usableFromInline
   typealias OutboundOut = _GRPCClientRequestPart<Request>
 
+  @usableFromInline
+  func handlerAdded(context: ChannelHandlerContext) {
+    self.context = context
+  }
+
   @usableFromInline
   internal func handlerRemoved(context: ChannelHandlerContext) {
     self.eventLoop.assertInEventLoop()
+    self.context = nil
     // Break the reference cycle.
     self._pipeline = nil
   }
 
   internal func channelError(_ error: Error) {
     self.eventLoop.assertInEventLoop()
-    self.act(on: self.state.channelError(error))
+
+    switch self.state.channelError() {
+    case .doNothing:
+      ()
+    case .propagateError:
+      self.forwardErrorToInterceptors(error)
+      self.failBufferedWrites(with: error)
+
+    case .propagateErrorAndClose:
+      self.forwardErrorToInterceptors(error)
+      self.failBufferedWrites(with: error)
+      self.context?.close(mode: .all, promise: nil)
+    }
   }
 
   @usableFromInline
@@ -229,177 +280,175 @@ extension ClientTransport: ChannelInboundHandler {
   internal func channelActive(context: ChannelHandlerContext) {
     self.eventLoop.assertInEventLoop()
     self.logger.debug("activated stream channel", source: "GRPC")
-    self.act(on: self.state.channelActive(context: context))
+    if self.state.channelActive() {
+      self.unbuffer(to: context.channel)
+    } else {
+      context.close(mode: .all, promise: nil)
+    }
   }
 
   @usableFromInline
   internal func channelInactive(context: ChannelHandlerContext) {
     self.eventLoop.assertInEventLoop()
-    self.act(on: self.state.channelInactive(context: context))
+
+    switch self.state.channelInactive() {
+    case .doNothing:
+      ()
+
+    case .tearDown:
+      let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
+      self.forwardErrorToInterceptors(status)
+      self.failBufferedWrites(with: status)
+      self.channelPromise?.fail(status)
+
+    case .failChannelPromise:
+      self.channelPromise?.fail(GRPCError.AlreadyComplete())
+    }
   }
 
   @usableFromInline
   internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
     self.eventLoop.assertInEventLoop()
     let part = self.unwrapInboundIn(data)
-    self.act(on: self.state.channelRead(part))
-    // (We're the end of the channel. No need to forward anything.)
-  }
-}
-
-// MARK: - State Handling
 
-extension ClientTransport {
-  fileprivate enum State {
-    /// Idle. We're waiting for the RPC to be configured.
-    ///
-    /// Valid transitions:
-    /// - `awaitingTransport` (the transport is being configured)
-    /// - `closed` (the RPC cancels)
-    case idle
-
-    /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
-    /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
-    /// transport in this state is an error.
-    ///
-    /// Valid transitions:
-    /// - `activatingTransport` (the channel becomes active)
-    /// - `closing` (the RPC cancels)
-    /// - `closed` (the channel fails to become active)
-    case awaitingTransport
-
-    /// The transport is active but we're unbuffering any requests to write on that transport.
-    /// We'll continue buffering in this state. Receiving messages from the transport in this state
-    /// is okay.
-    ///
-    /// Valid transitions:
-    /// - `active` (we finish unbuffering)
-    /// - `closing` (the RPC cancels, the channel encounters an error)
-    /// - `closed` (the channel becomes inactive)
-    case activatingTransport(Channel)
-
-    /// Fully active. An RPC is in progress and is communicating over an active transport.
-    ///
-    /// Valid transitions:
-    /// - `closing` (the RPC cancels, the channel encounters an error)
-    /// - `closed` (the channel becomes inactive)
-    case active(Channel)
-
-    /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
-    /// become inactive yet.
-    ///
-    /// Valid transitions:
-    /// - `closed` (the channel becomes inactive)
-    case closing
-
-    /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
-    /// be ignored.
-    ///
-    /// Valid transitions:
-    /// - none: this state is terminal.
-    case closed
-
-    /// Whether writes may be unbuffered in this state.
-    internal var isUnbuffering: Bool {
-      switch self {
-      case .activatingTransport:
-        return true
-      case .idle, .awaitingTransport, .active, .closing, .closed:
-        return false
-      }
+    let isEnd: Bool
+    switch part {
+    case .initialMetadata, .message, .trailingMetadata:
+      isEnd = false
+    case .status:
+      isEnd = true
     }
 
-    /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
-    internal var mayBuffer: Bool {
-      switch self {
-      case .idle, .activatingTransport, .awaitingTransport:
-        return true
-      case .active, .closing, .closed:
-        return false
-      }
+    if self.state.channelRead(isEnd: isEnd) {
+      self.forwardToInterceptors(part)
     }
+
+    // (We're the end of the channel. No need to forward anything.)
   }
 }
 
-extension ClientTransport.State {
-  /// Actions which should be performed as a result telling the state machine something changed.
-  fileprivate enum Action {
-    /// Do nothing.
-    case none
-
-    /// Configure a `Channel` with the configurator.
-    case configure(with: (ChannelHandler) -> EventLoopFuture<Void>)
-
-    /// Append the request part and promise to the write buffer.
-    case buffer(GRPCClientRequestPart<Request>, EventLoopPromise<Void>?)
-
-    /// Write - and flush if necessary – any request parts in the buffer to the `Channel`.
-    case unbufferToChannel(Channel)
-
-    /// Fail any buffered writes with the error.
-    case failBufferedWrites(with: Error)
-
-    /// Write the given operation to the channel.
-    case writeToChannel(Channel, GRPCClientRequestPart<Request>, EventLoopPromise<Void>?)
-
-    /// Write the response part to the RPC.
-    case forwardToInterceptors(_GRPCClientResponsePart<Response>)
-
-    /// Fail the RPC with the given error. This includes failing any outstanding writes.
-    case forwardErrorToInterceptors(Error)
-
-    /// Close the given channel.
-    case close(Channel)
-
-    /// Fail the given promise with the error provided.
-    case completePromise(EventLoopPromise<Void>?, with: Result<Void, Error>)
+// MARK: - State Handling
 
-    /// Complete the lazy channel promise with this result.
-    case completeChannelPromise(with: Result<Channel, Error>)
+private enum ClientTransportState {
+  /// Idle. We're waiting for the RPC to be configured.
+  ///
+  /// Valid transitions:
+  /// - `awaitingTransport` (the transport is being configured)
+  /// - `closed` (the RPC cancels)
+  case idle
+
+  /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
+  /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
+  /// transport in this state is an error.
+  ///
+  /// Valid transitions:
+  /// - `activatingTransport` (the channel becomes active)
+  /// - `closing` (the RPC cancels)
+  /// - `closed` (the channel fails to become active)
+  case awaitingTransport
+
+  /// The transport is active but we're unbuffering any requests to write on that transport.
+  /// We'll continue buffering in this state. Receiving messages from the transport in this state
+  /// is okay.
+  ///
+  /// Valid transitions:
+  /// - `active` (we finish unbuffering)
+  /// - `closing` (the RPC cancels, the channel encounters an error)
+  /// - `closed` (the channel becomes inactive)
+  case activatingTransport
+
+  /// Fully active. An RPC is in progress and is communicating over an active transport.
+  ///
+  /// Valid transitions:
+  /// - `closing` (the RPC cancels, the channel encounters an error)
+  /// - `closed` (the channel becomes inactive)
+  case active
+
+  /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
+  /// become inactive yet.
+  ///
+  /// Valid transitions:
+  /// - `closed` (the channel becomes inactive)
+  case closing
+
+  /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
+  /// be ignored.
+  ///
+  /// Valid transitions:
+  /// - none: this state is terminal.
+  case closed
+
+  /// Whether writes may be unbuffered in this state.
+  internal var isUnbuffering: Bool {
+    switch self {
+    case .activatingTransport:
+      return true
+    case .idle, .awaitingTransport, .active, .closing, .closed:
+      return false
+    }
+  }
 
-    /// Perform multiple actions.
-    indirect case multiple([Action])
+  /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
+  internal var mayBuffer: Bool {
+    switch self {
+    case .idle, .activatingTransport, .awaitingTransport:
+      return true
+    case .active, .closing, .closed:
+      return false
+    }
   }
 }
 
-extension ClientTransport.State {
-  /// The caller would like to configure the transport.
-  mutating func configureTransport(
-    with configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>
-  ) -> Action {
+extension ClientTransportState {
+  /// The caller would like to configure the transport. Returns a boolean indicating whether we
+  /// should configure it or not.
+  mutating func configureTransport() -> Bool {
     switch self {
     // We're idle until we configure. Anything else is just a repeat request to configure.
     case .idle:
       self = .awaitingTransport
-      return .configure(with: configurator)
+      return true
 
     case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
-      return .none
+      return false
     }
   }
 
+  enum SendAction {
+    /// Write the request into the buffer.
+    case writeToBuffer
+    /// Write the request into the channel.
+    case writeToChannel
+    /// The RPC has already completed, fail any promise associated with the write.
+    case alreadyComplete
+  }
+
   /// The pipeline would like to send a request part to the transport.
-  mutating func send(
-    _ part: GRPCClientRequestPart<Request>,
-    promise: EventLoopPromise<Void>?
-  ) -> Action {
+  mutating func send() -> SendAction {
     switch self {
     // We don't have any transport yet, just buffer the part.
     case .idle, .awaitingTransport, .activatingTransport:
-      return .buffer(part, promise)
+      return .writeToBuffer
 
     // We have a `Channel`, we can pipe the write straight through.
-    case let .active(channel):
-      return .writeToChannel(channel, part, promise)
+    case .active:
+      return .writeToChannel
 
     // The transport is going or has gone away. Fail the promise.
     case .closing, .closed:
-      return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
+      return .alreadyComplete
     }
   }
 
+  enum UnbufferedAction {
+    /// Nothing needs to be done.
+    case doNothing
+    /// Succeed the channel promise associated with the transport.
+    case succeedChannelPromise
+  }
+
   /// We finished dealing with the buffered writes.
-  mutating func unbuffered() -> Action {
+  mutating func unbuffered() -> UnbufferedAction {
     switch self {
     // These can't happen since we only begin unbuffering when we transition to
     // '.activatingTransport', which must come after these two states..
@@ -408,9 +457,9 @@ extension ClientTransport.State {
 
     // We dealt with any buffered writes. We can become active now. This is the only way to become
     // active.
-    case let .activatingTransport(channel):
-      self = .active(channel)
-      return .completeChannelPromise(with: .success(channel))
+    case .activatingTransport:
+      self = .active
+      return .succeedChannelPromise
 
     case .active:
       preconditionFailure("Unbuffering completed but the transport is already active")
@@ -418,91 +467,79 @@ extension ClientTransport.State {
     // Something caused us to close while unbuffering, that's okay, we won't take any further
     // action.
     case .closing, .closed:
-      return .none
+      return .doNothing
     }
   }
 
-  /// Cancel the RPC and associated `Channel`, if possible.
-  mutating func cancel(promise: EventLoopPromise<Void>?) -> Action {
+  /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether
+  /// cancellation can go ahead (and also whether the channel should be torn down).
+  mutating func cancel() -> Bool {
     switch self {
     case .idle:
       // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
       // we're done, fail any writes, and then deal with the cancellation promise.
       self = .closed
-      let error = GRPCError.RPCCancelledByClient().captureContext()
-      return .multiple([
-        .forwardErrorToInterceptors(error),
-        .failBufferedWrites(with: error.error),
-        .completePromise(promise, with: .success(())),
-        .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete())),
-      ])
+      return true
 
     case .awaitingTransport:
       // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
       // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
       // the `Channel` becoming active (see `channelActive(context:)`).
       self = .closing
-      let error = GRPCError.RPCCancelledByClient().captureContext()
-      return .multiple([
-        .forwardErrorToInterceptors(error),
-        .failBufferedWrites(with: error.error),
-        .completePromise(promise, with: .success(())),
-      ])
-
-    case let .activatingTransport(channel):
+      return true
+
+    case .activatingTransport:
       // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
       // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
-      // any buffered writes and then complete the cancellatiion promise.
+      // any buffered writes and then complete the cancellation promise.
       self = .closing
-      let error = GRPCError.RPCCancelledByClient().captureContext()
-      return .multiple([
-        .forwardErrorToInterceptors(error),
-        .close(channel),
-        .failBufferedWrites(with: error.error),
-        .completePromise(promise, with: .success(())),
-      ])
-
-    case let .active(channel):
+      return true
+
+    case .active:
       // The RPC and channel are up and running. We'll fail the RPC and close the channel.
       self = .closing
-      let error = GRPCError.RPCCancelledByClient().captureContext()
-      return .multiple([
-        .forwardErrorToInterceptors(error),
-        .close(channel),
-        .completePromise(promise, with: .success(())),
-      ])
+      return true
 
     case .closing, .closed:
       // We're already closing or closing. The cancellation is too late.
-      return .completePromise(promise, with: .failure(GRPCError.AlreadyComplete()))
+      return false
     }
   }
 
   /// `channelActive` was invoked on the transport by the `Channel`.
-  mutating func channelActive(context: ChannelHandlerContext) -> Action {
+  mutating func channelActive() -> Bool {
     // The channel has become active: what now?
     switch self {
     case .idle:
       preconditionFailure("Can't activate an idle transport")
 
     case .awaitingTransport:
-      self = .activatingTransport(context.channel)
-      return .unbufferToChannel(context.channel)
+      self = .activatingTransport
+      return true
 
     case .activatingTransport, .active:
       preconditionFailure("Invalid state: stream is already active")
 
     case .closing:
       // We remain in closing: we only transition to closed on 'channelInactive'.
-      return .close(context.channel)
+      return false
 
     case .closed:
       preconditionFailure("Invalid state: stream is already inactive")
     }
   }
 
+  enum ChannelInactiveAction {
+    /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
+    case tearDown
+    /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
+    case failChannelPromise
+    /// Do nothing.
+    case doNothing
+  }
+
   /// `channelInactive` was invoked on the transport by the `Channel`.
-  mutating func channelInactive(context: ChannelHandlerContext) -> Action {
+  mutating func channelInactive() -> ChannelInactiveAction {
     switch self {
     case .idle:
       // We can't become inactive before we've requested a `Channel`.
@@ -513,26 +550,22 @@ extension ClientTransport.State {
       // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
       // synthesize an error status to fail the RPC with.
       self = .closed
-      let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
-      return .multiple([
-        .forwardErrorToInterceptors(status),
-        .failBufferedWrites(with: status),
-        .completeChannelPromise(with: .failure(status)),
-      ])
+      return .tearDown
 
     case .closing:
       // We were already closing, now we're fully closed.
       self = .closed
-      return .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete()))
+      return .failChannelPromise
 
     case .closed:
       // We're already closed.
-      return .none
+      return .doNothing
     }
   }
 
-  /// `channelRead` was invoked on the transport by the `Channel`.
-  mutating func channelRead(_ part: _GRPCClientResponsePart<Response>) -> Action {
+  /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value
+  /// indicating whether the part that was read should be forwarded to the interceptor pipeline.
+  mutating func channelRead(isEnd: Bool) -> Bool {
     switch self {
     case .idle, .awaitingTransport:
       // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
@@ -541,23 +574,28 @@ extension ClientTransport.State {
     case .activatingTransport, .active:
       // We have an active `Channel`, we can forward the request part but we may need to start
       // closing if we see the status, since it indicates the call is terminating.
-      switch part {
-      case .initialMetadata, .message, .trailingMetadata:
-        ()
-      case .status:
-        // The status is the final part of the RPC. We will become inactive soon.
+      if isEnd {
         self = .closing
       }
-      return .forwardToInterceptors(part)
+      return true
 
     case .closing, .closed:
-      // We closed early, ignore are reads.
-      return .none
+      // We closed early, ignore any reads.
+      return false
     }
   }
 
+  enum ChannelErrorAction {
+    /// Propagate the error to the interceptor pipeline and fail any buffered writes.
+    case propagateError
+    /// As above, but close the 'Channel' as well.
+    case propagateErrorAndClose
+    /// No action is required.
+    case doNothing
+  }
+
   /// We received an error from the `Channel`.
-  mutating func channelError(_ error: Error) -> Action {
+  mutating func channelError() -> ChannelErrorAction {
     switch self {
     case .idle:
       // The `Channel` can't error if it doesn't exist.
@@ -567,47 +605,49 @@ extension ClientTransport.State {
       // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
       // buffered writes along the way.
       self = .closing
-      return .multiple([
-        .forwardErrorToInterceptors(error),
-        .failBufferedWrites(with: error),
-      ])
+      return .propagateError
 
-    case let .activatingTransport(channel),
-         let .active(channel):
+    case .activatingTransport,
+         .active:
       // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
       self = .closing
-      return .multiple([
-        .forwardErrorToInterceptors(error),
-        .failBufferedWrites(with: error),
-        .close(channel),
-      ])
+      return .propagateErrorAndClose
 
     case .closing, .closed:
       // We're already closing/closed, we can ignore this.
-      return .none
+      return .doNothing
     }
   }
 
+  enum GetChannelAction {
+    /// No action is required.
+    case doNothing
+    /// Succeed the Channel promise.
+    case succeed
+    /// Fail the 'Channel' promise, the RPC is already complete.
+    case fail
+  }
+
   /// The caller has asked for the underlying `Channel`.
-  mutating func getChannel() -> Action {
+  mutating func getChannel() -> GetChannelAction {
     switch self {
     case .idle, .awaitingTransport, .activatingTransport:
       // Do nothing, we'll complete the promise when we become active or closed.
-      return .none
+      return .doNothing
 
-    case let .active(channel):
+    case .active:
       // We're already active, so there was no promise to succeed when we made this transition. We
       // can complete it now.
-      return .completeChannelPromise(with: .success(channel))
+      return .succeed
 
     case .closing:
       // We'll complete the promise when we transition to closed.
-      return .none
+      return .doNothing
 
     case .closed:
       // We're already closed; there was no promise to fail when we made this transition. We can go
       // ahead and fail it now though.
-      return .completeChannelPromise(with: .failure(GRPCError.AlreadyComplete()))
+      return .fail
     }
   }
 }
@@ -615,50 +655,6 @@ extension ClientTransport.State {
 // MARK: - State Actions
 
 extension ClientTransport {
-  /// Act on the action which resulted from prodding the state machine.
-  /// - Parameter action: The action to act on.
-  private func act(on action: State.Action) {
-    switch action {
-    case .none:
-      ()
-
-    case let .configure(configurator):
-      self.configure(using: configurator)
-
-    case let .buffer(part, promise):
-      self.buffer(part, promise: promise)
-
-    case let .unbufferToChannel(channel):
-      self.unbuffer(to: channel)
-
-    case let .failBufferedWrites(with: error):
-      self.failBufferedWrites(with: error)
-
-    case let .writeToChannel(channel, part, promise):
-      self.write(part, to: channel, promise: promise, flush: self.shouldFlush(after: part))
-
-    case let .forwardToInterceptors(response: part):
-      self.forwardToInterceptors(part)
-
-    case let .forwardErrorToInterceptors(error: error):
-      self.forwardErrorToInterceptors(error)
-
-    case let .completePromise(promise, result):
-      promise?.completeWith(result)
-
-    case let .completeChannelPromise(result):
-      self.channelPromise?.completeWith(result)
-
-    case let .close(channel):
-      channel.close(mode: .all, promise: nil)
-
-    case let .multiple(actions):
-      for action in actions {
-        self.act(on: action)
-      }
-    }
-  }
-
   /// Configures this transport with the `configurator`.
   private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
     configurator(self).whenFailure { error in
@@ -710,7 +706,7 @@ extension ClientTransport {
           shouldFlush = self.shouldFlush(after: write.request)
         }
 
-        self.write(write.request, to: channel, promise: write.promise, flush: false)
+        self.write(write.request, promise: write.promise, flush: false)
       }
 
       // Okay, flush now.
@@ -731,7 +727,12 @@ extension ClientTransport {
     }
 
     // We're unbuffered. What now?
-    self.act(on: self.state.unbuffered())
+    switch self.state.unbuffered() {
+    case .doNothing:
+      ()
+    case .succeedChannelPromise:
+      self.channelPromise?.succeed(channel)
+    }
   }
 
   /// Fails any promises that come with buffered writes with `error`.
@@ -754,25 +755,29 @@ extension ClientTransport {
   ///   - flush: Whether to flush the `Channel` after writing.
   private func write(
     _ part: GRPCClientRequestPart<Request>,
-    to channel: Channel,
     promise: EventLoopPromise<Void>?,
     flush: Bool
   ) {
+    guard let context = self.context else {
+      promise?.fail(GRPCError.AlreadyComplete())
+      return
+    }
+
     switch part {
     case let .metadata(headers):
       let head = self.makeRequestHead(with: headers)
-      channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
+      context.channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
 
     case let .message(request, metadata):
       let message = _MessageContext<Request>(request, compressed: metadata.compress)
-      channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
+      context.channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
 
     case .end:
-      channel.write(self.wrapOutboundOut(.end), promise: promise)
+      context.channel.write(self.wrapOutboundOut(.end), promise: promise)
     }
 
     if flush {
-      channel.flush()
+      context.channel.flush()
     }
   }