2
0

GRPCChannelHandler.swift 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  /// A channel handler which processes the individual gRPC messages and stream-close events of a
  /// call on an HTTP2 channel.
  public protocol GRPCCallHandler: ChannelHandler {
    /// Makes the codec handler to use for this call.
    ///
    /// `GRPCChannelHandler` adds the returned handler to the pipeline directly after itself and
    /// then adds this call handler after that codec.
    func makeGRPCServerCodec() -> ChannelHandler
  }
  /// Provides `GRPCCallHandler` objects for the methods on a particular service name.
  ///
  /// Implemented by the generated code.
  ///
  /// The protocol is class-only; this is spelled `AnyObject` because the equivalent `class`
  /// spelling is deprecated.
  public protocol CallHandlerProvider: AnyObject {
    /// The name of the service this object is providing methods for, including the package path.
    ///
    /// - Example: "io.grpc.Echo.EchoService"
    var serviceName: String { get }

    /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`),
    /// depending on the request's method.
    ///
    /// - Parameters:
    ///   - methodName: The name of the requested method, without the service name.
    ///   - callHandlerContext: The context of the call being handled.
    /// - Returns: The handler for the method, or `nil` for methods not handled by this service.
    func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler?
  }
  // This type is public because it is passed into generated code. All of its members are
  // `internal` because the context is handed straight back from generated code into gRPC library
  // code, so every member should be considered an implementation detail to the user.
  //
  // Note: the property order defines the parameter order of the memberwise initializer.
  public struct CallHandlerContext {
    /// The head of the request the call handler is being created for.
    internal var request: HTTPRequestHead

    /// The channel the request was received on.
    internal var channel: Channel

    /// The delegate, if any, to be informed of errors.
    internal var errorDelegate: ServerErrorDelegate?

    /// The logger for the call handler to use.
    internal var logger: Logger
  }
  /// Listens on a newly-opened HTTP2 subchannel and hands over to the call handler matching the
  /// request, if there is one.
  ///
  /// When the request head arrives, the `CallHandlerProvider` registered for the request's service
  /// name is asked for a `GRPCCallHandler`. That handler is then added to the pipeline in place of
  /// this one, so that it receives the individual gRPC messages.
  public final class GRPCChannelHandler {
    private let logger: Logger
    private let servicesByName: [String: CallHandlerProvider]
    private weak var errorDelegate: ServerErrorDelegate?

    /// Creates a channel handler.
    ///
    /// - Parameters:
    ///   - servicesByName: The available service providers, keyed by service name.
    ///   - errorDelegate: A delegate to inform of errors; it is only held weakly.
    ///   - logger: The logger to use; it is tagged with this handler's name.
    public init(
      servicesByName: [String: CallHandlerProvider],
      errorDelegate: ServerErrorDelegate?,
      logger: Logger
    ) {
      self.logger = logger.addingMetadata(key: MetadataKey.channelHandler, value: "GRPCChannelHandler")
      self.errorDelegate = errorDelegate
      self.servicesByName = servicesByName
    }
  }
  // MARK: - ChannelInboundHandler, RemovableChannelHandler

  extension GRPCChannelHandler: ChannelInboundHandler, RemovableChannelHandler {
    public typealias InboundIn = RawGRPCServerRequestPart
    public typealias OutboundOut = RawGRPCServerResponsePart

    /// Reports `error` to the error delegate and responds with a status and empty trailers.
    ///
    /// The status is picked in order of preference from: the delegate's transformation of the
    /// error, the error's own `GRPCStatusTransformable` conversion, and `.processingError`.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.errorDelegate?.observeLibraryError(error)

      let status = self.errorDelegate?.transformLibraryError(error)
        ?? (error as? GRPCStatusTransformable)?.asGRPCStatus()
        ?? .processingError

      context.writeAndFlush(self.wrapOutboundOut(.statusAndTrailers(status, HTTPHeaders())), promise: nil)
    }

    /// Handles an inbound request part.
    ///
    /// On a request head, a call handler is looked up for the requested path. If there is none,
    /// an "unimplemented method" error is reported via `errorCaught(context:error:)`. Otherwise
    /// the call handler's codec is added after this handler, this handler is removed from the
    /// pipeline, the call handler is added after the codec and a `.headers` response part is
    /// written. Message and end parts are dropped.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let requestPart = self.unwrapInboundIn(data)

      switch requestPart {
      case .head(let requestHead):
        guard let callHandler = self.makeCallHandler(channel: context.channel, requestHead: requestHead) else {
          self.errorCaught(context: context, error: GRPCError.server(.unimplementedMethod(requestHead.uri)))
          return
        }

        self.logger.info("received request head, configuring pipeline")

        let codec = callHandler.makeGRPCServerCodec()
        let handlerRemoved: EventLoopPromise<Void> = context.eventLoop.makePromise()

        handlerRemoved.futureResult.whenSuccess {
          self.logger.info("removed GRPCChannelHandler from pipeline")

          context.pipeline.addHandler(callHandler, position: .after(codec)).whenComplete { _ in
            // Send the .headers event back to begin the headers flushing for the response.
            // At this point, which headers should be returned is not known, as the content type is
            // processed in HTTP1ToRawGRPCServerCodec. At the same time the HTTP1ToRawGRPCServerCodec
            // handler doesn't have the data to determine whether headers should be returned, as it is
            // this handler that checks whether the stub for the requested Service/Method is implemented.
            // This likely signals that the architecture for these handlers could be improved.
            context.writeAndFlush(self.wrapOutboundOut(.headers(HTTPHeaders())), promise: nil)
          }
        }

        self.logger.info("adding handler \(type(of: codec)) to pipeline")
        context.pipeline.addHandler(codec, position: .after(self)).whenComplete { result in
          switch result {
          case .success:
            context.pipeline.removeHandler(context: context, promise: handlerRemoved)

          case .failure(let error):
            // The promise must always be completed: an unfulfilled promise is leaked (which
            // SwiftNIO asserts on in debug builds) and the failure would otherwise go unnoticed.
            self.logger.error("failed to add handler \(type(of: codec)) to pipeline: \(error)")
            handlerRemoved.fail(error)
          }
        }

      case .message, .end:
        // We can reach this point if we're receiving messages for a method that isn't implemented.
        // A status response will have been fired which should also close the stream; there's not a
        // lot we can do at this point.
        break
      }
    }

    /// Makes the call handler for the service and method named in the request's URI.
    ///
    /// - Parameters:
    ///   - channel: The channel the request was received on.
    ///   - requestHead: The head of the request to make a handler for.
    /// - Returns: The call handler, or `nil` if the URI is not of the expected format, no provider
    ///   is registered for the service name, or the provider does not handle the method.
    private func makeCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
      // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
      // - uriComponents[0]: empty
      // - uriComponents[1]: service name (including the package name);
      //     `CallHandlerProvider`s should provide the service name including the package name.
      // - uriComponents[2]: method name.
      self.logger.info("making call handler", metadata: ["path": "\(requestHead.uri)"])
      let uriComponents = requestHead.uri.components(separatedBy: "/")

      var logger = self.logger
      // Unset the channel handler: it shouldn't be used for downstream handlers.
      logger[metadataKey: MetadataKey.channelHandler] = nil

      let context = CallHandlerContext(
        request: requestHead,
        channel: channel,
        errorDelegate: self.errorDelegate,
        logger: logger
      )

      guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
        let providerForServiceName = self.servicesByName[uriComponents[1]],
        let callHandler = providerForServiceName.handleMethod(uriComponents[2], callHandlerContext: context) else {
          self.logger.notice("could not create handler", metadata: ["path": "\(requestHead.uri)"])
          return nil
      }

      return callHandler
    }
  }