Browse Source

Reorder some of the server logic. (#717)

* Reorder some of the server logic.

Motivation:

Recently we introduced the `GRPCPayload` protocol to abstract over the
request and response payload types so that we can support message types
other than Protobuf (such as Flatbuffers).

However, in doing so we removed our previous assumption that the payload
would (de)serialize from and to a `Data`. To avoid the needless copying
back and forth the payload writing (i.e. length prefixing) was moved to
the server codec (where serializion happens). This however requires the
type of the message to be known and therefore must happen after the
pipeline has been configured for the RPC in question.

In order to support compression on the server the message readers and
writers must be aware of the encoding being used and require access to
the headers. This happens (/happened) in the HTTP1ToRawGRPCServerCodec
before the RPC is configured. It would be possible to pass this through
with a user-event or extra data in the message type but would result in
fairly difficult to follow code with state spread across multiple
handlers.

Modifications:

Reorder the server channel operations.

- Previously the server channel did:
  1. verify content-type, handle message framing (`HTTP1ToRawGRPCServerCodec`)
  2. requests are routed (`GRPCChannelHandler`)
  3. requests are converted to typed gRPC messages (`GRPCServerCodec`)
  4. requests are passed to user handlers
- Now the server does:
  1. requests are routed and validated as gRPC
     (`GRPCServerRequestRoutingHandler`)
  2. requests are converted from HTTP/1, message framing is handler, messages
     are converted typed gRPC messages (`HTTP1ToGRPCServerCodec`)
  3. requests are passed to user handlers

As such:
- `GRPCChannelHandler` was renamed `GRPCServerRequestRoutingHandler`
- `GRPCServerRequestRoutingHandler` now reads `HTTPServerRequestPart` and
  writes `HTTPServerResponsePart`
- `GRPCServerRequestRoutingHandler` verifies the content-type and then
  constructs the pipeline
- `HTTP1ToRawGRPCServerCodec` is renamed `HTTP1ToGRPCServerCodec` and is
  now generic over the request and response type
- `HTTP1ToGRPCServerCodec` now comes after `GRPCServerRequestRoutingHandler`
- `HTTP1ToGRPCServerCodec` deals with message framing as well as
  (de/)serializion
- The `HTTPRequestHead` is now passed from the `HTTP1ToGRPCServerCodec`
  to the `BaseCallHandler`, which now passes it to the appropriate call
  handler (instead of passing it through the generated code as part of
  the call context).
- Tests updated, dead code removed.

This also paves the way for providing a server state machine, similar to
the client (i.e. it would essentially replace the contents of
`HTTP1ToGRPCServerCodec`).

Result:

Functionally, there should be no change. However this change makes it
possible for compression to be added to the server.

* fixup typos, add another test

* fixup some comments
George Barnett 6 years ago
parent
commit
c8b56b6607

+ 22 - 36
Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift

@@ -33,12 +33,9 @@ public class BidirectionalStreamingCallHandler<
   public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
   public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
 
-  private var observerState: ClientStreamingHandlerObserverState<EventObserverFactory, EventObserver> {
-    willSet(newState) {
-      self.logger.debug("observerState changed from \(self.observerState) to \(newState)")
-    }
-  }
   private var callContext: Context?
+  private var eventObserver: EventLoopFuture<EventObserver>?
+  private let eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventLoopFuture<EventObserver>
 
   // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
   // If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
@@ -46,49 +43,38 @@ public class BidirectionalStreamingCallHandler<
     callHandlerContext: CallHandlerContext,
     eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventLoopFuture<EventObserver>
   ) {
-    // Delay the creation of the event observer until `handlerAdded(context:)`, otherwise it is
-    // possible for the service to write into the pipeline (by fulfilling the status promise
-    // of the call context outside of the observer) before it has been configured.
-    self.observerState = .pendingCreation(eventObserverFactory)
-
+    // Delay the creation of the event observer until we actually get a request head, otherwise it
+    // would be possible for the observer to write into the pipeline (by completing the status
+    // promise) before the pipeline is configured.
+    self.eventObserverFactory = eventObserverFactory
     super.init(callHandlerContext: callHandlerContext)
+  }
 
