Browse Source

Tidy up some of the logic in the server call contexts (#1018)

Motivation:

The server call contexts rely on the service provider completing a
promise, either for the response or status depending on the streaming
type. Doing so then writes the relevant parts into the pipeline.
However, we also have to deal with the promises being failed, and since
the logic there is a little circuitous the callback chains were a little
hard to follow and had some redundancy.

Modifications:

- Add a 'processError(_:delegate:)` to the base server call context to
  transform an error into a status and trailers.
- Refactor the promise completion chains for the unary and streaming
  response call contexts.

Result:

- Code is easier to follow. Less duplication.
George Barnett 5 năm trước cách đây
mục cha
commit
6e9fe81720

+ 37 - 0
Sources/GRPC/ServerCallContexts/ServerCallContext.swift

@@ -60,4 +60,41 @@ open class ServerCallContextBase: ServerCallContext {
     self.headers = HPACKHeaders(httpHeaders: request.headers, normalizeHTTPHeaders: false)
     self.logger = logger
   }
+
+  /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer.
+  internal func processError(
+    _ error: Error,
+    delegate: ServerErrorDelegate?
+  ) -> (GRPCStatus, HPACKHeaders) {
+    // Observe the error if we have a delegate.
+    delegate?.observeRequestHandlerError(error, headers: self.headers)
+
+    // What status are we terminating this RPC with?
+    // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
+    //   them with any on the call context.
+    // - If we don't have a delegate, then try to transform the error to a status.
+    // - Fallback to a generic error.
+    let status: GRPCStatus
+    let trailers: HPACKHeaders
+
+    if let transformed = delegate?.transformRequestHandlerError(error, headers: self.headers) {
+      status = transformed.status
+      if var transformedTrailers = transformed.trailers {
+        // The delegate returned trailers: merge in those from the context as well.
+        transformedTrailers.add(contentsOf: self.trailers)
+        trailers = transformedTrailers
+      } else {
+        trailers = self.trailers
+      }
+    } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
+      status = grpcStatusTransformable.makeGRPCStatus()
+      trailers = self.trailers
+    } else {
+      // Eh... well, we don't what status to use. Use a generic one.
+      status = .processingError
+      trailers = self.trailers
+    }
+
+    return (status, trailers)
+  }
 }

+ 15 - 33
Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift

@@ -75,43 +75,25 @@ open class StreamingResponseCallContextImpl<ResponsePayload>: StreamingResponseC
     self.channel = channel
     super.init(eventLoop: channel.eventLoop, headers: headers, logger: logger)
 
-    statusPromise.futureResult
-      .map {
-        GRPCStatusAndTrailers(status: $0, trailers: nil)
-      }
-      // Ensure that any error provided can be transformed to `GRPCStatus`, using "internal server error" as a fallback.
-      .recover { [weak errorDelegate] error in
-        errorDelegate?.observeRequestHandlerError(error, headers: headers)
-
-        if let transformed = errorDelegate?.transformRequestHandlerError(
-          error,
-          headers: headers
-        ) {
-          return transformed
-        }
-
-        if let grpcStatusTransformable = error as? GRPCStatusTransformable {
-          return GRPCStatusAndTrailers(
-            status: grpcStatusTransformable.makeGRPCStatus(),
-            trailers: nil
-          )
-        }
-
-        return GRPCStatusAndTrailers(status: .processingError, trailers: nil)
-      }
-      // Finish the call by returning the final status.
-      .whenSuccess { statusAndMetadata in
-        if let trailers = statusAndMetadata.trailers {
-          self.trailers.add(contentsOf: trailers)
-        }
+    self.statusPromise.futureResult.whenComplete { result in
+      switch result {
+      case let .success(status):
         self.channel.writeAndFlush(
-          NIOAny(
-            WrappedResponse
-              .statusAndTrailers(statusAndMetadata.status, self.trailers)
-          ),
+          self.wrap(.statusAndTrailers(status, self.trailers)),
           promise: nil
         )
+
+      case let .failure(error):
+        let (status, trailers) = self.processError(error, delegate: errorDelegate)
+        self.channel.writeAndFlush(self.wrap(.statusAndTrailers(status, trailers)), promise: nil)
       }
+    }
+  }
+
+  /// Wrap the response part in a `NIOAny`. This is useful in order to avoid explicitly spelling
+  /// out `NIOAny(WrappedResponse(...))`.
+  private func wrap(_ response: WrappedResponse) -> NIOAny {
+    return NIOAny(response)
   }
 
   @available(*, deprecated, renamed: "init(channel:headers:errorDelegate:logger:)")

+ 33 - 42
Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift

@@ -92,49 +92,40 @@ open class UnaryResponseCallContextImpl<ResponsePayload>: UnaryResponseCallConte
     self.channel = channel
     super.init(eventLoop: channel.eventLoop, headers: headers, logger: logger)
 
-    self.responsePromise.futureResult
-      .whenComplete { [self, weak errorDelegate] result in
-        let statusAndMetadata: GRPCStatusAndTrailers
-
-        switch result {
-        case let .success(responseMessage):
-          self.channel.write(
-            NIOAny(
-              WrappedResponse
-                .message(.init(responseMessage, compressed: self.compressionEnabled))
-            ),
-            promise: nil
-          )
-          statusAndMetadata = GRPCStatusAndTrailers(status: self.responseStatus, trailers: nil)
-        case let .failure(error):
-          errorDelegate?.observeRequestHandlerError(error, headers: headers)
-
-          if let transformed: GRPCStatusAndTrailers = errorDelegate?.transformRequestHandlerError(
-            error,
-            headers: headers
-          ) {
-            statusAndMetadata = transformed
-          } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
-            statusAndMetadata = GRPCStatusAndTrailers(
-              status: grpcStatusTransformable.makeGRPCStatus(),
-              trailers: nil
-            )
-          } else {
-            statusAndMetadata = GRPCStatusAndTrailers(status: .processingError, trailers: nil)
-          }
-        }
-
-        if let trailers = statusAndMetadata.trailers {
-          self.trailers.add(contentsOf: trailers)
-        }
-        self.channel.writeAndFlush(
-          NIOAny(
-            WrappedResponse
-              .statusAndTrailers(statusAndMetadata.status, self.trailers)
-          ),
-          promise: nil
-        )
+    self.responsePromise.futureResult.whenComplete { [self, weak errorDelegate] result in
+      switch result {
+      case let .success(message):
+        self.handleResponse(message)
+
+      case let .failure(error):
+        self.handleError(error, delegate: errorDelegate)
       }
+    }
+  }
+
+  /// Handle the response from the service provider.
+  private func handleResponse(_ response: ResponsePayload) {
+    self.channel.write(
+      self.wrap(.message(.init(response, compressed: self.compressionEnabled))),
+      promise: nil
+    )
+
+    self.channel.writeAndFlush(
+      self.wrap(.statusAndTrailers(self.responseStatus, self.trailers)),
+      promise: nil
+    )
+  }
+
+  /// Handle an error from the service provider.
+  private func handleError(_ error: Error, delegate: ServerErrorDelegate?) {
+    let (status, trailers) = self.processError(error, delegate: delegate)
+    self.channel.writeAndFlush(self.wrap(.statusAndTrailers(status, trailers)), promise: nil)
+  }
+
+  /// Wrap the response part in a `NIOAny`. This is useful in order to avoid explicitly spelling
+  /// out `NIOAny(WrappedResponse(...))`.
+  private func wrap(_ response: WrappedResponse) -> NIOAny {
+    return NIOAny(response)
   }
 
   @available(*, deprecated, renamed: "init(channel:headers:errorDelegate:logger:)")