Server.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. import NIOSSL
  22. import NIOTransportServices
  23. /// Wrapper object to manage the lifecycle of a gRPC server.
  24. ///
  25. /// The pipeline is configured in three stages detailed below. Note: handlers marked with
  26. /// a '*' are responsible for handling errors.
  27. ///
  28. /// 1. Initial stage, prior to HTTP protocol detection.
  29. ///
  30. /// ┌───────────────────────────┐
  31. /// │ HTTPProtocolSwitcher* │
  32. /// └─▲───────────────────────┬─┘
  33. /// ByteBuffer│ │ByteBuffer
  34. /// ┌─┴───────────────────────▼─┐
  35. /// │ NIOSSLHandler │
  36. /// └─▲───────────────────────┬─┘
  37. /// ByteBuffer│ │ByteBuffer
  38. /// │ ▼
  39. ///
  40. /// The `NIOSSLHandler` is optional and depends on how the framework user has configured
  41. /// their server. The `HTTPProtocolSwitcher` detects which HTTP version is being used and
  42. /// configures the pipeline accordingly.
  43. ///
  44. /// 2. HTTP version detected. "HTTP Handlers" depends on the HTTP version determined by
  45. /// `HTTPProtocolSwitcher`. All of these handlers are provided by NIO except for the
  46. /// `WebCORSHandler` which is used for HTTP/1.
  47. ///
  48. /// ┌─────────────────────────────────┐
  49. /// │ GRPCServerRequestRoutingHandler │
  50. /// └─▲─────────────────────────────┬─┘
  51. /// HTTPServerRequestPart│ │HTTPServerResponsePart
  52. /// ┌─┴─────────────────────────────▼─┐
  53. /// │ HTTP Handlers │
  54. /// └─▲─────────────────────────────┬─┘
  55. /// ByteBuffer│ │ByteBuffer
  56. /// ┌─┴─────────────────────────────▼─┐
  57. /// │ NIOSSLHandler │
  58. /// └─▲─────────────────────────────┬─┘
  59. /// ByteBuffer│ │ByteBuffer
  60. /// │ ▼
  61. ///
  62. /// The `GRPCServerRequestRoutingHandler` resolves the request head and configures the rest of
  63. /// the pipeline based on the RPC call being made.
  64. ///
  65. /// 3. The call has been resolved and is a function that this server can handle. Responses are
  66. /// written into `BaseCallHandler` by a user-implemented `CallHandlerProvider`.
  67. ///
  68. /// ┌─────────────────────────────────┐
  69. /// │ BaseCallHandler* │
  70. /// └─▲─────────────────────────────┬─┘
  71. /// GRPCServerRequestPart<T1>│ │GRPCServerResponsePart<T2>
  72. /// ┌─┴─────────────────────────────▼─┐
  73. /// │ HTTP1ToGRPCServerCodec │
  74. /// └─▲─────────────────────────────┬─┘
  75. /// HTTPServerRequestPart│ │HTTPServerResponsePart
  76. /// ┌─┴─────────────────────────────▼─┐
  77. /// │ HTTP Handlers │
  78. /// └─▲─────────────────────────────┬─┘
  79. /// ByteBuffer│ │ByteBuffer
  80. /// ┌─┴─────────────────────────────▼─┐
  81. /// │ NIOSSLHandler │
  82. /// └─▲─────────────────────────────┬─┘
  83. /// ByteBuffer│ │ByteBuffer
  84. /// │ ▼
  85. ///
  86. public final class Server {
  87. /// Makes and configures a `ServerBootstrap` using the provided configuration.
  88. public class func makeBootstrap(configuration: Configuration) -> ServerBootstrapProtocol {
  89. let bootstrap = PlatformSupport.makeServerBootstrap(group: configuration.eventLoopGroup)
  90. // Backlog is only available on `ServerBootstrap`.
  91. if bootstrap is ServerBootstrap {
  92. // Specify a backlog to avoid overloading the server.
  93. _ = bootstrap.serverChannelOption(ChannelOptions.backlog, value: 256)
  94. }
  95. return bootstrap
  96. // Enable `SO_REUSEADDR` to avoid "address already in use" error.
  97. .serverChannelOption(
  98. ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
  99. value: 1
  100. )
  101. // Set the handlers that are applied to the accepted Channels
  102. .childChannelInitializer { channel in
  103. var logger = configuration.logger
  104. logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
  105. logger[metadataKey: MetadataKey.remoteAddress] = channel.remoteAddress
  106. .map { "\($0)" } ?? "n/a"
  107. let protocolSwitcher = HTTPProtocolSwitcher(
  108. errorDelegate: configuration.errorDelegate,
  109. httpTargetWindowSize: configuration.httpTargetWindowSize,
  110. keepAlive: configuration.connectionKeepalive,
  111. idleTimeout: configuration.connectionIdleTimeout,
  112. scheme: configuration.tls == nil ? "http" : "https",
  113. logger: logger
  114. ) { (channel, logger) -> EventLoopFuture<Void> in
  115. let handler = HTTP2ToRawGRPCServerCodec(
  116. servicesByName: configuration.serviceProvidersByName,
  117. encoding: configuration.messageEncoding,
  118. errorDelegate: configuration.errorDelegate,
  119. normalizeHeaders: true,
  120. logger: logger
  121. )
  122. return channel.pipeline.addHandler(handler)
  123. }
  124. var configured: EventLoopFuture<Void>
  125. if let tls = configuration.tls {
  126. configured = channel.configureTLS(configuration: tls).flatMap {
  127. channel.pipeline.addHandler(protocolSwitcher)
  128. }
  129. } else {
  130. configured = channel.pipeline.addHandler(protocolSwitcher)
  131. }
  132. // Work around the zero length write issue, if needed.
  133. let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
  134. group: configuration.eventLoopGroup,
  135. hasTLS: configuration.tls != nil
  136. )
  137. if requiresZeroLengthWorkaround,
  138. #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
  139. configured = configured.flatMap {
  140. channel.pipeline.addHandler(NIOFilterEmptyWritesHandler())
  141. }
  142. }
  143. // Add the debug initializer, if there is one.
  144. if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
  145. return configured.flatMap {
  146. debugAcceptedChannelInitializer(channel)
  147. }
  148. } else {
  149. return configured
  150. }
  151. }
  152. // Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels
  153. .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
  154. .childChannelOption(
  155. ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
  156. value: 1
  157. )
  158. }
  159. /// Starts a server with the given configuration. See `Server.Configuration` for the options
  160. /// available to configure the server.
  161. public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
  162. return self.makeBootstrap(configuration: configuration)
  163. .bind(to: configuration.target)
  164. .map { channel in
  165. Server(channel: channel, errorDelegate: configuration.errorDelegate)
  166. }
  167. }
  168. public let channel: Channel
  169. private var errorDelegate: ServerErrorDelegate?
  170. private init(channel: Channel, errorDelegate: ServerErrorDelegate?) {
  171. self.channel = channel
  172. // Maintain a strong reference to ensure it lives as long as the server.
  173. self.errorDelegate = errorDelegate
  174. // If we have an error delegate, add a server channel error handler as well. We don't need to wait for the handler to
  175. // be added.
  176. if let errorDelegate = errorDelegate {
  177. _ = channel.pipeline.addHandler(ServerChannelErrorHandler(errorDelegate: errorDelegate))
  178. }
  179. // nil out errorDelegate to avoid retain cycles.
  180. self.onClose.whenComplete { _ in
  181. self.errorDelegate = nil
  182. }
  183. }
  184. /// Fired when the server shuts down.
  185. public var onClose: EventLoopFuture<Void> {
  186. return self.channel.closeFuture
  187. }
  188. /// Shut down the server; this should be called to avoid leaking resources.
  189. public func close() -> EventLoopFuture<Void> {
  190. return self.channel.close(mode: .all)
  191. }
  192. }
  193. public typealias BindTarget = ConnectionTarget
  194. extension Server {
  195. /// The configuration for a server.
  196. public struct Configuration {
  197. /// The target to bind to.
  198. public var target: BindTarget
  199. /// The event loop group to run the connection on.
  200. public var eventLoopGroup: EventLoopGroup
  201. /// Providers the server should use to handle gRPC requests.
  202. public var serviceProviders: [CallHandlerProvider] {
  203. get {
  204. return Array(self.serviceProvidersByName.values)
  205. }
  206. set {
  207. self
  208. .serviceProvidersByName = Dictionary(
  209. uniqueKeysWithValues: newValue
  210. .map { ($0.serviceName, $0) }
  211. )
  212. }
  213. }
  214. /// An error delegate which is called when errors are caught. Provided delegates **must not
  215. /// maintain a strong reference to this `Server`**. Doing so will cause a retain cycle.
  216. public var errorDelegate: ServerErrorDelegate?
  217. /// TLS configuration for this connection. `nil` if TLS is not desired.
  218. public var tls: TLS?
  219. /// The connection keepalive configuration.
  220. public var connectionKeepalive: ServerConnectionKeepalive
  221. /// The amount of time to wait before closing connections. The idle timeout will start only
  222. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  223. public var connectionIdleTimeout: TimeAmount
  224. /// The compression configuration for requests and responses.
  225. ///
  226. /// If compression is enabled for the server it may be disabled for responses on any RPC by
  227. /// setting `compressionEnabled` to `false` on the context of the call.
  228. ///
  229. /// Compression may also be disabled at the message-level for streaming responses (i.e. server
  230. /// streaming and bidirectional streaming RPCs) by passing setting `compression` to `.disabled`
  231. /// in `sendResponse(_:compression)`.
  232. public var messageEncoding: ServerMessageEncoding
  233. /// The HTTP/2 flow control target window size.
  234. public var httpTargetWindowSize: Int
  235. /// The root server logger. Accepted connections will branch from this logger and RPCs on
  236. /// each connection will use a logger branched from the connections logger. This logger is made
  237. /// available to service providers via `context`. Defaults to a no-op logger.
  238. public var logger: Logger
  239. /// A channel initializer which will be run after gRPC has initialized each accepted channel.
  240. /// This may be used to add additional handlers to the pipeline and is intended for debugging.
  241. /// This is analogous to `NIO.ServerBootstrap.childChannelInitializer`.
  242. ///
  243. /// - Warning: The initializer closure may be invoked *multiple times*. More precisely: it will
  244. /// be invoked at most once per accepted connection.
  245. public var debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
  246. /// A calculated private cache of the service providers by name.
  247. ///
  248. /// This is how gRPC consumes the service providers internally. Caching this as stored data avoids
  249. /// the need to recalculate this dictionary each time we receive an rpc.
  250. fileprivate private(set) var serviceProvidersByName: [Substring: CallHandlerProvider]
  251. /// Create a `Configuration` with some pre-defined defaults.
  252. ///
  253. /// - Parameters:
  254. /// - target: The target to bind to.
  255. /// - eventLoopGroup: The event loop group to run the server on.
  256. /// - serviceProviders: An array of `CallHandlerProvider`s which the server should use
  257. /// to handle requests.
  258. /// - errorDelegate: The error delegate, defaulting to a logging delegate.
  259. /// - tls: TLS configuration, defaulting to `nil`.
  260. /// - connectionKeepalive: The keepalive configuration to use.
  261. /// - connectionIdleTimeout: The amount of time to wait before closing the connection, this is
  262. /// indefinite by default.
  263. /// - messageEncoding: Message compression configuration, defaulting to no compression.
  264. /// - httpTargetWindowSize: The HTTP/2 flow control target window size.
  265. /// - logger: A logger. Defaults to a no-op logger.
  266. /// - debugChannelInitializer: A channel initializer which will be called for each connection
  267. /// the server accepts after gRPC has initialized the channel. Defaults to `nil`.
  268. public init(
  269. target: BindTarget,
  270. eventLoopGroup: EventLoopGroup,
  271. serviceProviders: [CallHandlerProvider],
  272. errorDelegate: ServerErrorDelegate? = nil,
  273. tls: TLS? = nil,
  274. connectionKeepalive: ServerConnectionKeepalive = ServerConnectionKeepalive(),
  275. connectionIdleTimeout: TimeAmount = .nanoseconds(.max),
  276. messageEncoding: ServerMessageEncoding = .disabled,
  277. httpTargetWindowSize: Int = 65535,
  278. logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
  279. debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)? = nil
  280. ) {
  281. self.target = target
  282. self.eventLoopGroup = eventLoopGroup
  283. self
  284. .serviceProvidersByName = Dictionary(
  285. uniqueKeysWithValues: serviceProviders
  286. .map { ($0.serviceName, $0) }
  287. )
  288. self.errorDelegate = errorDelegate
  289. self.tls = tls
  290. self.connectionKeepalive = connectionKeepalive
  291. self.connectionIdleTimeout = connectionIdleTimeout
  292. self.messageEncoding = messageEncoding
  293. self.httpTargetWindowSize = httpTargetWindowSize
  294. self.logger = logger
  295. self.debugChannelInitializer = debugChannelInitializer
  296. }
  297. }
  298. }
  299. private extension Channel {
  300. /// Configure an SSL handler on the channel.
  301. ///
  302. /// - Parameters:
  303. /// - configuration: The configuration to use when creating the handler.
  304. /// - Returns: A future which will be succeeded when the pipeline has been configured.
  305. func configureTLS(configuration: Server.Configuration.TLS) -> EventLoopFuture<Void> {
  306. do {
  307. let context = try NIOSSLContext(configuration: configuration.configuration)
  308. return self.pipeline.addHandler(NIOSSLServerHandler(context: context))
  309. } catch {
  310. return self.pipeline.eventLoop.makeFailedFuture(error)
  311. }
  312. }
  313. }
  314. private extension ServerBootstrapProtocol {
  315. func bind(to target: BindTarget) -> EventLoopFuture<Channel> {
  316. switch target.wrapped {
  317. case let .hostAndPort(host, port):
  318. return self.bind(host: host, port: port)
  319. case let .unixDomainSocket(path):
  320. return self.bind(unixDomainSocketPath: path)
  321. case let .socketAddress(address):
  322. return self.bind(to: address)
  323. }
  324. }
  325. }