-    let context = StreamingResponseCallContextImpl<ResponsePayload>(
-      channel: self.callHandlerContext.channel,
-      request: self.callHandlerContext.request,
+  internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
+    let callContext = StreamingResponseCallContextImpl<ResponsePayload>(
+      channel: context.channel,
+      request: head,
       errorDelegate: self.callHandlerContext.errorDelegate,
       logger: self.callHandlerContext.logger
     )
+    self.callContext = callContext
 
-    self.callContext = context
+    let eventObserver = self.eventObserverFactory(callContext)
+    eventObserver.cascadeFailure(to: callContext.statusPromise)
+    self.eventObserver = eventObserver
 
-    context.statusPromise.futureResult.whenComplete { _ in
+    callContext.statusPromise.futureResult.whenComplete { _ in
       // When done, reset references to avoid retain cycles.
+      self.eventObserver = nil
       self.callContext = nil
-      self.observerState = .notRequired
     }
-  }
 
-  public override func handlerAdded(context: ChannelHandlerContext) {
-    guard let callContext = self.callContext,
-      case let .pendingCreation(factory) = self.observerState else {
-      self.logger.warning("handlerAdded(context:) called but handler already has a call context")
-      return
-    }
-
-    let eventObserver = factory(callContext)
-    self.observerState = .created(eventObserver)
-
-    // Terminate the call if the future providing an observer fails.
-    // This is being done _after_ we have been added as a handler to ensure that the `GRPCServerCodec` required to
-    // translate our outgoing `GRPCServerResponsePart<ResponsePayload>` message is already present on the channel.
-    // Otherwise, our `OutboundOut` type would not match the `OutboundIn` type of the next handler on the channel.
-    eventObserver.cascadeFailure(to: callContext.statusPromise)
+    context.writeAndFlush(self.wrapOutboundOut(.headers([:])), promise: nil)
   }
 
   internal override func processMessage(_ message: RequestPayload) {
-    guard case .created(let eventObserver) = self.observerState else {
-      self.logger.warning("expecting observerState to be .created but was \(self.observerState), ignoring message \(message)")
+    guard let eventObserver = self.eventObserver else {
+      self.logger.warning("eventObserver is nil; ignoring message")
       return
     }
     eventObserver.whenSuccess { observer in
@@ -97,8 +83,8 @@ public class BidirectionalStreamingCallHandler<
   }
 
   internal override func endOfStreamReceived() throws {
-    guard case .created(let eventObserver) = self.observerState else {
-      self.logger.warning("expecting observerState to be .created but was \(self.observerState), ignoring end-of-stream call")
+    guard let eventObserver = self.eventObserver else {
+      self.logger.warning("eventObserver is nil; ignoring end-of-stream")
       return
     }
     eventObserver.whenSuccess { observer in

+ 20 - 43
Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift

@@ -19,14 +19,6 @@ import NIO
 import NIOHTTP1
 import Logging
 
-/// For calls which support client streaming we need to delay the creation of the event observer
-/// until the handler has been added to the pipeline.
-enum ClientStreamingHandlerObserverState<Factory, Observer> {
-  case pendingCreation(Factory)
-  case created(EventLoopFuture<Observer>)
-  case notRequired
-}
-
 /// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block.
 ///
 /// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done.
@@ -41,59 +33,44 @@ public final class ClientStreamingCallHandler<
   public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
   public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
 
-  private var observerState: ClientStreamingHandlerObserverState<EventObserverFactory, EventObserver> {
-    willSet(newState) {
-      self.logger.debug("observerState changed from \(self.observerState) to \(newState)")
-    }
-  }
   private var callContext: UnaryResponseCallContext<ResponsePayload>?
+  private var eventObserver: EventLoopFuture<EventObserver>?
+  private let eventObserverFactory: EventObserverFactory
 
   // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
   // If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
   public init(callHandlerContext: CallHandlerContext, eventObserverFactory: @escaping EventObserverFactory) {
-    // Delay the creation of the event observer until `handlerAdded(context:)`, otherwise it is
-    // possible for the service to write into the pipeline (by fulfilling the response promise
-    // of the call context outside of the observer) before it has been configured.
-    self.observerState = .pendingCreation(eventObserverFactory)
-
+    self.eventObserverFactory = eventObserverFactory
     super.init(callHandlerContext: callHandlerContext)
+  }
 
+  internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
     let callContext = UnaryResponseCallContextImpl<ResponsePayload>(
-      channel: self.callHandlerContext.channel,
-      request: self.callHandlerContext.request,
-      errorDelegate: self.callHandlerContext.errorDelegate,
-      logger: self.callHandlerContext.logger
+      channel: context.channel,
+      request: head,
+      errorDelegate: self.errorDelegate,
+      logger: self.logger
     )
 
     self.callContext = callContext
 
+    let eventObserver = self.eventObserverFactory(callContext)
+    eventObserver.cascadeFailure(to: callContext.responsePromise)
+
+    self.eventObserver = eventObserver
+
     callContext.responsePromise.futureResult.whenComplete { _ in
       // When done, reset references to avoid retain cycles.
+      self.eventObserver = nil
       self.callContext = nil
-      self.observerState = .notRequired
     }
-  }
 
-  public override func handlerAdded(context: ChannelHandlerContext) {
-    guard let callContext = self.callContext,
-      case let .pendingCreation(factory) = self.observerState else {
-      self.logger.warning("handlerAdded(context:) called but handler already has a call context")
-      return
-    }
-
-    let eventObserver = factory(callContext)
-    self.observerState = .created(eventObserver)
-
-    // Terminate the call if the future providing an observer fails.
-    // This is being done _after_ we have been added as a handler to ensure that the `GRPCServerCodec` required to
-    // translate our outgoing `GRPCServerResponsePart<ResponsePayload>` message is already present on the channel.
-    // Otherwise, our `OutboundOut` type would not match the `OutboundIn` type of the next handler on the channel.
-    eventObserver.cascadeFailure(to: callContext.responsePromise)
+    context.writeAndFlush(self.wrapOutboundOut(.headers([:])), promise: nil)
   }
 
   internal override func processMessage(_ message: RequestPayload) {
-    guard case .created(let eventObserver) = self.observerState else {
-      self.logger.warning("expecting observerState to be .created but was \(self.observerState), ignoring message \(message)")
+    guard let eventObserver = self.eventObserver else {
+      self.logger.warning("eventObserver is nil; ignoring message")
       return
     }
     eventObserver.whenSuccess { observer in
@@ -102,8 +79,8 @@ public final class ClientStreamingCallHandler<
   }
 
   internal override func endOfStreamReceived() throws {
-    guard case .created(let eventObserver) = self.observerState else {
-      self.logger.warning("expecting observerState to be .created but was \(self.observerState), ignoring end-of-stream call")
+    guard let eventObserver = self.eventObserver else {
+      self.logger.warning("eventObserver is nil; ignoring end-of-stream")
       return
     }
     eventObserver.whenSuccess { observer in

+ 13 - 3
Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift

@@ -31,15 +31,23 @@ public final class ServerStreamingCallHandler<
 
   private var eventObserver: EventObserver?
   private var callContext: StreamingResponseCallContext<ResponsePayload>?
+  private let eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
 
   public init(
     callHandlerContext: CallHandlerContext,
-    eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
+    eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
   ) {
+    // Delay the creation of the event observer until we actually get a request head, otherwise it
+    // would be possible for the observer to write into the pipeline (by completing the status
+    // promise) before the pipeline is configured.
+    self.eventObserverFactory = eventObserverFactory
     super.init(callHandlerContext: callHandlerContext)
+  }
+
+  override internal func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
     let callContext = StreamingResponseCallContextImpl<ResponsePayload>(
-      channel: self.callHandlerContext.channel,
-      request: self.callHandlerContext.request,
+      channel: context.channel,
+      request: head,
       errorDelegate: self.callHandlerContext.errorDelegate,
       logger: self.callHandlerContext.logger
     )
@@ -51,6 +59,8 @@ public final class ServerStreamingCallHandler<
       self.eventObserver = nil
       self.callContext = nil
     }
+
+    context.writeAndFlush(self.wrapOutboundOut(.headers([:])), promise: nil)
   }
 
   override internal func processMessage(_ message: RequestPayload) throws {

+ 13 - 6
Sources/GRPC/CallHandlers/UnaryCallHandler.swift

@@ -31,26 +31,33 @@ public final class UnaryCallHandler<
   public typealias EventObserver = (RequestPayload) -> EventLoopFuture<ResponsePayload>
   private var eventObserver: EventObserver?
   private var callContext: UnaryResponseCallContext<ResponsePayload>?
+  private let eventObserverFactory: (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
 
   public init(
     callHandlerContext: CallHandlerContext,
-    eventObserverFactory: (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
+    eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
   ) {
+    self.eventObserverFactory = eventObserverFactory
     super.init(callHandlerContext: callHandlerContext)
+  }
+
+  internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
     let callContext = UnaryResponseCallContextImpl<ResponsePayload>(
-      channel: self.callHandlerContext.channel,
-      request: self.callHandlerContext.request,
-      errorDelegate: self.callHandlerContext.errorDelegate,
-      logger: self.callHandlerContext.logger
+      channel: context.channel,
+      request: head,
+      errorDelegate: self.errorDelegate,
+      logger: self.logger
     )
 
     self.callContext = callContext
-    self.eventObserver = eventObserverFactory(callContext)
+    self.eventObserver = self.eventObserverFactory(callContext)
     callContext.responsePromise.futureResult.whenComplete { _ in
       // When done, reset references to avoid retain cycles.
       self.eventObserver = nil
       self.callContext = nil
     }
+
+    context.writeAndFlush(self.wrapOutboundOut(.headers([:])), promise: nil)
   }
 
   internal override func processMessage(_ message: RequestPayload) throws {

+ 10 - 11
Sources/GRPC/CallHandlers/_BaseCallHandler.swift

@@ -25,7 +25,14 @@ import Logging
 /// - Important: This is **NOT** part of the public API.
 public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: GRPCCallHandler {
   public func makeGRPCServerCodec() -> ChannelHandler {
-    return GRPCServerCodec<RequestPayload, ResponsePayload>()
+    return HTTP1ToGRPCServerCodec<RequestPayload, ResponsePayload>(logger: self.logger)
+  }
+
+  /// Called when the request head has been received.
+  ///
+  /// Overridden by subclasses.
+  internal func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
+    fatalError("needs to be overridden")
   }
 
   /// Called whenever a message has been received.
@@ -62,12 +69,6 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
   internal init(callHandlerContext: CallHandlerContext) {
     self.callHandlerContext = callHandlerContext
   }
-
-  /// Needs to be implemented by this class so that subclasses can override it.
-  ///
-  /// Otherwise, the subclass's implementation will simply never be called (probably because the protocol's default
-  /// implementation in an extension is being used instead).
-  public func handlerAdded(context: ChannelHandlerContext) { }
 }
 
 extension _BaseCallHandler: ChannelInboundHandler {
@@ -95,10 +96,8 @@ extension _BaseCallHandler: ChannelInboundHandler {
 
   public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
     switch self.unwrapInboundIn(data) {
-    case .head(let requestHead):
-      // Head should have been handled by `GRPCChannelHandler`.
-      self.logger.error("call handler unexpectedly received request head", metadata: ["head": "\(requestHead)"])
-      self.errorCaught(context: context, error: GRPCError.InvalidState("unexpected request head received \(requestHead)").captureContext())
+    case .head(let head):
+      self.processHead(head, context: context)
 
     case .message(let message):
       do {

+ 0 - 154
Sources/GRPC/GRPCChannelHandler.swift

@@ -1,154 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import SwiftProtobuf
-import NIO
-import NIOHTTP1
-import Logging
-
-/// Processes individual gRPC messages and stream-close events on a HTTP2 channel.
-public protocol GRPCCallHandler: ChannelHandler {
-  func makeGRPCServerCodec() -> ChannelHandler
-}
-
-/// Provides `GRPCCallHandler` objects for the methods on a particular service name.
-///
-/// Implemented by the generated code.
-public protocol CallHandlerProvider: class {
-  /// The name of the service this object is providing methods for, including the package path.
-  ///
-  /// - Example: "io.grpc.Echo.EchoService"
-  var serviceName: String { get }
-
-  /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's
-  /// method. Returns nil for methods not handled by this service.
-  func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler?
-}
-
-// This is public because it will be passed into generated code, all memebers are `internal` because
-// the context will get passed from generated code back into gRPC library code and all members should
-// be considered an implementation detail to the user.
-public struct CallHandlerContext {
-  internal var request: HTTPRequestHead
-  internal var channel: Channel
-  internal var errorDelegate: ServerErrorDelegate?
-  internal var logger: Logger
-}
-
-/// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available.
-///
-/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
-/// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
-public final class GRPCChannelHandler {
-  private let logger: Logger
-  private let servicesByName: [String: CallHandlerProvider]
-  private weak var errorDelegate: ServerErrorDelegate?
-
-  public init(servicesByName: [String: CallHandlerProvider], errorDelegate: ServerErrorDelegate?, logger: Logger) {
-    self.servicesByName = servicesByName
-    self.errorDelegate = errorDelegate
-    self.logger = logger
-  }
-}
-
-extension GRPCChannelHandler: ChannelInboundHandler, RemovableChannelHandler {
-  public typealias InboundIn = _RawGRPCServerRequestPart
-  public typealias OutboundOut = _RawGRPCServerResponsePart
-
-  public func errorCaught(context: ChannelHandlerContext, error: Error) {
-    let status: GRPCStatus
-
-    if let errorWithContext = error as? GRPCError.WithContext {
-      self.errorDelegate?.observeLibraryError(errorWithContext.error)
-      status = self.errorDelegate?.transformLibraryError(errorWithContext.error)
-          ?? errorWithContext.error.makeGRPCStatus()
-    } else {
-      self.errorDelegate?.observeLibraryError(error)
-      status = self.errorDelegate?.transformLibraryError(error)
-          ?? (error as? GRPCStatusTransformable)?.makeGRPCStatus()
-          ?? .processingError
-    }
-
-    context.writeAndFlush(wrapOutboundOut(.statusAndTrailers(status, HTTPHeaders())), promise: nil)
-  }
-
-  public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
-    let requestPart = self.unwrapInboundIn(data)
-    switch requestPart {
-    case .head(let requestHead):
-      guard let callHandler = self.makeCallHandler(channel: context.channel, requestHead: requestHead) else {
-        self.errorCaught(context: context, error: GRPCError.RPCNotImplemented(rpc: requestHead.uri).captureContext())
-        return
-      }
-
-      logger.debug("received request head, configuring pipeline")
-
-      let codec = callHandler.makeGRPCServerCodec()
-      let handlerRemoved: EventLoopPromise<Void> = context.eventLoop.makePromise()
-      handlerRemoved.futureResult.whenSuccess {
-        self.logger.debug("removed GRPCChannelHandler from pipeline")
-        context.pipeline.addHandler(callHandler, position: .after(codec)).whenComplete { _ in
-          // Send the .headers event back to begin the headers flushing for the response.
-          // At this point, which headers should be returned is not known, as the content type is
-          // processed in HTTP1ToRawGRPCServerCodec. At the same time the HTTP1ToRawGRPCServerCodec
-          // handler doesn't have the data to determine whether headers should be returned, as it is
-          // this handler that checks whether the stub for the requested Service/Method is implemented.
-          // This likely signals that the architecture for these handlers could be improved.
-          context.writeAndFlush(self.wrapOutboundOut(.headers(HTTPHeaders())), promise: nil)
-        }
-      }
-
-      logger.debug("adding handler \(type(of: codec)) to pipeline")
-      context.pipeline.addHandler(codec, position: .after(self))
-        .whenSuccess { context.pipeline.removeHandler(context: context, promise: handlerRemoved) }
-
-    case .message, .end:
-      // We can reach this point if we're receiving messages for a method that isn't implemented.
-      // A status resposne will have been fired which should also close the stream; there's not a
-      // lot we can do at this point.
-      break
-    }
-  }
-
-  private func makeCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
-    // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
-    // - uriComponents[0]: empty
-    // - uriComponents[1]: service name (including the package name);
-    //     `CallHandlerProvider`s should provide the service name including the package name.
-    // - uriComponents[2]: method name.
-    self.logger.debug("making call handler", metadata: ["path": "\(requestHead.uri)"])
-    let uriComponents = requestHead.uri.components(separatedBy: "/")
-
-    var logger = self.logger
-    // Unset the channel handler: it shouldn't be used for downstream handlers.
-    logger[metadataKey: MetadataKey.channelHandler] = nil
-
-    let context = CallHandlerContext(
-      request: requestHead,
-      channel: channel,
-      errorDelegate: self.errorDelegate,
-      logger: logger
-    )
-
-    guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
-      let providerForServiceName = servicesByName[uriComponents[1]],
-      let callHandler = providerForServiceName.handleMethod(uriComponents[2], callHandlerContext: context) else {
-        self.logger.notice("could not create handler", metadata: ["path": "\(requestHead.uri)"])
-        return nil
-    }
-    return callHandler
-  }
-}

+ 4 - 1
Sources/GRPC/GRPCContentType.swift

@@ -44,7 +44,8 @@ internal enum ContentType {
   var canonicalValue: String {
     switch self {
     case .protobuf:
-      return "application/grpc+proto"
+      // This is more widely supported than "application/grpc+proto"
+      return "application/grpc"
 
     case .webProtobuf:
       return "application/grpc-web+proto"
@@ -53,4 +54,6 @@ internal enum ContentType {
       return "application/grpc-web-text+proto"
     }
   }
+
+  static let commonPrefix = "application/grpc"
 }

+ 1 - 0
Sources/GRPC/GRPCHeaderName.swift

@@ -20,4 +20,5 @@ internal enum GRPCHeaderName {
   static let acceptEncoding = "grpc-accept-encoding"
   static let statusCode = "grpc-status"
   static let statusMessage = "grpc-message"
+  static let contentType = "content-type"
 }

+ 0 - 92
Sources/GRPC/GRPCServerCodec.swift

@@ -1,92 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import SwiftProtobuf
-import NIO
-import NIOFoundationCompat
-import NIOHTTP1
-
-/// Incoming gRPC package with a fixed message type.
-///
-/// - Important: This is **NOT** part of the public API.
-public enum _GRPCServerRequestPart<RequestPayload: GRPCPayload> {
-  case head(HTTPRequestHead)
-  case message(RequestPayload)
-  case end
-}
-
-/// Outgoing gRPC package with a fixed message type.
-///
-/// - Important: This is **NOT** part of the public API.
-public enum _GRPCServerResponsePart<ResponsePayload: GRPCPayload> {
-  case headers(HTTPHeaders)
-  case message(ResponsePayload)
-  case statusAndTrailers(GRPCStatus, HTTPHeaders)
-}
-
-/// A simple channel handler that translates raw gRPC packets into decoded protobuf messages, and vice versa.
-internal final class GRPCServerCodec<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload> {
-  var messageWriter = LengthPrefixedMessageWriter(compression: .none)
-}
-
-extension GRPCServerCodec: ChannelInboundHandler {
-  typealias InboundIn = _RawGRPCServerRequestPart
-  typealias InboundOut = _GRPCServerRequestPart<RequestPayload>
-
-  func channelRead(context: ChannelHandlerContext, data: NIOAny) {
-    switch self.unwrapInboundIn(data) {
-    case .head(let requestHead):
-      context.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
-
-    case .message(var message):
-      do {
-        context.fireChannelRead(self.wrapInboundOut(.message(try RequestPayload(serializedByteBuffer: &message))))
-      } catch {
-        context.fireErrorCaught(GRPCError.DeserializationFailure().captureContext())
-      }
-
-    case .end:
-      context.fireChannelRead(self.wrapInboundOut(.end))
-    }
-  }
-}
-
-extension GRPCServerCodec: ChannelOutboundHandler {
-  typealias OutboundIn = _GRPCServerResponsePart<ResponsePayload>
-  typealias OutboundOut = _RawGRPCServerResponsePart
-
-  func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
-    let responsePart = self.unwrapOutboundIn(data)
-    switch responsePart {
-    case .headers(let headers):
-      context.write(self.wrapOutboundOut(.headers(headers)), promise: promise)
-
-    case .message(let message):
-      do {
-        var responseBuffer = context.channel.allocator.buffer(capacity: 0)
-        try self.messageWriter.write(message, into: &responseBuffer)
-        context.write(self.wrapOutboundOut(.message(responseBuffer)), promise: promise)
-      } catch {
-        let error = GRPCError.SerializationFailure().captureContext()
-        promise?.fail(error)
-        context.fireErrorCaught(error)
-      }
-
-    case let .statusAndTrailers(status, trailers):
-      context.writeAndFlush(self.wrapOutboundOut(.statusAndTrailers(status, trailers)), promise: promise)
-    }
-  }
-}

+ 241 - 0
Sources/GRPC/GRPCServerRequestRoutingHandler.swift

@@ -0,0 +1,241 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import SwiftProtobuf
+import NIO
+import NIOHTTP1
+import Logging
+
+/// Processes individual gRPC messages and stream-close events on an HTTP2 channel.
+public protocol GRPCCallHandler: ChannelHandler {
+  func makeGRPCServerCodec() -> ChannelHandler
+}
+
+/// Provides `GRPCCallHandler` objects for the methods on a particular service name.
+///
+/// Implemented by the generated code.
+public protocol CallHandlerProvider: class {
+  /// The name of the service this object is providing methods for, including the package path.
+  ///
+  /// - Example: "io.grpc.Echo.EchoService"
+  var serviceName: String { get }
+
+  /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's
+  /// method. Returns nil for methods not handled by this service.
+  func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler?
+}
+
+// This is public because it will be passed into generated code, all members are `internal` because
+// the context will get passed from generated code back into gRPC library code and all members should
+// be considered an implementation detail to the user.
+public struct CallHandlerContext {
+  internal var errorDelegate: ServerErrorDelegate?
+  internal var logger: Logger
+}
+
+/// Attempts to route a request to a user-provided call handler. Also validates that the request has
+/// a suitable 'content-type' for gRPC.
+///
+/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
+/// for a `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
+///
+/// After the pipeline has been configured with the `GRPCCallHandler`, this handler removes itself
+/// from the pipeline.
+public final class GRPCServerRequestRoutingHandler {
+  private let logger: Logger
+  private let servicesByName: [String: CallHandlerProvider]
+  private weak var errorDelegate: ServerErrorDelegate?
+
+  private enum State: Equatable {
+    case notConfigured
+    case configuring([InboundOut])
+  }
+
+  private var state: State = .notConfigured
+
+  public init(servicesByName: [String: CallHandlerProvider], errorDelegate: ServerErrorDelegate?, logger: Logger) {
+    self.servicesByName = servicesByName
+    self.errorDelegate = errorDelegate
+    self.logger = logger
+  }
+}
+
+extension GRPCServerRequestRoutingHandler: ChannelInboundHandler, RemovableChannelHandler {
+  public typealias InboundIn = HTTPServerRequestPart
+  public typealias InboundOut = HTTPServerRequestPart
+  public typealias OutboundOut = HTTPServerResponsePart
+
+  public func errorCaught(context: ChannelHandlerContext, error: Error) {
+    let status: GRPCStatus
+    if let errorWithContext = error as? GRPCError.WithContext {
+      self.errorDelegate?.observeLibraryError(errorWithContext.error)
+      status = errorWithContext.error.makeGRPCStatus()
+    } else {
+      self.errorDelegate?.observeLibraryError(error)
+      status = (error as? GRPCStatusTransformable)?.makeGRPCStatus() ?? .processingError
+    }
+
+    switch self.state {
+    case .notConfigured:
+      // We don't know what protocol we're speaking at this point. We'll just have to close the
+      // channel.
+      ()
+
+    case .configuring(let messages):
+      // first! is fine here: we only go from `.notConfigured` to `.configuring` when we receive
+      // and validate the request head.
+      let head = messages.compactMap { part -> HTTPRequestHead? in
+        switch part {
+        case .head(let head):
+          return head
+        default:
+          return nil
+        }
+      }.first!
+
+      let responseHead = self.makeResponseHead(requestHead: head, status: status)
+      context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
+      context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
+      context.flush()
+    }
+
+    context.close(mode: .all, promise: nil)
+  }
+
+  public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+    let requestPart = self.unwrapInboundIn(data)
+    switch self.unwrapInboundIn(data) {
+    case .head(let requestHead):
+      precondition(.notConfigured == self.state)
+
+      // Validate the 'content-type' is related to gRPC before proceeding.
+      let maybeContentType = requestHead.headers.first(name: GRPCHeaderName.contentType)
+      guard let contentType = maybeContentType, contentType.starts(with: ContentType.commonPrefix) else {
+        self.logger.warning(
+          "received request whose 'content-type' does not exist or start with '\(ContentType.commonPrefix)'",
+          metadata: ["content-type": "\(String(describing: maybeContentType))"]
+        )
+
+        // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+        //
+        //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
+        //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
+        //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
+        //   successful.
+        let responseHead = HTTPResponseHead(
+          version: requestHead.version,
+          status: .unsupportedMediaType
+        )
+
+        // Fail the call. Note: we're not speaking gRPC here, so no status or message.
+        context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
+        context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
+        return
+      }
+
+      // Do we know how to handle this RPC?
+      guard let callHandler = self.makeCallHandler(channel: context.channel, requestHead: requestHead) else {
+        self.logger.warning(
+          "unable to make call handler; the RPC is not implemented on this server",
+          metadata: ["uri": "\(requestHead.uri)"]
+        )
+
+        let status = GRPCError.RPCNotImplemented(rpc: requestHead.uri).makeGRPCStatus()
+        let responseHead = self.makeResponseHead(requestHead: requestHead, status: status)
+
+        // Write back a 'trailers-only' response.
+        context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
+        context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
+        return
+      }
+
+      self.logger.debug("received request head, configuring pipeline")
+
+      // Buffer the request head; we'll replay it in the next handler when we're removed from the
+      // pipeline.
+      self.state = .configuring([requestPart])
+
+      // Configure the rest of the pipeline to serve the RPC.
+      let codec = callHandler.makeGRPCServerCodec()
+      context.pipeline.addHandlers([codec, callHandler], position: .after(self)).whenSuccess {
+        context.pipeline.removeHandler(self, promise: nil)
+      }
+
+    case .body, .end:
+      switch self.state {
+      case .notConfigured:
+        // We can reach this point if we're receiving messages for a method that isn't implemented,
+        // in which case we just drop the messages; our response should already be in-flight.
+        ()
+
+      case .configuring(var buffer):
+        // We received a message while the pipeline was being configured; hold on to it while we
+        // finish configuring the pipeline.
+        buffer.append(requestPart)
+        self.state = .configuring(buffer)
+      }
+    }
+  }
+
+  public func handlerRemoved(context: ChannelHandlerContext) {
+    switch self.state {
+    case .notConfigured:
+      ()
+
+    case .configuring(let messages):
+      for message in messages {
+        context.fireChannelRead(self.wrapInboundOut(message))
+      }
+    }
+  }
+
+  private func makeCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
+    // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
+    // - uriComponents[0]: empty
+    // - uriComponents[1]: service name (including the package name);
+    //     `CallHandlerProvider`s should provide the service name including the package name.
+    // - uriComponents[2]: method name.
+    self.logger.debug("making call handler", metadata: ["path": "\(requestHead.uri)"])
+    let uriComponents = requestHead.uri.components(separatedBy: "/")
+
+    var logger = self.logger
+    // Unset the channel handler: it shouldn't be used for downstream handlers.
+    logger[metadataKey: MetadataKey.channelHandler] = nil
+
+    let context = CallHandlerContext(errorDelegate: self.errorDelegate, logger: logger)
+
+    guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
+      let providerForServiceName = servicesByName[uriComponents[1]],
+      let callHandler = providerForServiceName.handleMethod(uriComponents[2], callHandlerContext: context) else {
+        self.logger.notice("could not create handler", metadata: ["path": "\(requestHead.uri)"])
+        return nil
+    }
+    return callHandler
+  }
+
+  private func makeResponseHead(requestHead: HTTPRequestHead, status: GRPCStatus) -> HTTPResponseHead {
+    var headers: HTTPHeaders = [
+      GRPCHeaderName.contentType: ContentType.protobuf.canonicalValue,
+      GRPCHeaderName.statusCode: "\(status.code.rawValue)",
+    ]
+
+    if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
+      headers.add(name: GRPCHeaderName.statusMessage, value: message)
+    }
+
+    return HTTPResponseHead(version: requestHead.version, status: .ok, headers: headers)
+  }
+}

+ 69 - 58
Sources/GRPC/HTTP1ToRawGRPCServerCodec.swift → Sources/GRPC/HTTP1ToGRPCServerCodec.swift

@@ -19,31 +19,32 @@ import NIOHTTP1
 import NIOFoundationCompat
 import Logging
 
-/// Incoming gRPC package with an unknown message type (represented by a byte buffer).
-public enum _RawGRPCServerRequestPart {
+/// Incoming gRPC package with a fixed message type.
+///
+/// - Important: This is **NOT** part of the public API.
+public enum _GRPCServerRequestPart<RequestPayload: GRPCPayload> {
   case head(HTTPRequestHead)
-  case message(ByteBuffer)
+  case message(RequestPayload)
   case end
 }
 
-/// Outgoing gRPC package with an unknown message type (represented by `Data`).
-public enum _RawGRPCServerResponsePart {
+/// Outgoing gRPC package with a fixed message type.
+///
+/// - Important: This is **NOT** part of the public API.
+public enum _GRPCServerResponsePart<ResponsePayload: GRPCPayload> {
   case headers(HTTPHeaders)
-  case message(ByteBuffer)
+  case message(ResponsePayload)
   case statusAndTrailers(GRPCStatus, HTTPHeaders)
 }
 
 /// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
 ///
-/// This codec allows us to use the "raw" gRPC protocol on a low level, with further handlers operating the protocol
-/// on a "higher" level.
-///
 /// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
-/// primitives while providing all the functionality we need. In addition, this should make implementing gRPC-over-HTTP1
-/// (sometimes also called pPRC) easier in the future.
+/// primitives while providing all the functionality we need. In addition, it allows us to support
+/// gRPC-Web (gRPC over HTTP1).
 ///
 /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
-public final class HTTP1ToRawGRPCServerCodec {
+public final class HTTP1ToGRPCServerCodec<Request: GRPCPayload, Response: GRPCPayload> {
   public init(logger: Logger) {
     self.logger = logger
 
@@ -51,6 +52,7 @@ public final class HTTP1ToRawGRPCServerCodec {
     accessLog[metadataKey: MetadataKey.requestID] = logger[metadataKey: MetadataKey.requestID]
     self.accessLog = accessLog
     self.messageReader = LengthPrefixedMessageReader()
+    self.messageWriter = LengthPrefixedMessageWriter()
   }
 
   private var contentType: ContentType?
@@ -89,19 +91,10 @@ public final class HTTP1ToRawGRPCServerCodec {
   }
 
   var messageReader: LengthPrefixedMessageReader
+  var messageWriter: LengthPrefixedMessageWriter
 }
 
-extension HTTP1ToRawGRPCServerCodec {
-  /// Expected content types for incoming requests.
-  private enum ContentType: String {
-    /// Binary encoded gRPC request.
-    case binary = "application/grpc"
-    /// Base64 encoded gRPC-Web request.
-    case text = "application/grpc-web-text"
-    /// Binary encoded gRPC-Web request.
-    case web = "application/grpc-web"
-  }
-
+extension HTTP1ToGRPCServerCodec {
   enum InboundState {
     case expectingHeaders
     case expectingBody
@@ -116,9 +109,9 @@ extension HTTP1ToRawGRPCServerCodec {
   }
 }
 
-extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
+extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
   public typealias InboundIn = HTTPServerRequestPart
-  public typealias InboundOut = _RawGRPCServerRequestPart
+  public typealias InboundOut = _GRPCServerRequestPart<Request>
 
   public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
     if case .ignore = inboundState {
@@ -157,15 +150,15 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
       "version": "\(requestHead.version)"
     ])
 
-    if let contentType = requestHead.headers["content-type"].first.flatMap(ContentType.init) {
+    if let contentType = requestHead.headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init) {
       self.contentType = contentType
     } else {
       self.logger.debug("no 'content-type' header, assuming content type is 'application/grpc'")
       // If the Content-Type is not present, assume the request is binary encoded gRPC.
-      self.contentType = .binary
+      self.contentType = .protobuf
     }
 
-    if self.contentType == .text {
+    if self.contentType == .webTextProtobuf {
       requestTextBuffer = context.channel.allocator.buffer(capacity: 0)
     }
 
@@ -184,7 +177,7 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
     // it to the binary buffer. If the request is chunked, this section will process the text
     // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
     // where it will expect a new incoming chunk.
-    if self.contentType == .text {
+    if self.contentType == .webTextProtobuf {
       precondition(requestTextBuffer != nil)
       requestTextBuffer.writeBuffer(&body)
 
@@ -199,8 +192,18 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
     }
 
     self.messageReader.append(buffer: &body)
-    while let message = try self.messageReader.nextMessage() {
-      context.fireChannelRead(self.wrapInboundOut(.message(message)))
+    var requests: [Request] = []
+    do {
+      while var buffer = try self.messageReader.nextMessage() {
+        requests.append(try Request(serializedByteBuffer: &buffer))
+      }
+    } catch {
+      context.fireErrorCaught(GRPCError.DeserializationFailure().captureContext())
+      return .ignore
+    }
+
+    requests.forEach {
+      context.fireChannelRead(self.wrapInboundOut(.message($0)))
     }
 
     return .expectingBody
@@ -218,8 +221,8 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
   }
 }
 
-extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
-  public typealias OutboundIn = _RawGRPCServerResponsePart
+extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
+  public typealias OutboundIn = _GRPCServerResponsePart<Response>
   public typealias OutboundOut = HTTPServerResponsePart
 
   public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
@@ -238,49 +241,57 @@ extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
 
       var version = HTTPVersion(major: 2, minor: 0)
       if let contentType = self.contentType {
-        headers.add(name: "content-type", value: contentType.rawValue)
-        if contentType != .binary {
+        headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
+        if contentType != .protobuf {
           version = .init(major: 1, minor: 1)
         }
       }
 
-      if self.contentType == .text {
+      if self.contentType == .webTextProtobuf {
         responseTextBuffer = context.channel.allocator.buffer(capacity: 0)
       }
 
       context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
       self.outboundState = .expectingBodyOrStatus
 
-    case .message(let messageBytes):
+    case .message(let message):
       guard case .expectingBodyOrStatus = self.outboundState else {
-        self.logger.error("invalid state '\(self.outboundState)' while writing message", metadata: ["message": "\(messageBytes)"])
+        self.logger.error("invalid state '\(self.outboundState)' while writing message")
         return
       }
       
-      if contentType == .text {
-        precondition(self.responseTextBuffer != nil)
-
-        // Store the response into an independent buffer. We can't return the message directly as
-        // it needs to be aggregated with all the responses plus the trailers, in order to have
-        // the base64 response properly encoded in a single byte stream.
-        
-        var messageBytes = messageBytes
-        self.responseTextBuffer.writeBuffer(&messageBytes)
-        
-        // Since we stored the written data, mark the write promise as successful so that the
-        // ServerStreaming provider continues sending the data.
-        promise?.succeed(())
-      } else {
-        context.write(self.wrapOutboundOut(.body(.byteBuffer(messageBytes))), promise: promise)
+      do {
+        if contentType == .webTextProtobuf {
+          // Store the response into an independent buffer. We can't return the message directly as
+          // it needs to be aggregated with all the responses plus the trailers, in order to have
+          // the base64 response properly encoded in a single byte stream.
+          precondition(self.responseTextBuffer != nil)
+          try self.messageWriter.write(message, into: &self.responseTextBuffer)
+
+          // Since we stored the written data, mark the write promise as successful so that the
+          // ServerStreaming provider continues sending the data.
+          promise?.succeed(())
+        } else {
+          var lengthPrefixedMessageBuffer = context.channel.allocator.buffer(capacity: 0)
+          try self.messageWriter.write(message, into: &lengthPrefixedMessageBuffer)
+          context.write(self.wrapOutboundOut(.body(.byteBuffer(lengthPrefixedMessageBuffer))), promise: promise)
+        }
+      } catch {
+        let error = GRPCError.SerializationFailure().captureContext()
+        promise?.fail(error)
+        context.fireErrorCaught(error)
+        self.outboundState = .ignore
+        return
       }
+
       self.outboundState = .expectingBodyOrStatus
 
     case let .statusAndTrailers(status, trailers):
-      // If we error before sending the initial headers (e.g. unimplemented method) then we won't have sent the request head.
-      // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still need to loop back and
-      // send the request head first.
+      // If we error before sending the initial headers then we won't have sent the request head.
+      // NIOHTTP2 doesn't support sending a single frame as a "Trailers-Only" response so we still
+      // need to loop back and send the request head first.
       if case .expectingHeaders = self.outboundState {
-        self.write(context: context, data: NIOAny(_RawGRPCServerResponsePart.headers(HTTPHeaders())), promise: nil)
+        self.write(context: context, data: NIOAny(OutboundIn.headers(HTTPHeaders())), promise: nil)
       }
 
       var trailers = trailers
@@ -289,7 +300,7 @@ extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
         trailers.add(name: GRPCHeaderName.statusMessage, value: message)
       }
 
-      if contentType == .text {
+      if contentType == .webTextProtobuf {
         precondition(responseTextBuffer != nil)
 
         // Encode the trailers into the response byte stream as a length delimited message, as per

+ 42 - 53
Sources/GRPC/Server.swift

@@ -37,59 +37,51 @@ import Logging
 ///                   ByteBuffer│                       │ByteBuffer
 ///                             │                       ▼
 ///
-///    The NIOSSLHandler is optional and depends on how the framework user has configured
-///    their server. The HTTPProtocolSwitched detects which HTTP version is being used and
+///    The `NIOSSLHandler` is optional and depends on how the framework user has configured
+///    their server. The `HTTPProtocolSwitcher` detects which HTTP version is being used and
 ///    configures the pipeline accordingly.
 ///
 /// 2. HTTP version detected. "HTTP Handlers" depends on the HTTP version determined by
-///    HTTPProtocolSwitcher. All of these handlers are provided by NIO except for the
-///    WebCORSHandler which is used for HTTP/1.
+///    `HTTPProtocolSwitcher`. All of these handlers are provided by NIO except for the
+///    `WebCORSHandler` which is used for HTTP/1.
 ///
-///                           ┌───────────────────────────┐
-///                           │    GRPCChannelHandler*    │
-///                           └─▲───────────────────────┬─┘
-///     RawGRPCServerRequestPart│                       │RawGRPCServerResponsePart
-///                           ┌─┴───────────────────────▼─┐
-///                           │ HTTP1ToRawGRPCServerCodec │
-///                           └─▲───────────────────────┬─┘
-///        HTTPServerRequestPart│                       │HTTPServerResponsePart
-///                           ┌─┴───────────────────────▼─┐
-///                           │       HTTP Handlers       │
-///                           └─▲───────────────────────┬─┘
-///                   ByteBuffer│                       │ByteBuffer
-///                           ┌─┴───────────────────────▼─┐
-///                           │       NIOSSLHandler       │
-///                           └─▲───────────────────────┬─┘
-///                   ByteBuffer│                       │ByteBuffer
-///                             │                       ▼
+///                           ┌─────────────────────────────────┐
+///                           │ GRPCServerRequestRoutingHandler │
+///                           └─▲─────────────────────────────┬─┘
+///        HTTPServerRequestPart│                             │HTTPServerResponsePart
+///                           ┌─┴─────────────────────────────▼─┐
+///                           │          HTTP Handlers          │
+///                           └─▲─────────────────────────────┬─┘
+///                   ByteBuffer│                             │ByteBuffer
+///                           ┌─┴─────────────────────────────▼─┐
+///                           │          NIOSSLHandler          │
+///                           └─▲─────────────────────────────┬─┘
+///                   ByteBuffer│                             │ByteBuffer
+///                             │                             ▼
 ///
-///    The GPRCChannelHandler resolves the request head and configures the rest of the pipeline
-///    based on the RPC call being made.
+///    The `GRPCServerRequestRoutingHandler` resolves the request head and configures the rest of
+///    the pipeline based on the RPC call being made.
 ///
 /// 3. The call has been resolved and is a function that this server can handle. Responses are
 ///    written into `BaseCallHandler` by a user-implemented `CallHandlerProvider`.
 ///
-///                           ┌───────────────────────────┐
-///                           │     BaseCallHandler*      │
-///                           └─▲───────────────────────┬─┘
-///    GRPCServerRequestPart<T1>│                       │GRPCServerResponsePart<T2>
-///                           ┌─┴───────────────────────▼─┐
-///                           │      GRPCServerCodec      │
-///                           └─▲───────────────────────┬─┘
-///     RawGRPCServerRequestPart│                       │RawGRPCServerResponsePart
-///                           ┌─┴───────────────────────▼─┐
-///                           │ HTTP1ToRawGRPCServerCodec │
-///                           └─▲───────────────────────┬─┘
-///        HTTPServerRequestPart│                       │HTTPServerResponsePart
-///                           ┌─┴───────────────────────▼─┐
-///                           │       HTTP Handlers       │
-///                           └─▲───────────────────────┬─┘
-///                   ByteBuffer│                       │ByteBuffer
-///                           ┌─┴───────────────────────▼─┐
-///                           │       NIOSSLHandler       │
-///                           └─▲───────────────────────┬─┘
-///                   ByteBuffer│                       │ByteBuffer
-///                             │                       ▼
+///                           ┌─────────────────────────────────┐
+///                           │         BaseCallHandler*        │
+///                           └─▲─────────────────────────────┬─┘
+///    GRPCServerRequestPart<T1>│                             │GRPCServerResponsePart<T2>
+///                           ┌─┴─────────────────────────────▼─┐
+///                           │      HTTP1ToGRPCServerCodec     │
+///                           └─▲─────────────────────────────┬─┘
+///        HTTPServerRequestPart│                             │HTTPServerResponsePart
+///                           ┌─┴─────────────────────────────▼─┐
+///                           │          HTTP Handlers          │
+///                           └─▲─────────────────────────────┬─┘
+///                   ByteBuffer│                             │ByteBuffer
+///                           ┌─┴─────────────────────────────▼─┐
+///                           │          NIOSSLHandler          │
+///                           └─▲─────────────────────────────┬─┘
+///                   ByteBuffer│                             │ByteBuffer
+///                             │                             ▼
 ///
 public final class Server {
   /// Makes and configures a `ServerBootstrap` using the provided configuration.
@@ -109,15 +101,12 @@ public final class Server {
       .childChannelInitializer { channel in
         let protocolSwitcher = HTTPProtocolSwitcher(errorDelegate: configuration.errorDelegate) { channel -> EventLoopFuture<Void> in
           let logger = Logger(subsystem: .serverChannelCall, metadata: [MetadataKey.requestID: "\(UUID())"])
-          let handlers: [ChannelHandler] = [
-            HTTP1ToRawGRPCServerCodec(logger: logger),
-            GRPCChannelHandler(
-              servicesByName: configuration.serviceProvidersByName,
-              errorDelegate: configuration.errorDelegate,
-              logger: logger
-            )
-          ]
-          return channel.pipeline.addHandlers(handlers)
+          let handler = GRPCServerRequestRoutingHandler(
+            servicesByName: configuration.serviceProvidersByName,
+            errorDelegate: configuration.errorDelegate,
+            logger: logger
+          )
+          return channel.pipeline.addHandler(handler)
         }
 
         if let tls = configuration.tls {

+ 0 - 1
Tests/GRPCTests/AnyServiceClientTests.swift

@@ -19,7 +19,6 @@ import EchoModel
 import XCTest
 
 class AnyServiceClientTests: EchoTestCaseBase {
-
   var anyServiceClient: AnyServiceClient {
     return AnyServiceClient(connection: self.client.connection)
   }

+ 0 - 78
Tests/GRPCTests/GRPCChannelHandlerResponseCapturingTestCase.swift

@@ -1,78 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import NIO
-import NIOHTTP1
-@testable import GRPC
-import EchoModel
-import EchoImplementation
-import XCTest
-import Logging
-
-class CollectingChannelHandler<OutboundIn>: ChannelOutboundHandler {
-  var responses: [OutboundIn] = []
-
-  func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
-    promise?.succeed(())
-    responses.append(unwrapOutboundIn(data))
-  }
-}
-
-class CollectingServerErrorDelegate: ServerErrorDelegate {
-  var errors: [Error] = []
-
-  func observeLibraryError(_ error: Error) {
-    self.errors.append(error)
-  }
-}
-
-class GRPCChannelHandlerResponseCapturingTestCase: GRPCTestCase {
-  static let echoProvider: [String: CallHandlerProvider] = ["echo.Echo": EchoProvider()]
-  class var defaultServiceProvider: [String: CallHandlerProvider] {
-    return echoProvider
-  }
-
-  func configureChannel(withHandlers handlers: [ChannelHandler]) -> EventLoopFuture<EmbeddedChannel> {
-    let channel = EmbeddedChannel()
-    return channel.pipeline.addHandlers(handlers, position: .first)
-      .map { _ in channel }
-  }
-
-  var errorCollector: CollectingServerErrorDelegate = CollectingServerErrorDelegate()
-
-  /// Waits for `count` responses to be collected and then returns them. The test fails if the number
-  /// of collected responses does not match the expected.
-  ///
-  /// - Parameters:
-  ///   - count: expected number of responses.
-  ///   - servicesByName: service providers keyed by their service name.
-  ///   - callback: a callback called after the channel has been setup, intended to "fill" the channel
-  ///     with messages. The callback is called before this function returns.
-  /// - Returns: The responses collected from the pipeline.
-  func waitForGRPCChannelHandlerResponses(
-    count: Int,
-    servicesByName: [String: CallHandlerProvider] = defaultServiceProvider,
-    callback: @escaping (EmbeddedChannel) throws -> Void
-  ) throws -> [_RawGRPCServerResponsePart] {
-    let collector = CollectingChannelHandler<_RawGRPCServerResponsePart>()
-    try configureChannel(withHandlers: [collector, GRPCChannelHandler(servicesByName: servicesByName, errorDelegate: errorCollector, logger: Logger(label: "io.grpc.testing"))])
-      .flatMapThrowing(callback)
-      .wait()
-
-    XCTAssertEqual(count, collector.responses.count)
-    return collector.responses
-  }
-}

+ 0 - 75
Tests/GRPCTests/GRPCChannelHandlerTests.swift

@@ -1,75 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import XCTest
-import NIO
-import NIOHTTP1
-@testable import GRPC
-import EchoModel
-
-class GRPCChannelHandlerTests: GRPCChannelHandlerResponseCapturingTestCase {
-  func testUnimplementedMethodReturnsUnimplementedStatus() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 1) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "unimplementedMethodName")
-      try channel.writeInbound(_RawGRPCServerRequestPart.head(requestHead))
-    }
-
-    let expectedError = GRPCError.RPCNotImplemented(rpc: "unimplementedMethodName")
-    XCTAssertEqual(expectedError, errorCollector.errors.first as? GRPCError.RPCNotImplemented)
-
-    responses[0].assertStatus { status in
-      XCTAssertEqual(status, expectedError.makeGRPCStatus())
-    }
-  }
-
-  func testImplementedMethodReturnsHeadersMessageAndStatus() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 3) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Get")
-      try channel.writeInbound(_RawGRPCServerRequestPart.head(requestHead))
-
-      let request = Echo_EchoRequest.with { $0.text = "echo!" }
-      let requestData = try request.serializedData()
-      var buffer = channel.allocator.buffer(capacity: requestData.count)
-      buffer.writeBytes(requestData)
-      try channel.writeInbound(_RawGRPCServerRequestPart.message(buffer))
-    }
-
-    responses[0].assertHeaders()
-    responses[1].assertMessage()
-    responses[2].assertStatus { status in
-      XCTAssertEqual(status.code, .ok)
-    }
-  }
-
-  func testImplementedMethodReturnsStatusForBadlyFormedProto() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 2) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Get")
-      try channel.writeInbound(_RawGRPCServerRequestPart.head(requestHead))
-
-      var buffer = channel.allocator.buffer(capacity: 3)
-      buffer.writeBytes([1, 2, 3])
-      try channel.writeInbound(_RawGRPCServerRequestPart.message(buffer))
-    }
-
-    let expectedError = GRPCError.DeserializationFailure()
-    XCTAssertEqual(expectedError, errorCollector.errors.first as? GRPCError.DeserializationFailure)
-
-    responses[0].assertHeaders()
-    responses[1].assertStatus { status in
-      XCTAssertEqual(status, expectedError.makeGRPCStatus())
-    }
-  }
-}

+ 119 - 0
Tests/GRPCTests/GRPCServerRequestRoutingHandlerTests.swift

@@ -0,0 +1,119 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import XCTest
+import NIO
+import NIOHTTP1
+import GRPC
+import EchoModel
+import EchoImplementation
+import Logging
+
+class GRPCServerRequestRoutingHandlerTests: GRPCTestCase {
+  var channel: EmbeddedChannel!
+
+  override func setUp() {
+    super.setUp()
+
+    let logger = Logger(label: "io.grpc.testing")
+    let provider = EchoProvider()
+    let handler = GRPCServerRequestRoutingHandler(
+      servicesByName: [provider.serviceName: provider],
+      errorDelegate: nil,
+      logger: logger
+    )
+
+    self.channel = EmbeddedChannel(handler: handler)
+  }
+
+  override func tearDown() {
+    XCTAssertNoThrow(try self.channel.finish())
+  }
+
+  func testInvalidGRPCContentTypeReturnsUnsupportedMediaType() throws {
+    let requestHead = HTTPRequestHead(
+      version: .init(major: 2, minor: 0),
+      method: .POST,
+      uri: "/echo.Echo/Get",
+      headers: ["content-type": "not-grpc"]
+    )
+
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(requestHead)))
+
+    let firstResponsePart = try self.channel.readOutbound(as: HTTPServerResponsePart.self)
+    switch firstResponsePart {
+    case .some(.head(let head)):
+      XCTAssertEqual(head.status, .unsupportedMediaType)
+    default:
+      XCTFail("Unexpected response part: \(String(describing: firstResponsePart))")
+    }
+
+    let secondResponsePart = try self.channel.readOutbound(as: HTTPServerResponsePart.self)
+    switch secondResponsePart {
+    case .some(.end(nil)):
+      ()
+    default:
+      XCTFail("Unexpected response part: \(String(describing: secondResponsePart))")
+    }
+  }
+
+  func testUnimplementedMethodReturnsUnimplementedStatus() throws {
+    let requestHead = HTTPRequestHead(
+      version: .init(major: 2, minor: 0),
+      method: .POST,
+      uri: "/foo/Bar",
+      headers: ["content-type": "application/grpc"]
+    )
+
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(requestHead)))
+
+    let firstResponsePart = try self.channel.readOutbound(as: HTTPServerResponsePart.self)
+    switch firstResponsePart {
+    case .some(.head(let head)):
+      XCTAssertEqual(head.status, .ok)
+      XCTAssertEqual(head.headers.first(name: "grpc-status"), "\(GRPCStatus.Code.unimplemented.rawValue)")
+    default:
+      XCTFail("Unexpected response part: \(String(describing: firstResponsePart))")
+    }
+
+    let secondResponsePart = try self.channel.readOutbound(as: HTTPServerResponsePart.self)
+    switch secondResponsePart {
+    case .some(.end(nil)):
+      ()
+    default:
+      XCTFail("Unexpected response part: \(String(describing: secondResponsePart))")
+    }
+  }
+
+  func testImplementedMethodReconfiguresPipeline() throws {
+    let requestHead = HTTPRequestHead(
+      version: .init(major: 2, minor: 0),
+      method: .POST,
+      uri: "/echo.Echo/Get",
+      headers: ["content-type": "application/grpc"]
+    )
+
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(requestHead)))
+
+    // The router should be removed from the pipeline.
+    let router = self.channel.pipeline.handler(type: GRPCServerRequestRoutingHandler.self)
+    XCTAssertThrowsError(try router.wait())
+
+    // There should now be a unary call handler.
+    let unary = self.channel.pipeline.handler(type: UnaryCallHandler<Echo_EchoRequest, Echo_EchoResponse>.self)
+    XCTAssertNoThrow(try unary.wait())
+  }
+}

+ 138 - 0
Tests/GRPCTests/HTTP1ToGRPCServerCodecTests.swift

@@ -0,0 +1,138 @@
+/*
+ * Copyright 2019, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import XCTest
+import NIO
+import NIOHTTP1
+@testable import GRPC
+import EchoModel
+import EchoImplementation
+import Logging
+
+
+class HTTP1ToGRPCServerCodecTests: GRPCTestCase {
+  var channel: EmbeddedChannel!
+
+  override func setUp() {
+    super.setUp()
+    let logger = Logger(label: "io.grpc.testing")
+    let handler = HTTP1ToGRPCServerCodec<Echo_EchoRequest, Echo_EchoResponse>(logger: logger)
+    self.channel = EmbeddedChannel(handler: handler)
+  }
+
+  override func tearDown() {
+    super.tearDown()
+    XCTAssertNoThrow(try self.channel.finish())
+  }
+
+  func makeRequestHead() -> HTTPRequestHead {
+    return HTTPRequestHead(
+      version: .init(major: 2, minor: 0),
+      method: .POST,
+      uri: "/echo.Echo/Get"
+    )
+  }
+
+  func testDeserializationErrorOnInvalidMessageBytes() throws {
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.makeRequestHead())))
+    var buffer = self.channel.allocator.buffer(capacity: 0)
+    buffer.writeInteger(UInt8(0))  // not compressed
+    buffer.writeInteger(UInt32(3)) // message is 3 bytes
+    buffer.writeBytes([42, 42, 42])
+    XCTAssertThrowsError(try self.channel.writeInbound(HTTPServerRequestPart.body(buffer))) { error in
+      let withContext = error as? GRPCError.WithContext
+      XCTAssertTrue(withContext?.error is GRPCError.DeserializationFailure)
+      XCTAssertEqual(withContext?.error.makeGRPCStatus().code, .internalError)
+    }
+  }
+
+  func testSingleMessageFromMultipleBodyParts() throws {
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.makeRequestHead())))
+    let requestPart = try self.channel.readInbound(as: _GRPCServerRequestPart<Echo_EchoRequest>.self)
+
+    switch requestPart {
+    case .some(.head):
+      ()
+    default:
+      XCTFail("Unexpected request part: \(String(describing: requestPart))")
+    }
+
+    // Write a message across multiple buffers.
+    let message = Echo_EchoRequest.with { $0.text = String(repeating: "x", count: 42) }
+    let data = try message.serializedData()
+
+    // Split the payload into two parts.
+    let halfIndex = data.count / 2
+    let firstChunk = data[0..<halfIndex]
+    let secondChunk = data[halfIndex...]
+
+    // Frame the message; send it in 2 parts.
+    var firstBuffer = self.channel.allocator.buffer(capacity: firstChunk.count + 5)
+    firstBuffer.writeInteger(UInt8(0))
+    firstBuffer.writeInteger(UInt32(data.count))
+    firstBuffer.writeBytes(firstChunk)
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(firstBuffer)))
+
+    var secondBuffer = self.channel.allocator.buffer(capacity: secondChunk.count)
+    secondBuffer.writeBytes(secondChunk)
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(secondBuffer)))
+
+    let messagePart = try self.channel.readInbound(as: _GRPCServerRequestPart<Echo_EchoRequest>.self)
+    switch messagePart {
+    case .some(.message(let actual)):
+      XCTAssertEqual(message, actual)
+    default:
+      XCTFail("Unexpected request part: \(String(describing: requestPart))")
+    }
+  }
+
+  func testMultipleMessagesFromSingleBodyPart() throws {
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.makeRequestHead())))
+    let requestPart = try self.channel.readInbound(as: _GRPCServerRequestPart<Echo_EchoRequest>.self)
+
+    switch requestPart {
+    case .some(.head):
+      ()
+    default:
+      XCTFail("Unexpected request part: \(String(describing: requestPart))")
+    }
+
+    // Write three messages into a single body.
+    var buffer = self.channel.allocator.buffer(capacity: 0)
+    let messages = ["foo", "bar", "baz"].map { text in
+      Echo_EchoRequest.with { $0.text = text }
+    }
+
+    for message in messages {
+      let data = try message.serializedData()
+      buffer.writeInteger(UInt8(0))
+      buffer.writeInteger(UInt32(data.count))
+      buffer.writeBytes(data)
+    }
+
+    XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(buffer)))
+
+    for message in messages {
+      let requestPart = try self.channel.readInbound(as: _GRPCServerRequestPart<Echo_EchoRequest>.self)
+      switch requestPart {
+      case .some(.message(let actual)):
+        XCTAssertEqual(message, actual)
+      default:
+        XCTFail("Unexpected request part: \(String(describing: requestPart))")
+      }
+    }
+  }
+}

+ 0 - 157
Tests/GRPCTests/HTTP1ToRawGRPCServerCodecTests.swift

@@ -1,157 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import XCTest
-import NIO
-import NIOHTTP1
-@testable import GRPC
-import EchoModel
-import Logging
-
-func gRPCMessage(channel: EmbeddedChannel, compression: Bool = false, message: Data? = nil) -> ByteBuffer {
-  let messageLength = message?.count ?? 0
-  var buffer = channel.allocator.buffer(capacity: 5 + messageLength)
-  buffer.writeInteger(Int8(compression ? 1 : 0))
-  buffer.writeInteger(UInt32(messageLength))
-  if let bytes = message {
-    buffer.writeBytes(bytes)
-  }
-  return buffer
-}
-
-class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCase {
-  func testInternalErrorStatusReturnedWhenCompressionFlagIsSet() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 2) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Get")
-      try channel.writeInbound(HTTPServerRequestPart.head(requestHead))
-      try channel.writeInbound(HTTPServerRequestPart.body(gRPCMessage(channel: channel, compression: true)))
-    }
-
-    let expectedError = GRPCError.CompressionUnsupported()
-    XCTAssertEqual(expectedError, errorCollector.errors.first as? GRPCError.CompressionUnsupported)
-
-    responses[0].assertHeaders()
-    responses[1].assertStatus { status in
-      XCTAssertEqual(status, expectedError.makeGRPCStatus())
-    }
-  }
-
-  func testMessageCanBeSentAcrossMultipleByteBuffers() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 3) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Get")
-      // Sending the header allocates a buffer.
-      try channel.writeInbound(HTTPServerRequestPart.head(requestHead))
-
-      let request = Echo_EchoRequest.with { $0.text = "echo!" }
-      let requestAsData = try request.serializedData()
-
-      var buffer = channel.allocator.buffer(capacity: 1)
-      buffer.writeInteger(Int8(0))
-      try channel.writeInbound(HTTPServerRequestPart.body(buffer))
-
-      buffer = channel.allocator.buffer(capacity: 4)
-      buffer.writeInteger(Int32(requestAsData.count))
-      try channel.writeInbound(HTTPServerRequestPart.body(buffer))
-
-      buffer = channel.allocator.buffer(capacity: requestAsData.count)
-      buffer.writeBytes(requestAsData)
-      try channel.writeInbound(HTTPServerRequestPart.body(buffer))
-    }
-
-    responses[0].assertHeaders()
-    responses[1].assertMessage()
-    responses[2].assertStatus { status in
-      XCTAssertEqual(status, .ok)
-    }
-  }
-
-  func testInternalErrorStatusIsReturnedIfMessageCannotBeDeserialized() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 2) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Get")
-      try channel.writeInbound(HTTPServerRequestPart.head(requestHead))
-
-      let buffer = gRPCMessage(channel: channel, message: Data([42]))
-      try channel.writeInbound(HTTPServerRequestPart.body(buffer))
-    }
-
-    let expectedError = GRPCError.DeserializationFailure()
-    XCTAssertEqual(expectedError, errorCollector.errors.first as? GRPCError.DeserializationFailure)
-
-    responses[0].assertHeaders()
-    responses[1].assertStatus { status in
-      XCTAssertEqual(status, expectedError.makeGRPCStatus())
-    }
-  }
-
-  func testInternalErrorStatusIsReturnedWhenSendingTrailersInRequest() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 2) { channel in
-      // We have to use "Collect" (client streaming) as the tests rely on `EmbeddedChannel` which runs in this thread.
-      // In the current server implementation, responses from unary calls send a status immediately after sending the response.
-      // As such, a unary "Get" would return an "ok" status before the trailers would be sent.
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Collect")
-      try channel.writeInbound(HTTPServerRequestPart.head(requestHead))
-      try channel.writeInbound(HTTPServerRequestPart.body(gRPCMessage(channel: channel)))
-
-      var trailers = HTTPHeaders()
-      trailers.add(name: "foo", value: "bar")
-      try channel.writeInbound(HTTPServerRequestPart.end(trailers))
-    }
-
-    XCTAssertEqual(errorCollector.errors.count, 1)
-
-    let expected = GRPCError.InvalidState("unexpected trailers received")
-    XCTAssertEqual(expected, errorCollector.errors.first as? GRPCError.InvalidState)
-
-    responses[0].assertHeaders()
-    responses[1].assertStatus { status in
-      XCTAssertEqual(status, expected.makeGRPCStatus())
-    }
-  }
-
-  func testOnlyOneStatusIsReturned() throws {
-    let responses = try waitForGRPCChannelHandlerResponses(count: 3) { channel in
-      let requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: "/echo.Echo/Get")
-      try channel.writeInbound(HTTPServerRequestPart.head(requestHead))
-      try channel.writeInbound(HTTPServerRequestPart.body(gRPCMessage(channel: channel)))
-
-      // Sending trailers with `.end` should trigger an error. However, writing a message to a unary call
-      // will trigger a response and status to be sent back. Since we're using `EmbeddedChannel` this will
-      // be done before the trailers are sent. If a 4th response were to be sent (for the error status) then
-      // the test would fail.
-
-      var trailers = HTTPHeaders()
-      trailers.add(name: "foo", value: "bar")
-      try channel.writeInbound(HTTPServerRequestPart.end(trailers))
-    }
-
-    responses[0].assertHeaders()
-    responses[1].assertMessage()
-    responses[2].assertStatus { status in
-      XCTAssertEqual(status, .ok)
-    }
-  }
-
-  override func waitForGRPCChannelHandlerResponses(
-    count: Int,
-    servicesByName: [String: CallHandlerProvider] = GRPCChannelHandlerResponseCapturingTestCase.echoProvider,
-    callback: @escaping (EmbeddedChannel) throws -> Void
-    ) throws -> [_RawGRPCServerResponsePart] {
-    return try super.waitForGRPCChannelHandlerResponses(count: count, servicesByName: servicesByName) { channel in
-      _ = channel.pipeline.addHandlers(HTTP1ToRawGRPCServerCodec(logger: Logger(label: "io.grpc.testing")), position: .first)
-        .flatMapThrowing { _ in try callback(channel) }
-    }
-  }
-}

+ 0 - 55
Tests/GRPCTests/RawGRPCServerResponsePart+Assertions.swift

@@ -1,55 +0,0 @@
-/*
- * Copyright 2019, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import Foundation
-import GRPC
-import NIOHTTP1
-import NIO
-import XCTest
-
-extension _RawGRPCServerResponsePart {
-  /// Asserts that this value represents the headers case.
-  ///
-  /// - Parameter validate: A block to further validate the headers.
-  func assertHeaders(validate: ((HTTPHeaders) -> Void)? = nil) {
-    guard case .headers(let headers) = self else {
-      XCTFail("Expected .headers but got \(self)")
-      return
-    }
-    validate?(headers)
-  }
-
-  /// Asserts that this value represents the message case.
-  ///
-  /// - Parameter validate: A block to further validate the message.
-  func assertMessage(validate: ((NIO.ByteBuffer) -> Void)? = nil) {
-    guard case .message(let message) = self else {
-      XCTFail("Expected .message but got \(self)")
-      return
-    }
-    validate?(message)
-  }
-
-  /// Asserts that this value represents the status case.
-  ///
-  /// - Parameter validate: A block to further validate the status.
-  func assertStatus(validate: ((GRPCStatus) -> Void)? = nil) {
-    guard case let .statusAndTrailers(status, _) = self else {
-      XCTFail("Expected .status but got \(self)")
-      return
-    }
-    validate?(status)
-  }
-}

+ 18 - 20
Tests/GRPCTests/XCTestManifests.swift

@@ -248,17 +248,6 @@ extension FunctionalTestsMutualAuthenticationNIOTS {
     ]
 }
 
-extension GRPCChannelHandlerTests {
-    // DO NOT MODIFY: This is autogenerated, use:
-    //   `swift test --generate-linuxmain`
-    // to regenerate.
-    static let __allTests__GRPCChannelHandlerTests = [
-        ("testImplementedMethodReturnsHeadersMessageAndStatus", testImplementedMethodReturnsHeadersMessageAndStatus),
-        ("testImplementedMethodReturnsStatusForBadlyFormedProto", testImplementedMethodReturnsStatusForBadlyFormedProto),
-        ("testUnimplementedMethodReturnsUnimplementedStatus", testUnimplementedMethodReturnsUnimplementedStatus),
-    ]
-}
-
 extension GRPCClientStateMachineTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -373,6 +362,17 @@ extension GRPCSecureInteroperabilityTests {
     ]
 }
 
+extension GRPCServerRequestRoutingHandlerTests {
+    // DO NOT MODIFY: This is autogenerated, use:
+    //   `swift test --generate-linuxmain`
+    // to regenerate.
+    static let __allTests__GRPCServerRequestRoutingHandlerTests = [
+        ("testImplementedMethodReconfiguresPipeline", testImplementedMethodReconfiguresPipeline),
+        ("testInvalidGRPCContentTypeReturnsUnsupportedMediaType", testInvalidGRPCContentTypeReturnsUnsupportedMediaType),
+        ("testUnimplementedMethodReturnsUnimplementedStatus", testUnimplementedMethodReturnsUnimplementedStatus),
+    ]
+}
+
 extension GRPCStatusCodeTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
@@ -439,16 +439,14 @@ extension GRPCTypeSizeTests {
     ]
 }
 
-extension HTTP1ToRawGRPCServerCodecTests {
+extension HTTP1ToGRPCServerCodecTests {
     // DO NOT MODIFY: This is autogenerated, use:
     //   `swift test --generate-linuxmain`
     // to regenerate.
-    static let __allTests__HTTP1ToRawGRPCServerCodecTests = [
-        ("testInternalErrorStatusIsReturnedIfMessageCannotBeDeserialized", testInternalErrorStatusIsReturnedIfMessageCannotBeDeserialized),
-        ("testInternalErrorStatusIsReturnedWhenSendingTrailersInRequest", testInternalErrorStatusIsReturnedWhenSendingTrailersInRequest),
-        ("testInternalErrorStatusReturnedWhenCompressionFlagIsSet", testInternalErrorStatusReturnedWhenCompressionFlagIsSet),
-        ("testMessageCanBeSentAcrossMultipleByteBuffers", testMessageCanBeSentAcrossMultipleByteBuffers),
-        ("testOnlyOneStatusIsReturned", testOnlyOneStatusIsReturned),
+    static let __allTests__HTTP1ToGRPCServerCodecTests = [
+        ("testDeserializationErrorOnInvalidMessageBytes", testDeserializationErrorOnInvalidMessageBytes),
+        ("testMultipleMessagesFromSingleBodyPart", testMultipleMessagesFromSingleBodyPart),
+        ("testSingleMessageFromMultipleBodyParts", testSingleMessageFromMultipleBodyParts),
     ]
 }
 
@@ -626,16 +624,16 @@ public func __allTests() -> [XCTestCaseEntry] {
         testCase(FunctionalTestsInsecureTransportNIOTS.__allTests__FunctionalTestsInsecureTransportNIOTS),
         testCase(FunctionalTestsMutualAuthentication.__allTests__FunctionalTestsMutualAuthentication),
         testCase(FunctionalTestsMutualAuthenticationNIOTS.__allTests__FunctionalTestsMutualAuthenticationNIOTS),
-        testCase(GRPCChannelHandlerTests.__allTests__GRPCChannelHandlerTests),
         testCase(GRPCClientStateMachineTests.__allTests__GRPCClientStateMachineTests),
         testCase(GRPCInsecureInteroperabilityTests.__allTests__GRPCInsecureInteroperabilityTests),
         testCase(GRPCSecureInteroperabilityTests.__allTests__GRPCSecureInteroperabilityTests),
+        testCase(GRPCServerRequestRoutingHandlerTests.__allTests__GRPCServerRequestRoutingHandlerTests),
         testCase(GRPCStatusCodeTests.__allTests__GRPCStatusCodeTests),
         testCase(GRPCStatusMessageMarshallerTests.__allTests__GRPCStatusMessageMarshallerTests),
         testCase(GRPCStatusTests.__allTests__GRPCStatusTests),
         testCase(GRPCTimeoutTests.__allTests__GRPCTimeoutTests),
         testCase(GRPCTypeSizeTests.__allTests__GRPCTypeSizeTests),
-        testCase(HTTP1ToRawGRPCServerCodecTests.__allTests__HTTP1ToRawGRPCServerCodecTests),
+        testCase(HTTP1ToGRPCServerCodecTests.__allTests__HTTP1ToGRPCServerCodecTests),
         testCase(ImmediatelyFailingProviderTests.__allTests__ImmediatelyFailingProviderTests),
         testCase(LengthPrefixedMessageReaderTests.__allTests__LengthPrefixedMessageReaderTests),
         testCase(PlatformSupportTests.__allTests__PlatformSupportTests),