GRPCChannelHandler.swift 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import Foundation
  2. import SwiftProtobuf
  3. import NIO
  4. import NIOHTTP1
  5. /// Processes individual gRPC messages and stream-close events on a HTTP2 channel.
  6. public protocol GRPCCallHandler: ChannelHandler {
  7. func makeGRPCServerCodec() -> ChannelHandler
  8. }
  9. /// Provides `GRPCCallHandler` objects for the methods on a particular service name.
  10. ///
  11. /// Implemented by the generated code.
  12. public protocol CallHandlerProvider: class {
  13. /// The name of the service this object is providing methods for, including the package path.
  14. ///
  15. /// - Example: "io.grpc.Echo.EchoService"
  16. var serviceName: String { get }
  17. /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's
  18. /// method. Returns nil for methods not handled by this service.
  19. func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel, errorDelegate: ServerErrorDelegate?) -> GRPCCallHandler?
  20. }
  21. /// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available.
  22. ///
  23. /// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
  24. /// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
  25. public final class GRPCChannelHandler {
  26. private let servicesByName: [String: CallHandlerProvider]
  27. private weak var errorDelegate: ServerErrorDelegate?
  28. public init(servicesByName: [String: CallHandlerProvider], errorDelegate: ServerErrorDelegate?) {
  29. self.servicesByName = servicesByName
  30. self.errorDelegate = errorDelegate
  31. }
  32. }
  33. extension GRPCChannelHandler: ChannelInboundHandler, RemovableChannelHandler {
  34. public typealias InboundIn = RawGRPCServerRequestPart
  35. public typealias OutboundOut = RawGRPCServerResponsePart
  36. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  37. errorDelegate?.observeLibraryError(error)
  38. let status = errorDelegate?.transformLibraryError(error)
  39. ?? (error as? GRPCStatusTransformable)?.asGRPCStatus()
  40. ?? .processingError
  41. context.writeAndFlush(wrapOutboundOut(.status(status)), promise: nil)
  42. }
  43. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  44. let requestPart = self.unwrapInboundIn(data)
  45. switch requestPart {
  46. case .head(let requestHead):
  47. guard let callHandler = getCallHandler(channel: context.channel, requestHead: requestHead) else {
  48. errorCaught(context: context, error: GRPCError.server(.unimplementedMethod(requestHead.uri)))
  49. return
  50. }
  51. let codec = callHandler.makeGRPCServerCodec()
  52. let handlerRemoved: EventLoopPromise<Void> = context.eventLoop.makePromise()
  53. handlerRemoved.futureResult.whenSuccess {
  54. context.pipeline.addHandler(callHandler, position: .after(codec)).whenComplete { _ in
  55. // Send the .headers event back to begin the headers flushing for the response.
  56. // At this point, which headers should be returned is not known, as the content type is
  57. // processed in HTTP1ToRawGRPCServerCodec. At the same time the HTTP1ToRawGRPCServerCodec
  58. // handler doesn't have the data to determine whether headers should be returned, as it is
  59. // this handler that checks whether the stub for the requested Service/Method is implemented.
  60. // This likely signals that the architecture for these handlers could be improved.
  61. context.writeAndFlush(self.wrapOutboundOut(.headers(HTTPHeaders())), promise: nil)
  62. }
  63. }
  64. context.pipeline.addHandler(codec, position: .after(self))
  65. .whenSuccess { context.pipeline.removeHandler(context: context, promise: handlerRemoved) }
  66. case .message, .end:
  67. // We can reach this point if we're receiving messages for a method that isn't implemented.
  68. // A status resposne will have been fired which should also close the stream; there's not a
  69. // lot we can do at this point.
  70. break
  71. }
  72. }
  73. private func getCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
  74. // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
  75. // - uriComponents[0]: empty
  76. // - uriComponents[1]: service name (including the package name);
  77. // `CallHandlerProvider`s should provide the service name including the package name.
  78. // - uriComponents[2]: method name.
  79. let uriComponents = requestHead.uri.components(separatedBy: "/")
  80. guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
  81. let providerForServiceName = servicesByName[uriComponents[1]],
  82. let callHandler = providerForServiceName.handleMethod(uriComponents[2], request: requestHead, serverHandler: self, channel: channel, errorDelegate: errorDelegate) else {
  83. return nil
  84. }
  85. return callHandler
  86. }
  87. }