/*
 * Copyright 2019, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP1

/// Provides a means for decoding incoming gRPC messages into protobuf objects.
///
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
public class BaseCallHandler: GRPCCallHandler {
  /// Returns a newly created `GRPCServerCodec` as a `ChannelHandler`.
  public func makeGRPCServerCodec() -> ChannelHandler {
    return GRPCServerCodec()
  }

  // NOTE(review): `RequestMessage` is not declared anywhere in this file. Presumably it is a generic
  // parameter (or a type declared elsewhere in the module) -- confirm against the rest of the project.
  /// Called whenever a message has been received.
  ///
  /// Overridden by subclasses. The base implementation traps with `fatalError`.
  ///
  /// - Parameter message: The request message taken from the `.message` part in `channelRead(context:data:)`.
  /// - Throws: Errors thrown by an override are forwarded to `errorCaught(context:error:)`.
  public func processMessage(_ message: RequestMessage) throws {
    fatalError("needs to be overridden")
  }

  /// Needs to be implemented by this class so that subclasses can override it.
  ///
  /// Otherwise, the subclass's implementation will simply never be called (probably because the protocol's default
  /// implementation in an extension is being used instead).
  public func handlerAdded(context: ChannelHandlerContext) { }

  /// Called when the client has half-closed the stream, indicating that they won't send any further data.
  ///
  /// Overridden by subclasses if the "end-of-stream" event is relevant. The base implementation does nothing.
  ///
  /// - Throws: Errors thrown by an override are forwarded to `errorCaught(context:error:)`.
  public func endOfStreamReceived() throws { }

  /// Whether this handler can still write messages to the client.
  ///
  /// Cleared in `write(context:data:promise:)` once a `.status` part has been written.
  private var serverCanWrite = true

  /// Called for each error received in `errorCaught(context:error:)`.
  private weak var errorDelegate: ServerErrorDelegate?

  /// Creates a handler that reports errors to `errorDelegate`.
  ///
  /// - Parameter errorDelegate: Delegate notified of errors; stored as a weak reference, may be `nil`.
  public init(errorDelegate: ServerErrorDelegate?) {
    self.errorDelegate = errorDelegate
  }

  /// Sends an error status to the client while ensuring that all call context promises are fulfilled.
  /// Because only the concrete call subclass knows which promises need to be fulfilled, this method needs to be overridden.
  ///
  /// The base implementation traps with `fatalError`.
  ///
  /// - Parameter status: The status to send to the client.
  func sendErrorStatus(_ status: GRPCStatus) {
    fatalError("needs to be overridden")
  }
}

// MARK: - ChannelInboundHandler

extension BaseCallHandler: ChannelInboundHandler {
  public typealias InboundIn = GRPCServerRequestPart

  /// Passes errors to the user-provided `errorDelegate`. After an error has been received an
  /// appropriate status is sent via `sendErrorStatus(_:)`.
  ///
  /// The status is chosen in this order: the delegate's `transformLibraryError(_:)` result, then the
  /// error's own `asGRPCStatus()` if it conforms to `GRPCStatusTransformable`, then `.processingError`.
  public func errorCaught(context: ChannelHandlerContext, error: Error) {
    errorDelegate?.observeLibraryError(error)

    let status = errorDelegate?.transformLibraryError(error)
      ?? (error as? GRPCStatusTransformable)?.asGRPCStatus()
      ?? .processingError
    sendErrorStatus(status)
  }

  /// Dispatches each inbound request part to the matching hook.
  ///
  /// `.message` parts go to `processMessage(_:)` and `.end` goes to `endOfStreamReceived()`; anything either
  /// hook throws is routed to `errorCaught(context:error:)`. A `.head` part is reported as an invalid-state error.
  public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    switch self.unwrapInboundIn(data) {
    case .head(let requestHead):
      // Head should have been handled by `GRPCChannelHandler`.
      self.errorCaught(context: context, error: GRPCError.server(.invalidState("unexpected request head received \(requestHead)")))

    case .message(let message):
      do {
        try processMessage(message)
      } catch {
        self.errorCaught(context: context, error: error)
      }

    case .end:
      do {
        try endOfStreamReceived()
      } catch {
        self.errorCaught(context: context, error: error)
      }
    }
  }
}

// MARK: - ChannelOutboundHandler

extension BaseCallHandler: ChannelOutboundHandler {
  public typealias OutboundIn = GRPCServerResponsePart
  public typealias OutboundOut = GRPCServerResponsePart

  /// Forwards response parts down the pipeline until a status has been written.
  ///
  /// A `.status` part is written and flushed, after which every further write fails its promise with
  /// `GRPCError.server(.serverNotWritable)`. All other parts are written without flushing.
  public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard serverCanWrite else {
      promise?.fail(GRPCError.server(.serverNotWritable))
      return
    }

    // We can only write one status; make sure we don't write again.
    if case .status = unwrapOutboundIn(data) {
      serverCanWrite = false
      context.writeAndFlush(data, promise: promise)
    } else {
      context.write(data, promise: promise)
    }
  }
}