Server.swift 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOExtras
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import NIOSSL
  23. import NIOTransportServices
  24. /// Wrapper object to manage the lifecycle of a gRPC server.
  25. ///
  26. /// The pipeline is configured in three stages detailed below. Note: handlers marked with
  27. /// a '*' are responsible for handling errors.
  28. ///
  29. /// 1. Initial stage, prior to pipeline configuration.
  30. ///
  ///         ┌─────────────────────────────────┐
  ///         │ GRPCServerPipelineConfigurator* │
  ///         └────▲───────────────────────┬────┘
  ///    ByteBuffer│                       │ByteBuffer
  ///            ┌─┴───────────────────────▼─┐
  ///            │       NIOSSLHandler       │
  ///            └─▲───────────────────────┬─┘
  ///    ByteBuffer│                       │ByteBuffer
  ///              │                       ▼
  40. ///
  41. /// The `NIOSSLHandler` is optional and depends on how the framework user has configured
  42. /// their server. The `GRPCServerPipelineConfigurator` detects which HTTP version is being used
  43. /// (via ALPN if TLS is used or by parsing the first bytes on the connection otherwise) and
  44. /// configures the pipeline accordingly.
  45. ///
  46. /// 2. HTTP version detected. "HTTP Handlers" depends on the HTTP version determined by
  47. /// `GRPCServerPipelineConfigurator`. In the case of HTTP/2:
  48. ///
  ///            ┌─────────────────────────────────┐
  ///            │     HTTP2StreamMultiplexer      │
  ///            └─▲─────────────────────────────┬─┘
  ///    HTTP2Frame│                             │HTTP2Frame
  ///            ┌─┴─────────────────────────────▼─┐
  ///            │          HTTP2Handler           │
  ///            └─▲─────────────────────────────┬─┘
  ///    ByteBuffer│                             │ByteBuffer
  ///            ┌─┴─────────────────────────────▼─┐
  ///            │          NIOSSLHandler          │
  ///            └─▲─────────────────────────────┬─┘
  ///    ByteBuffer│                             │ByteBuffer
  ///              │                             ▼
  62. ///
  63. /// The `HTTP2StreamMultiplexer` provides one `Channel` for each HTTP/2 stream (and thus each
  64. /// RPC).
  65. ///
  66. /// 3. The frames for each stream channel are routed by the `HTTP2ToRawGRPCServerCodec` handler to
  67. /// a handler containing the user-implemented logic provided by a `CallHandlerProvider`:
  68. ///
  ///                        ┌─────────────────────────────────┐
  ///                        │        BaseCallHandler*         │
  ///                        └─▲─────────────────────────────┬─┘
  ///     GRPCServerRequestPart│                             │GRPCServerResponsePart
  ///                        ┌─┴─────────────────────────────▼─┐
  ///                        │    HTTP2ToRawGRPCServerCodec    │
  ///                        └─▲─────────────────────────────┬─┘
  ///   HTTP2Frame.FramePayload│                             │HTTP2Frame.FramePayload
  ///                          │                             ▼
  78. ///
  public final class Server {
    /// The bound server channel.
    public let channel: Channel

    /// Helper which receives graceful shutdown requests; its handler is installed on the server
    /// channel by `start(configuration:)`.
    private let quiescingHelper: ServerQuiescingHelper

    /// The error delegate, held strongly until the server channel closes.
    private var errorDelegate: ServerErrorDelegate?

    /// Creates a server bootstrap and applies the socket options and accepted-channel setup
    /// described by `configuration`.
    ///
    /// - Parameter configuration: The server configuration. Its event loop group is used to make
    ///   the bootstrap and a copy of it is used to set up each accepted channel.
    /// - Returns: The configured bootstrap. Binding it is left to the caller.
    public class func makeBootstrap(configuration: Configuration) -> ServerBootstrapProtocol {
      let base = PlatformSupport.makeServerBootstrap(group: configuration.eventLoopGroup)

      // Only `ServerBootstrap` offers a backlog option; set one to avoid overloading the server.
      if base is ServerBootstrap {
        _ = base.serverChannelOption(ChannelOptions.backlog, value: 256)
      }

      // `SO_REUSEADDR` on the server channel avoids "address already in use" errors.
      let listening = base.serverChannelOption(
        ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
        value: 1
      )

      // Install the handlers which are applied to every accepted channel.
      let accepting = listening.childChannelInitializer { acceptedChannel in
        Server.initializeAcceptedChannel(acceptedChannel, configuration: configuration)
      }

      // Accepted channels use `TCP_NODELAY` and `SO_REUSEADDR`.
      return accepting
        .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .childChannelOption(
          ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
          value: 1
        )
    }

    /// Adds the gRPC handlers to the pipeline of a newly accepted channel.
    ///
    /// Handlers are added in this order: the TLS handler (only when TLS is configured), the
    /// `GRPCServerPipelineConfigurator`, the empty-write filter (only when the platform requires
    /// the workaround) and finally whatever the debug channel initializer adds, if one is set.
    ///
    /// - Parameters:
    ///   - channel: The accepted channel.
    ///   - serverConfiguration: The server-wide configuration. It is copied so that the logger
    ///     metadata set here only applies to this connection.
    /// - Returns: A future completed once every step above has finished.
    private static func initializeAcceptedChannel(
      _ channel: Channel,
      configuration serverConfiguration: Configuration
    ) -> EventLoopFuture<Void> {
      // Tag the per-connection logger with a fresh ID and the peer address.
      var connectionConfiguration = serverConfiguration
      connectionConfiguration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
      connectionConfiguration.logger[metadataKey: MetadataKey.remoteAddress] =
        channel.remoteAddress.map { "\($0)" } ?? "n/a"

      let configurator = GRPCServerPipelineConfigurator(configuration: connectionConfiguration)

      var pipelineReady: EventLoopFuture<Void>
      switch connectionConfiguration.tls {
      case let .some(tls):
        // The configurator is only added once the TLS handler is in place.
        pipelineReady = channel.configureTLS(configuration: tls).flatMap {
          channel.pipeline.addHandler(configurator)
        }
      case .none:
        pipelineReady = channel.pipeline.addHandler(configurator)
      }

      // Work around the zero length write issue, if needed.
      let needsEmptyWriteFilter = PlatformSupport.requiresZeroLengthWriteWorkaround(
        group: connectionConfiguration.eventLoopGroup,
        hasTLS: connectionConfiguration.tls != nil
      )
      if needsEmptyWriteFilter, #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
        pipelineReady = pipelineReady.flatMap {
          channel.pipeline.addHandler(NIOFilterEmptyWritesHandler())
        }
      }

      // The debug initializer, when present, always runs last.
      guard let debugInitializer = connectionConfiguration.debugChannelInitializer else {
        return pipelineReady
      }
      return pipelineReady.flatMap {
        debugInitializer(channel)
      }
    }

    /// Starts a server with the given configuration. See `Server.Configuration` for the options
    /// available to configure the server.
    ///
    /// - Parameter configuration: The configuration of the server to start.
    /// - Returns: A future holding the running `Server` once its channel has been bound to
    ///   `configuration.target`.
    public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
      let quiescer = ServerQuiescingHelper(group: configuration.eventLoopGroup)

      let bootstrap = self.makeBootstrap(configuration: configuration)
        .serverChannelInitializer { serverChannel in
          let quiescingHandler = quiescer.makeServerChannelHandler(channel: serverChannel)
          return serverChannel.pipeline.addHandler(quiescingHandler)
        }

      return bootstrap.bind(to: configuration.target).map { boundChannel in
        Server(
          channel: boundChannel,
          quiescingHelper: quiescer,
          errorDelegate: configuration.errorDelegate
        )
      }
    }

    /// Wraps an already bound server channel.
    ///
    /// - Parameters:
    ///   - channel: The bound server channel.
    ///   - quiescingHelper: The helper used to initiate graceful shutdown.
    ///   - errorDelegate: An optional delegate; when present a `ServerChannelErrorHandler` using
    ///     it is added to the server channel's pipeline.
    private init(
      channel: Channel,
      quiescingHelper: ServerQuiescingHelper,
      errorDelegate: ServerErrorDelegate?
    ) {
      self.channel = channel
      self.quiescingHelper = quiescingHelper

      // Hold the delegate strongly so that it lives as long as the server.
      self.errorDelegate = errorDelegate

      // With a delegate we also want a server channel error handler. There is no need to wait
      // for the handler to be added.
      if let delegate = errorDelegate {
        let errorHandler = ServerChannelErrorHandler(errorDelegate: delegate)
        _ = channel.pipeline.addHandler(errorHandler)
      }

      // Drop the delegate once the channel has closed to avoid retain cycles.
      self.channel.closeFuture.whenComplete { _ in
        self.errorDelegate = nil
      }
    }

    /// Fired when the server shuts down.
    public var onClose: EventLoopFuture<Void> {
      return self.channel.closeFuture
    }

    /// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
    /// connections will be rejected.
    ///
    /// - Parameter promise: An optional promise which is handed to the quiescing helper.
    public func initiateGracefulShutdown(promise: EventLoopPromise<Void>?) {
      self.quiescingHelper.initiateShutdown(promise: promise)
    }

    /// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
    /// connections will be rejected.
    ///
    /// - Returns: The future of a promise, created on the server channel's event loop, which is
    ///   handed to the quiescing helper.
    public func initiateGracefulShutdown() -> EventLoopFuture<Void> {
      let shutdownPromise = self.channel.eventLoop.makePromise(of: Void.self)
      self.quiescingHelper.initiateShutdown(promise: shutdownPromise)
      return shutdownPromise.futureResult
    }

    /// Shutdown the server immediately. Active RPCs and connections will be terminated.
    ///
    /// - Parameter promise: An optional promise passed to the channel's `close`.
    public func close(promise: EventLoopPromise<Void>?) {
      self.channel.close(mode: .all, promise: promise)
    }

    /// Shutdown the server immediately. Active RPCs and connections will be terminated.
    ///
    /// - Returns: The future returned by closing the server channel.
    public func close() -> EventLoopFuture<Void> {
      let closed: EventLoopFuture<Void> = self.channel.close(mode: .all)
      return closed
    }
  }
  /// The target a server binds to; an alias of `ConnectionTarget`.
  public typealias BindTarget = ConnectionTarget

  extension Server {
    /// The configuration for a server.
    public struct Configuration {
      /// The target to bind to.
      public var target: BindTarget

      /// The event loop group to run the connection on.
      public var eventLoopGroup: EventLoopGroup

      /// Providers the server should use to handle gRPC requests.
      ///
      /// This is a view over `serviceProvidersByName`: reading returns the stored providers in no
      /// particular order and writing rebuilds the by-name cache. Every provider must have a
      /// distinct `serviceName`; `Dictionary(uniqueKeysWithValues:)` traps on duplicate keys.
      public var serviceProviders: [CallHandlerProvider] {
        get {
          return Array(self.serviceProvidersByName.values)
        }
        set(providers) {
          self.serviceProvidersByName = Configuration.indexByServiceName(providers)
        }
      }

      /// An error delegate which is called when errors are caught. Provided delegates **must not
      /// maintain a strong reference to this `Server`**. Doing so will cause a retain cycle.
      public var errorDelegate: ServerErrorDelegate?

      /// TLS configuration for this connection. `nil` if TLS is not desired.
      public var tls: TLS?

      /// The connection keepalive configuration.
      public var connectionKeepalive: ServerConnectionKeepalive

      /// The amount of time to wait before closing connections. The idle timeout will start only
      /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
      public var connectionIdleTimeout: TimeAmount

      /// The compression configuration for requests and responses.
      ///
      /// If compression is enabled for the server it may be disabled for responses on any RPC by
      /// setting `compressionEnabled` to `false` on the context of the call.
      ///
      /// Compression may also be disabled at the message-level for streaming responses (i.e.
      /// server streaming and bidirectional streaming RPCs) by setting `compression` to
      /// `.disabled` in `sendResponse(_:compression)`.
      public var messageEncoding: ServerMessageEncoding

      /// The HTTP/2 flow control target window size.
      public var httpTargetWindowSize: Int

      /// The root server logger. Accepted connections will branch from this logger and RPCs on
      /// each connection will use a logger branched from the connection's logger. This logger is
      /// made available to service providers via `context`. Defaults to a no-op logger.
      public var logger: Logger

      /// A channel initializer which will be run after gRPC has initialized each accepted channel.
      /// This may be used to add additional handlers to the pipeline and is intended for
      /// debugging. This is analogous to `NIO.ServerBootstrap.childChannelInitializer`.
      ///
      /// - Warning: The initializer closure may be invoked *multiple times*. More precisely: it
      ///   will be invoked at most once per accepted connection.
      public var debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?

      /// A calculated private cache of the service providers by name.
      ///
      /// This is how gRPC consumes the service providers internally. Caching this as stored data
      /// avoids the need to recalculate this dictionary each time we receive an RPC.
      internal var serviceProvidersByName: [Substring: CallHandlerProvider]

      /// Create a `Configuration` with some pre-defined defaults.
      ///
      /// - Parameters:
      ///   - target: The target to bind to.
      ///   - eventLoopGroup: The event loop group to run the server on.
      ///   - serviceProviders: An array of `CallHandlerProvider`s which the server should use
      ///     to handle requests. Service names must be unique.
      ///   - errorDelegate: The error delegate, defaulting to `nil`.
      ///   - tls: TLS configuration, defaulting to `nil`.
      ///   - connectionKeepalive: The keepalive configuration to use.
      ///   - connectionIdleTimeout: The amount of time to wait before closing the connection,
      ///     this is indefinite by default.
      ///   - messageEncoding: Message compression configuration, defaulting to no compression.
      ///   - httpTargetWindowSize: The HTTP/2 flow control target window size.
      ///   - logger: A logger. Defaults to a no-op logger.
      ///   - debugChannelInitializer: A channel initializer which will be called for each
      ///     connection the server accepts after gRPC has initialized the channel. Defaults to
      ///     `nil`.
      public init(
        target: BindTarget,
        eventLoopGroup: EventLoopGroup,
        serviceProviders: [CallHandlerProvider],
        errorDelegate: ServerErrorDelegate? = nil,
        tls: TLS? = nil,
        connectionKeepalive: ServerConnectionKeepalive = ServerConnectionKeepalive(),
        connectionIdleTimeout: TimeAmount = .nanoseconds(.max),
        messageEncoding: ServerMessageEncoding = .disabled,
        httpTargetWindowSize: Int = 65535,
        logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
        debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)? = nil
      ) {
        // Where and on what the server runs.
        self.target = target
        self.eventLoopGroup = eventLoopGroup

        // What the server serves; stored keyed by service name.
        self.serviceProvidersByName = Configuration.indexByServiceName(serviceProviders)

        // Connection-level behaviour.
        self.tls = tls
        self.connectionKeepalive = connectionKeepalive
        self.connectionIdleTimeout = connectionIdleTimeout
        self.messageEncoding = messageEncoding
        self.httpTargetWindowSize = httpTargetWindowSize

        // Diagnostics.
        self.errorDelegate = errorDelegate
        self.logger = logger
        self.debugChannelInitializer = debugChannelInitializer
      }

      /// Builds the by-name cache from a list of providers.
      ///
      /// - Parameter providers: The providers to index; their service names must be unique.
      /// - Returns: A dictionary mapping each provider's `serviceName` to the provider.
      private static func indexByServiceName(
        _ providers: [CallHandlerProvider]
      ) -> [Substring: CallHandlerProvider] {
        let entries: [(Substring, CallHandlerProvider)] = providers.map { provider in
          (provider.serviceName, provider)
        }
        return Dictionary(uniqueKeysWithValues: entries)
      }
    }
  }
  private extension Channel {
    /// Adds a server-side SSL handler, built from `configuration`, to this channel's pipeline.
    ///
    /// - Parameters:
    ///   - configuration: The configuration to use when creating the handler.
    /// - Returns: A future which will be succeeded when the pipeline has been configured, or
    ///   failed with the error thrown while creating the `NIOSSLContext`.
    func configureTLS(configuration: Server.Configuration.TLS) -> EventLoopFuture<Void> {
      let sslContext: NIOSSLContext
      do {
        sslContext = try NIOSSLContext(configuration: configuration.configuration)
      } catch {
        // Creating the context is the only step which can throw; report it via the future.
        return self.pipeline.eventLoop.makeFailedFuture(error)
      }

      let sslHandler = NIOSSLServerHandler(context: sslContext)
      return self.pipeline.addHandler(sslHandler)
    }
  }
  private extension ServerBootstrapProtocol {
    /// Binds this bootstrap to `target`, forwarding to the `bind` variant which matches the kind
    /// of target (host and port, Unix domain socket path, or socket address).
    ///
    /// - Parameter target: The target to bind to.
    /// - Returns: The future returned by the underlying `bind` call.
    func bind(to target: BindTarget) -> EventLoopFuture<Channel> {
      let bound: EventLoopFuture<Channel>

      switch target.wrapped {
      case .hostAndPort(let hostname, let portNumber):
        bound = self.bind(host: hostname, port: portNumber)

      case .unixDomainSocket(let socketPath):
        bound = self.bind(unixDomainSocketPath: socketPath)

      case .socketAddress(let socketAddress):
        bound = self.bind(to: socketAddress)
      }

      return bound
    }
  }