2
0

GRPCChannelHandler.swift 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  5. /// Processes individual gRPC messages and stream-close events on a HTTP2 channel.
  6. public protocol GRPCCallHandler: ChannelHandler {
  7. func makeGRPCServerCodec() -> ChannelHandler
  8. }
  9. /// Provides `GRPCCallHandler` objects for the methods on a particular service name.
  10. ///
  11. /// Implemented by the generated code.
  12. public protocol CallHandlerProvider: class {
  13. /// The name of the service this object is providing methods for, including the package path.
  14. ///
  15. /// - Example: "io.grpc.Echo.EchoService"
  16. var serviceName: String { get }
  17. /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's
  18. /// method. Returns nil for methods not handled by this service.
  19. func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler?
  20. }
  21. /// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available.
  22. ///
  23. /// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
  24. /// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
  25. public final class GRPCChannelHandler {
  26. private let servicesByName: [String: CallHandlerProvider]
  27. public init(servicesByName: [String: CallHandlerProvider]) {
  28. self.servicesByName = servicesByName
  29. }
  30. }
  31. extension GRPCChannelHandler: ChannelInboundHandler {
  32. public typealias InboundIn = RawGRPCServerRequestPart
  33. public typealias OutboundOut = RawGRPCServerResponsePart
  34. public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
  35. let requestPart = self.unwrapInboundIn(data)
  36. switch requestPart {
  37. case .head(let requestHead):
  38. // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
  39. // - uriComponents[0]: empty
  40. // - uriComponents[1]: service name (including the package name);
  41. // `CallHandlerProvider`s should provide the service name including the package name.
  42. // - uriComponents[2]: method name.
  43. let uriComponents = requestHead.uri.components(separatedBy: "/")
  44. guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
  45. let providerForServiceName = servicesByName[uriComponents[1]],
  46. let callHandler = providerForServiceName.handleMethod(uriComponents[2], request: requestHead, serverHandler: self, channel: ctx.channel) else {
  47. ctx.writeAndFlush(self.wrapOutboundOut(.status(.unimplemented(method: requestHead.uri))), promise: nil)
  48. return
  49. }
  50. let codec = callHandler.makeGRPCServerCodec()
  51. let handlerRemoved: EventLoopPromise<Bool> = ctx.eventLoop.newPromise()
  52. handlerRemoved.futureResult.whenSuccess { handlerWasRemoved in
  53. assert(handlerWasRemoved)
  54. ctx.pipeline.add(handler: callHandler, after: codec).whenComplete {
  55. var responseHeaders = HTTPHeaders()
  56. responseHeaders.add(name: "content-type", value: "application/grpc")
  57. ctx.write(self.wrapOutboundOut(.headers(responseHeaders)), promise: nil)
  58. }
  59. }
  60. ctx.pipeline.add(handler: codec, after: self)
  61. .whenComplete { ctx.pipeline.remove(handler: self, promise: handlerRemoved) }
  62. case .message, .end:
  63. preconditionFailure("received \(requestPart), should have been removed as a handler at this point")
  64. }
  65. }
  66. }