GRPCServerRequestRoutingHandler.swift 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftProtobuf
  18. import NIO
  19. import NIOHTTP1
  20. import Logging
  /// A channel handler which processes the individual gRPC messages and the stream-close event
  /// for a single call on an HTTP2 channel.
  public protocol GRPCCallHandler: ChannelHandler {
    /// Makes the codec `ChannelHandler` that accompanies this call handler.
    ///
    /// The request routing handler adds the returned codec to the pipeline directly in front of
    /// the call handler itself.
    func makeGRPCServerCodec() -> ChannelHandler
  }
  /// Provides `GRPCCallHandler` objects for the methods on a particular service name.
  ///
  /// Implemented by the generated code.
  ///
  /// The protocol is class-constrained. The constraint is spelled `AnyObject` because the older
  /// `class` spelling is deprecated; both have the same meaning.
  public protocol CallHandlerProvider: AnyObject {
    /// The name of the service this object is providing methods for, including the package path.
    ///
    /// - Example: "io.grpc.Echo.EchoService"
    var serviceName: String { get }

    /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`),
    /// depending on the request's method.
    ///
    /// - Parameters:
    ///   - methodName: The name of the method being called, i.e. the path component following
    ///     the service name in the request URI.
    ///   - callHandlerContext: Library-provided context (error delegate and logger) for the
    ///     created handler.
    /// - Returns: A handler for the method, or `nil` for methods not handled by this service.
    func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler?
  }
  /// Context handed to generated code when it is asked to create a call handler.
  ///
  /// The type is `public` only because it has to be passed into generated code. All of its members
  /// are `internal`: the context is passed from generated code straight back into gRPC library
  /// code, so users should consider its contents an implementation detail.
  public struct CallHandlerContext {
    /// The error delegate to make available to the call handler, if there is one.
    internal var errorDelegate: ServerErrorDelegate?

    /// The logger to make available to the call handler.
    internal var logger: Logger
  }
  /// Routes an incoming request to a user-provided call handler, after checking that the request
  /// carries a 'content-type' suitable for gRPC.
  ///
  /// When the request headers arrive, the `CallHandlerProvider` registered for the request's
  /// service name is asked for a `GRPCCallHandler`. The pipeline is then configured with that
  /// handler, which receives the individual gRPC messages from then on.
  ///
  /// Once the `GRPCCallHandler` has been added to the pipeline, this handler removes itself from
  /// the pipeline.
  public final class GRPCServerRequestRoutingHandler {
    /// Tracks how far routing of the current request has progressed.
    private enum State: Equatable {
      /// No call handler is being configured. Request bodies received in this state are dropped.
      case notConfigured

      /// A call handler is being added to the pipeline. The associated request parts (starting
      /// with the request head) are held back and replayed when this handler is removed.
      case configuring([InboundOut])
    }

    /// The current routing state.
    private var state = State.notConfigured

    /// Logger used for routing diagnostics.
    private let logger: Logger

    /// The available services, keyed by service name (including the package name).
    private let servicesByName: [String: CallHandlerProvider]

    /// Observer for errors seen by this handler; held weakly.
    private weak var errorDelegate: ServerErrorDelegate?

    /// Creates a routing handler.
    ///
    /// - Parameters:
    ///   - servicesByName: The providers to route to, keyed by their service name.
    ///   - errorDelegate: An optional delegate informed of errors caught by this handler.
    ///   - logger: The logger to use.
    public init(
      servicesByName: [String: CallHandlerProvider],
      errorDelegate: ServerErrorDelegate?,
      logger: Logger
    ) {
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.servicesByName = servicesByName
    }
  }
  extension GRPCServerRequestRoutingHandler: ChannelInboundHandler, RemovableChannelHandler {
    public typealias InboundIn = HTTPServerRequestPart
    public typealias InboundOut = HTTPServerRequestPart
    public typealias OutboundOut = HTTPServerResponsePart

    /// Reports the error to the error delegate, fails the in-flight RPC (if one is being
    /// configured) with a status derived from the error, and closes the channel.
    ///
    /// - Parameters:
    ///   - context: The context of this handler in the pipeline.
    ///   - error: The error that was caught.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
      let status: GRPCStatus

      if let errorWithContext = error as? GRPCError.WithContext {
        // Unwrap the underlying error before reporting and converting it.
        self.errorDelegate?.observeLibraryError(errorWithContext.error)
        status = errorWithContext.error.makeGRPCStatus()
      } else {
        self.errorDelegate?.observeLibraryError(error)
        status = (error as? GRPCStatusTransformable)?.makeGRPCStatus() ?? .processingError
      }

      switch self.state {
      case .notConfigured:
        // We don't know what protocol we're speaking at this point. We'll just have to close the
        // channel.
        ()

      case .configuring(let messages):
        // We only go from `.notConfigured` to `.configuring` when we receive and validate the
        // request head, so a head is expected to be buffered. Should that ever not hold we skip
        // the response and just close, rather than crashing while handling an error.
        let bufferedHead = messages.compactMap { part -> HTTPRequestHead? in
          if case .head(let head) = part {
            return head
          }
          return nil
        }.first

        if let requestHead = bufferedHead {
          let responseHead = self.makeResponseHead(requestHead: requestHead, status: status)
          context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
          context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
          context.flush()
        }
      }

      context.close(mode: .all, promise: nil)
    }

    /// Handles a request part.
    ///
    /// A request head is validated and, if a call handler can be made for it, the pipeline is
    /// configured to serve the RPC. Otherwise an error response is written. Body and end parts
    /// are buffered while the pipeline is being configured and dropped otherwise.
    ///
    /// - Parameters:
    ///   - context: The context of this handler in the pipeline.
    ///   - data: The wrapped `HTTPServerRequestPart`.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      // Unwrap once; the same value is both inspected and buffered.
      let requestPart = self.unwrapInboundIn(data)

      switch requestPart {
      case .head(let requestHead):
        precondition(.notConfigured == self.state)

        // Validate the 'content-type' is related to gRPC before proceeding.
        let maybeContentType = requestHead.headers.first(name: GRPCHeaderName.contentType)
        guard let contentType = maybeContentType, contentType.starts(with: ContentType.commonPrefix) else {
          self.logger.warning(
            "received request whose 'content-type' does not exist or start with '\(ContentType.commonPrefix)'",
            metadata: ["content-type": "\(String(describing: maybeContentType))"]
          )

          // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
          //
          //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
          //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
          //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
          //   successful.
          let responseHead = HTTPResponseHead(
            version: requestHead.version,
            status: .unsupportedMediaType
          )

          // Fail the call. Note: we're not speaking gRPC here, so no status or message.
          context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
          context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
          return
        }

        // Do we know how to handle this RPC?
        guard let callHandler = self.makeCallHandler(channel: context.channel, requestHead: requestHead) else {
          self.logger.warning(
            "unable to make call handler; the RPC is not implemented on this server",
            metadata: ["uri": "\(requestHead.uri)"]
          )

          let status = GRPCError.RPCNotImplemented(rpc: requestHead.uri).makeGRPCStatus()
          let responseHead = self.makeResponseHead(requestHead: requestHead, status: status)

          // Write back a 'trailers-only' response.
          context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
          context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
          return
        }

        self.logger.debug("received request head, configuring pipeline")

        // Buffer the request head; we'll replay it in the next handler when we're removed from the
        // pipeline.
        self.state = .configuring([requestPart])

        // Configure the rest of the pipeline to serve the RPC.
        let codec = callHandler.makeGRPCServerCodec()
        let configured = context.pipeline.addHandlers([codec, callHandler], position: .after(self))

        configured.whenSuccess {
          context.pipeline.removeHandler(self, promise: nil)
        }

        configured.whenFailure { error in
          // Without this the failure would go unnoticed: we would stay in `.configuring`,
          // buffering request parts without ever responding. Fail the RPC and close instead.
          self.logger.error(
            "failed to configure pipeline to serve RPC",
            metadata: ["uri": "\(requestHead.uri)", "error": "\(error)"]
          )
          self.errorCaught(context: context, error: error)
        }

      case .body, .end:
        switch self.state {
        case .notConfigured:
          // We can reach this point if we're receiving messages for a method that isn't implemented,
          // in which case we just drop the messages; our response should already be in-flight.
          ()

        case .configuring(var buffer):
          // We received a message while the pipeline was being configured; hold on to it while we
          // finish configuring the pipeline.
          buffer.append(requestPart)
          self.state = .configuring(buffer)
        }
      }
    }

    /// Replays any request parts buffered during configuration to the next handler in the
    /// pipeline.
    ///
    /// - Parameter context: The context of this handler in the pipeline.
    public func handlerRemoved(context: ChannelHandlerContext) {
      switch self.state {
      case .notConfigured:
        ()

      case .configuring(let messages):
        for message in messages {
          context.fireChannelRead(self.wrapInboundOut(message))
        }
      }
    }

    /// Asks the provider registered for the request's service to make a handler for the
    /// request's method.
    ///
    /// - Parameters:
    ///   - channel: The channel the request was received on.
    ///   - requestHead: The head of the request to make a handler for.
    /// - Returns: A call handler, or `nil` if the URI is not of the expected form, no provider
    ///   is registered for the service, or the provider does not handle the method.
    private func makeCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
      // URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
      // - uriComponents[0]: empty
      // - uriComponents[1]: service name (including the package name);
      //     `CallHandlerProvider`s should provide the service name including the package name.
      // - uriComponents[2]: method name.
      self.logger.debug("making call handler", metadata: ["path": "\(requestHead.uri)"])
      let uriComponents = requestHead.uri.components(separatedBy: "/")

      var logger = self.logger
      // Unset the channel handler: it shouldn't be used for downstream handlers.
      logger[metadataKey: MetadataKey.channelHandler] = nil

      let context = CallHandlerContext(errorDelegate: self.errorDelegate, logger: logger)

      guard uriComponents.count >= 3 && uriComponents[0].isEmpty,
        let providerForServiceName = self.servicesByName[uriComponents[1]],
        let callHandler = providerForServiceName.handleMethod(uriComponents[2], callHandlerContext: context) else {
          self.logger.notice("could not create handler", metadata: ["path": "\(requestHead.uri)"])
          return nil
      }

      return callHandler
    }

    /// Makes the head of a 'trailers-only' response carrying the given status.
    ///
    /// - Parameters:
    ///   - requestHead: The head of the request being responded to; its HTTP version is reused.
    ///   - status: The status to encode in the response headers.
    /// - Returns: A response head with HTTP status 200 (OK) whose headers hold the content-type,
    ///   the status code and, when one can be marshalled, the status message.
    private func makeResponseHead(requestHead: HTTPRequestHead, status: GRPCStatus) -> HTTPResponseHead {
      var headers: HTTPHeaders = [
        GRPCHeaderName.contentType: ContentType.protobuf.canonicalValue,
        GRPCHeaderName.statusCode: "\(status.code.rawValue)",
      ]

      if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
        headers.add(name: GRPCHeaderName.statusMessage, value: message)
      }

      return HTTPResponseHead(version: requestHead.version, status: .ok, headers: headers)
    }
  }
  206. }