GRPCServerRequestRoutingHandler.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP1
  19. import SwiftProtobuf
  /// A channel handler that processes the individual gRPC messages and stream-close events of a
  /// call on an HTTP2 channel.
  public protocol GRPCCallHandler: ChannelHandler {
    /// The codec handler paired with this call handler. `GRPCServerRequestRoutingHandler` adds it to
    /// the pipeline immediately before the call handler itself.
    var _codec: ChannelHandler { get }
  }
  /// A source of `GRPCCallHandler` objects for the methods of one named service.
  ///
  /// Conformances are provided by the generated code.
  public protocol CallHandlerProvider: AnyObject {
    /// The fully qualified name of the service whose methods this object provides, i.e. the
    /// service name including its package path.
    ///
    /// - Example: "io.grpc.Echo.EchoService"
    var serviceName: Substring { get }

    /// Determines, calls and returns the request handler appropriate for the given method.
    ///
    /// - Parameters:
    ///   - methodName: The name of the method being called.
    ///   - callHandlerContext: Library-provided context for the call handler.
    /// - Returns: The `GRPCCallHandler` for the method, or `nil` if this service does not handle
    ///   the method.
    func handleMethod(
      _ methodName: Substring,
      callHandlerContext: CallHandlerContext
    ) -> GRPCCallHandler?
  }
  // The type is public because generated code has to receive it and hand it back to the gRPC
  // library. Its members are deliberately left at the default (`internal`) access level: users
  // should treat everything in here as an implementation detail.
  public struct CallHandlerContext {
    /// Delegate notified about errors, if any.
    var errorDelegate: ServerErrorDelegate?
    /// Logger to use for the call.
    var logger: Logger
    /// Message encoding configuration of the server.
    var encoding: ServerMessageEncoding
    /// Event loop of the channel the call arrived on.
    var eventLoop: EventLoop
    /// The URI of the request, e.g. "/package.Servicename/MethodName".
    var path: String
  }
  /// Routes an incoming request to a user-provided call handler, after checking that the request
  /// carries a 'content-type' suitable for gRPC.
  ///
  /// When the request head arrives, the `CallHandlerProvider` registered for the request's service
  /// name is asked for a `GRPCCallHandler`. The pipeline is then configured with that handler, which
  /// receives the individual gRPC messages from then on.
  ///
  /// Once the pipeline has been configured this handler removes itself from it, replaying any
  /// request parts it buffered in the meantime.
  public final class GRPCServerRequestRoutingHandler {
    /// Progress of routing for the request on this channel.
    private enum State: Equatable {
      /// No request head has been accepted (yet).
      case notConfigured
      /// A request head was accepted and the pipeline is being configured. The associated value
      /// holds the request parts received so far, starting with the head.
      case configuring([InboundOut])
    }

    private var state = State.notConfigured

    /// Providers of call handlers, keyed by fully qualified service name.
    private let servicesByName: [Substring: CallHandlerProvider]
    private let encoding: ServerMessageEncoding
    private let logger: Logger
    private weak var errorDelegate: ServerErrorDelegate?

    /// Creates a routing handler.
    ///
    /// - Parameters:
    ///   - servicesByName: Providers of call handlers, keyed by service name.
    ///   - encoding: Message encoding configuration passed on to the handlers serving the RPC.
    ///   - errorDelegate: Delegate informed about errors; held weakly.
    ///   - logger: Logger used by this handler and passed on to the handlers serving the RPC.
    public init(
      servicesByName: [Substring: CallHandlerProvider],
      encoding: ServerMessageEncoding,
      errorDelegate: ServerErrorDelegate?,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.encoding = encoding
      self.servicesByName = servicesByName
    }
  }
  77. extension GRPCServerRequestRoutingHandler: ChannelInboundHandler, RemovableChannelHandler {
  78. public typealias InboundIn = HTTPServerRequestPart
  79. public typealias InboundOut = HTTPServerRequestPart
  80. public typealias OutboundOut = HTTPServerResponsePart
  /// Reports `error` to the error delegate, answers the request with the matching gRPC status if a
  /// request head has already been accepted, and closes the channel.
  ///
  /// - Parameters:
  ///   - context: The context of this handler in the pipeline.
  ///   - error: The error that was caught.
  public func errorCaught(context: ChannelHandlerContext, error: Error) {
    let status: GRPCStatus

    if let wrapped = error as? GRPCError.WithContext {
      // Report and convert the underlying error rather than the wrapper.
      self.errorDelegate?.observeLibraryError(wrapped.error)
      status = wrapped.error.makeGRPCStatus()
    } else {
      self.errorDelegate?.observeLibraryError(error)
      if let transformable = error as? GRPCStatusTransformable {
        status = transformable.makeGRPCStatus()
      } else {
        status = .processingError
      }
    }

    // In `.notConfigured` we don't know what protocol we're speaking, so there is nothing sensible
    // to send: the channel is just closed below.
    if case let .configuring(bufferedParts) = self.state {
      let bufferedHeads = bufferedParts.compactMap { part -> HTTPRequestHead? in
        if case let .head(requestHead) = part {
          return requestHead
        }
        return nil
      }
      // Indexing is fine here: the state only becomes `.configuring` after the request head has
      // been received and validated, and that head is the first buffered part.
      let requestHead = bufferedHeads[0]

      let responseHead = self.makeResponseHead(requestHead: requestHead, status: status)
      context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
      context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
    }

    context.close(mode: .all, promise: nil)
  }
  /// Handles one part of the incoming HTTP request.
  ///
  /// For the request head this validates the 'content-type', looks up a call handler for the
  /// request URI and, if one exists, starts configuring the pipeline to serve the RPC. Requests
  /// with an unsuitable 'content-type' are answered with HTTP status 415; requests for RPCs which
  /// are not implemented get a 'trailers-only' gRPC response.
  ///
  /// Body and end parts are buffered while the pipeline is being configured and dropped otherwise.
  ///
  /// - Parameters:
  ///   - context: The context of this handler in the pipeline.
  ///   - data: The wrapped `HTTPServerRequestPart`.
  public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    let requestPart = self.unwrapInboundIn(data)

    switch requestPart {
    case let .head(requestHead):
      precondition(self.state == .notConfigured)

      // Validate the 'content-type' is related to gRPC before proceeding.
      let maybeContentType = requestHead.headers.first(name: GRPCHeaderName.contentType)
      guard let contentType = maybeContentType,
        contentType.starts(with: ContentType.commonPrefix) else {
        self.logger.warning(
          "received request whose 'content-type' does not exist or start with '\(ContentType.commonPrefix)'",
          metadata: ["content-type": "\(String(describing: maybeContentType))"]
        )

        // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
        //
        //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
        //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
        //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
        //   successful.
        let responseHead = HTTPResponseHead(
          version: requestHead.version,
          status: .unsupportedMediaType
        )

        // Fail the call. Note: we're not speaking gRPC here, so no status or message.
        context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
        context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
        return
      }

      // Do we know how to handle this RPC?
      guard let callHandler = self.makeCallHandler(
        channel: context.channel,
        requestHead: requestHead
      ) else {
        self.logger.warning(
          "unable to make call handler; the RPC is not implemented on this server",
          metadata: ["uri": "\(requestHead.uri)"]
        )

        let status = GRPCError.RPCNotImplemented(rpc: requestHead.uri).makeGRPCStatus()
        let responseHead = self.makeResponseHead(requestHead: requestHead, status: status)

        // Write back a 'trailers-only' response.
        context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
        context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
        return
      }

      self.logger.debug("received request head, configuring pipeline")

      // Buffer the request head; we'll replay it in the next handler when we're removed from the
      // pipeline.
      self.state = .configuring([requestPart])

      // Configure the rest of the pipeline to serve the RPC.
      let httpToGRPC = HTTP1ToGRPCServerCodec(encoding: self.encoding, logger: self.logger)
      let codec = callHandler._codec
      context.pipeline.addHandlers([httpToGRPC, codec, callHandler], position: .after(self))
        .whenComplete { result in
          switch result {
          case .success:
            // The RPC handlers are in place; removing ourselves replays the buffered parts.
            context.pipeline.removeHandler(self, promise: nil)

          case let .failure(error):
            // Without the RPC handlers nobody will ever answer this request. Don't swallow the
            // failure: report it, respond with an error status and close the channel.
            self.logger.error(
              "failed to configure pipeline for RPC",
              metadata: ["error": "\(error)"]
            )
            self.errorCaught(context: context, error: error)
          }
        }

    case .body, .end:
      switch self.state {
      case .notConfigured:
        // We can reach this point if we're receiving messages for a method that isn't implemented,
        // in which case we just drop the messages; our response should already be in-flight.
        ()

      case var .configuring(buffer):
        // We received a message while the pipeline was being configured; hold on to it while we
        // finish configuring the pipeline.
        buffer.append(requestPart)
        self.state = .configuring(buffer)
      }
    }
  }
  /// Replays the request parts buffered during pipeline configuration, in the order they were
  /// received, to the next inbound handler.
  ///
  /// Nothing is forwarded if no request head was ever accepted.
  public func handlerRemoved(context: ChannelHandlerContext) {
    guard case let .configuring(bufferedParts) = self.state else {
      return
    }

    bufferedParts.forEach { part in
      context.fireChannelRead(self.wrapInboundOut(part))
    }
  }
  /// The service and method components of a call URI.
  struct CallPath {
    /// The name of the service to call, as a slice of the URI's UTF8 view.
    var service: String.UTF8View.SubSequence
    /// The name of the method to call, as a slice of the URI's UTF8 view.
    var method: String.UTF8View.SubSequence

    /// Character used to split the path into components.
    private static let pathSplitDelimiter = UInt8(ascii: "/")

    /// Splits a URI of the form "/package.Servicename/MethodName" into service and method.
    ///
    /// The split operates on the UTF8 view; the original author notes this is approximately 10x
    /// faster than a simple split.
    ///
    /// Fails (returns `nil`) if the URI does not start with "/", or if nothing follows the
    /// leading "/" or the service component. Anything after a third "/" is ignored.
    init?(requestURI: String) {
      var remaining = requestURI.utf8[...]
      let delimiter = CallPath.pathSplitDelimiter

      guard
        // The URI must begin with the delimiter, i.e. nothing may precede the first "/".
        let leading = remaining.trimPrefix(to: delimiter), leading.isEmpty,
        let service = remaining.trimPrefix(to: delimiter),
        let method = remaining.trimPrefix(to: delimiter)
      else {
        return nil
      }

      self.service = service
      self.method = method
    }
  }
  /// Looks up the call handler for the RPC named by the URI of `requestHead`.
  ///
  /// The URI is expected to look like "/package.Servicename/MethodName". The service component
  /// (including the package name) selects the `CallHandlerProvider`, which is then asked for a
  /// handler for the method component.
  ///
  /// - Parameters:
  ///   - channel: The channel the request arrived on; its event loop is given to the handler.
  ///   - requestHead: The head of the request to route.
  /// - Returns: The call handler, or `nil` if the URI could not be parsed, no provider is
  ///   registered for the service, or the provider does not handle the method.
  private func makeCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
    let uri = requestHead.uri
    self.logger.debug("making call handler", metadata: ["path": "\(uri)"])

    let callPath = CallPath(requestURI: uri)

    let handlerContext = CallHandlerContext(
      errorDelegate: self.errorDelegate,
      logger: self.logger,
      encoding: self.encoding,
      eventLoop: channel.eventLoop,
      path: uri
    )

    let callHandler = callPath.flatMap { path -> GRPCCallHandler? in
      // `CallHandlerProvider`s are keyed by service name including the package name.
      let provider = self.servicesByName[String.SubSequence(path.service)]
      return provider?.handleMethod(
        String.SubSequence(path.method),
        callHandlerContext: handlerContext
      )
    }

    if callHandler == nil {
      self.logger.notice("could not create handler", metadata: ["path": "\(uri)"])
    }

    return callHandler
  }
  /// Builds the head of a response which carries `status` in its headers.
  ///
  /// The HTTP status is always 200 (OK); the gRPC status code, and the status message when there
  /// is one that can be marshalled, are conveyed as headers.
  ///
  /// - Parameters:
  ///   - requestHead: The head of the request being answered; supplies the HTTP version.
  ///   - status: The gRPC status to report.
  /// - Returns: The response head.
  private func makeResponseHead(
    requestHead: HTTPRequestHead,
    status: GRPCStatus
  ) -> HTTPResponseHead {
    var headers = HTTPHeaders()
    headers.add(name: GRPCHeaderName.contentType, value: ContentType.protobuf.canonicalValue)
    headers.add(name: GRPCHeaderName.statusCode, value: "\(status.code.rawValue)")

    let marshalledMessage = status.message.flatMap(GRPCStatusMessageMarshaller.marshall)
    if let marshalledMessage = marshalledMessage {
      headers.add(name: GRPCHeaderName.statusMessage, value: marshalledMessage)
    }

    return HTTPResponseHead(version: requestHead.version, status: .ok, headers: headers)
  }
  256. }
  extension Collection where Self == Self.SubSequence, Self.Element: Equatable {
    /// Removes and returns the leading part of the collection up to `separator`.
    ///
    /// On return `self` is the subsequence following the first occurrence of `separator`; the
    /// separator itself is discarded. If `separator` does not occur, the whole collection is
    /// returned and `self` becomes empty.
    ///
    /// - Parameter separator: The element between the head, which is returned, and the rest, which
    ///   is left in `self`.
    /// - Returns: The subsequence between the beginning and the first occurrence of `separator`,
    ///   the entire collection if `separator` is not found, or `nil` if the collection is empty.
    mutating func trimPrefix(to separator: Element) -> SubSequence? {
      if self.isEmpty {
        return nil
      }

      guard let separatorIndex = self.firstIndex(of: separator) else {
        // No separator: hand back everything and leave an empty remainder.
        let everything = self[...]
        self = self[self.endIndex...]
        return everything
      }

      let head = self[..<separatorIndex]
      self = self[self.index(after: separatorIndex)...]
      return head
    }
  }