Server.swift 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOExtras
  20. import NIOHTTP1
  21. import NIOHTTP2
  22. import NIOSSL
  23. import NIOTransportServices
  24. #if canImport(Network)
  25. import Network
  26. #endif
  /// Wrapper object to manage the lifecycle of a gRPC server.
  ///
  /// The pipeline is configured in three stages detailed below. Note: handlers marked with
  /// a '*' are responsible for handling errors.
  ///
  /// 1. Initial stage, prior to pipeline configuration.
  ///
  ///                    ┌─────────────────────────────────┐
  ///                    │ GRPCServerPipelineConfigurator* │
  ///                    └────▲───────────────────────┬────┘
  ///               ByteBuffer│                       │ByteBuffer
  ///                       ┌─┴───────────────────────▼─┐
  ///                       │       NIOSSLHandler       │
  ///                       └─▲───────────────────────┬─┘
  ///               ByteBuffer│                       │ByteBuffer
  ///                         │                       ▼
  ///
  ///    The `NIOSSLHandler` is optional and depends on how the framework user has configured
  ///    their server. The `GRPCServerPipelineConfigurator` detects which HTTP version is being used
  ///    (via ALPN if TLS is used or by parsing the first bytes on the connection otherwise) and
  ///    configures the pipeline accordingly.
  ///
  /// 2. HTTP version detected. "HTTP Handlers" depends on the HTTP version determined by
  ///    `GRPCServerPipelineConfigurator`. In the case of HTTP/2:
  ///
  ///                       ┌─────────────────────────────────┐
  ///                       │     HTTP2StreamMultiplexer      │
  ///                       └─▲─────────────────────────────┬─┘
  ///               HTTP2Frame│                             │HTTP2Frame
  ///                       ┌─┴─────────────────────────────▼─┐
  ///                       │          HTTP2Handler           │
  ///                       └─▲─────────────────────────────┬─┘
  ///               ByteBuffer│                             │ByteBuffer
  ///                       ┌─┴─────────────────────────────▼─┐
  ///                       │          NIOSSLHandler          │
  ///                       └─▲─────────────────────────────┬─┘
  ///               ByteBuffer│                             │ByteBuffer
  ///                         │                             ▼
  ///
  ///    The `HTTP2StreamMultiplexer` provides one `Channel` for each HTTP/2 stream (and thus each
  ///    RPC).
  ///
  /// 3. The frames for each stream channel are routed by the `HTTP2ToRawGRPCServerCodec` handler to
  ///    a handler containing the user-implemented logic provided by a `CallHandlerProvider`:
  ///
  ///                       ┌─────────────────────────────────┐
  ///                       │        BaseCallHandler*         │
  ///                       └─▲─────────────────────────────┬─┘
  ///    GRPCServerRequestPart│                             │GRPCServerResponsePart
  ///                       ┌─┴─────────────────────────────▼─┐
  ///                       │    HTTP2ToRawGRPCServerCodec    │
  ///                       └─▲─────────────────────────────┬─┘
  ///  HTTP2Frame.FramePayload│                             │HTTP2Frame.FramePayload
  ///                         │                             ▼
  ///
  public final class Server {
    /// Makes and configures a `ServerBootstrap` using the provided configuration.
    ///
    /// The returned bootstrap has `SO_REUSEADDR` set on the server channel, `TCP_NODELAY` and
    /// `SO_REUSEADDR` set on accepted channels, and a child channel initializer which installs (in
    /// order) an optional `NIOSSLServerHandler`, a `GRPCServerPipelineConfigurator`, an optional
    /// `NIOFilterEmptyWritesHandler`, and finally runs `configuration.debugChannelInitializer`
    /// if one was provided.
    ///
    /// - Parameter configuration: The server configuration to build the bootstrap from.
    /// - Returns: A configured bootstrap which has not yet been bound.
    public class func makeBootstrap(configuration: Configuration) -> ServerBootstrapProtocol {
      let bootstrap = PlatformSupport.makeServerBootstrap(group: configuration.eventLoopGroup)

      // Backlog is only available on `ServerBootstrap`.
      if bootstrap is ServerBootstrap {
        // Specify a backlog to avoid overloading the server.
        _ = bootstrap.serverChannelOption(ChannelOptions.backlog, value: 256)
      }

      // Making a `NIOSSLContext` is expensive, we should only do it once per TLS configuration so
      // we'll do it now, before accepting connections. Unfortunately our API isn't throwing so we'll
      // only surface any error when initializing a child channel.
      //
      // 'nil' means we're not using TLS, or we're using the Network.framework TLS backend. If we're
      // using the Network.framework TLS backend we'll apply the settings just below.
      let sslContext: Result<NIOSSLContext, Error>?

      if let tlsConfiguration = configuration.tlsConfiguration {
        do {
          // Use the value bound above rather than re-reading the optional from `configuration`.
          sslContext = try tlsConfiguration.makeNIOSSLContext().map { .success($0) }
        } catch {
          sslContext = .failure(error)
        }

        // No SSL context means we must be using the Network.framework TLS stack (as
        // `tlsConfiguration` was not `nil`).
        if sslContext == nil {
          #if canImport(Network)
          // NOTE(review): if the availability check or the cast below fails, no TLS options are
          // applied and nothing is reported from here. Confirm that combination cannot occur.
          if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *),
            let transportServicesBootstrap = bootstrap as? NIOTSListenerBootstrap {
            _ = transportServicesBootstrap.tlsOptions(from: tlsConfiguration)
          }
          #else
          // We must be using Network.framework (because we aren't using NIOSSL) but we don't have
          // a NIOTSListenerBootstrap available, something is very wrong.
          preconditionFailure(
            "TLS is configured without a NIOSSL context but Network.framework is unavailable"
          )
          #endif
        }
      } else {
        // No TLS configuration, no SSL context.
        sslContext = nil
      }

      return bootstrap
        // Enable `SO_REUSEADDR` to avoid "address already in use" error.
        .serverChannelOption(
          ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
          value: 1
        )
        // Set the handlers that are applied to the accepted Channels
        .childChannelInitializer { channel in
          // Each accepted connection gets its own copy of the configuration so that the logger
          // can carry per-connection metadata.
          var configuration = configuration
          configuration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
          configuration.logger.addIPAddressMetadata(
            local: channel.localAddress,
            remote: channel.remoteAddress
          )

          do {
            let sync = channel.pipeline.syncOperations

            // `get()` rethrows any error captured while creating the context above; this is
            // where a failed TLS setup finally surfaces.
            if let sslContext = try sslContext?.get() {
              try sync.addHandler(NIOSSLServerHandler(context: sslContext))
            }

            // Configures the pipeline based on whether the connection uses TLS or not.
            try sync.addHandler(GRPCServerPipelineConfigurator(configuration: configuration))

            // Work around the zero length write issue, if needed.
            let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
              group: configuration.eventLoopGroup,
              hasTLS: configuration.tlsConfiguration != nil
            )
            if requiresZeroLengthWorkaround,
              #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) {
              try sync.addHandler(NIOFilterEmptyWritesHandler())
            }
          } catch {
            return channel.eventLoop.makeFailedFuture(error)
          }

          // Run the debug initializer, if there is one.
          if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
            return debugAcceptedChannelInitializer(channel)
          } else {
            return channel.eventLoop.makeSucceededVoidFuture()
          }
        }
        // Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels
        .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .childChannelOption(
          ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
          value: 1
        )
    }

    /// Starts a server with the given configuration. See `Server.Configuration` for the options
    /// available to configure the server.
    ///
    /// - Parameter configuration: The configuration to start the server with.
    /// - Returns: A future which is completed with the `Server` once the bootstrap has bound to
    ///   `configuration.target`, or failed if binding fails.
    public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
      // The same helper is installed in the server channel and kept by the `Server` so that
      // `initiateGracefulShutdown` can drive it later.
      let quiescingHelper = ServerQuiescingHelper(group: configuration.eventLoopGroup)

      return self.makeBootstrap(configuration: configuration)
        .serverChannelInitializer { channel in
          channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel))
        }
        .bind(to: configuration.target)
        .map { channel in
          Server(
            channel: channel,
            quiescingHelper: quiescingHelper,
            errorDelegate: configuration.errorDelegate
          )
        }
    }

    /// The bound server channel.
    public let channel: Channel
    /// Helper used to initiate a graceful shutdown.
    private let quiescingHelper: ServerQuiescingHelper
    /// The error delegate, held strongly until the server channel closes.
    private var errorDelegate: ServerErrorDelegate?

    /// Wraps an already-bound server channel.
    ///
    /// - Parameters:
    ///   - channel: The bound server channel.
    ///   - quiescingHelper: The helper whose handler was added to `channel`'s pipeline.
    ///   - errorDelegate: An optional delegate to notify of server channel errors.
    private init(
      channel: Channel,
      quiescingHelper: ServerQuiescingHelper,
      errorDelegate: ServerErrorDelegate?
    ) {
      self.channel = channel
      self.quiescingHelper = quiescingHelper

      // Maintain a strong reference to ensure it lives as long as the server.
      self.errorDelegate = errorDelegate

      // If we have an error delegate, add a server channel error handler as well. We don't need
      // to wait for the handler to be added.
      if let errorDelegate = errorDelegate {
        _ = channel.pipeline.addHandler(ServerChannelErrorHandler(errorDelegate: errorDelegate))
      }

      // nil out errorDelegate to avoid retain cycles.
      self.onClose.whenComplete { _ in
        self.errorDelegate = nil
      }
    }

    /// Fired when the server shuts down.
    public var onClose: EventLoopFuture<Void> {
      return self.channel.closeFuture
    }

    /// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
    /// connections will be rejected.
    ///
    /// - Parameter promise: An optional promise handed to the quiescing helper to be completed
    ///   as part of the shutdown.
    public func initiateGracefulShutdown(promise: EventLoopPromise<Void>?) {
      self.quiescingHelper.initiateShutdown(promise: promise)
    }

    /// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
    /// connections will be rejected.
    ///
    /// - Returns: The future of the promise passed to `initiateGracefulShutdown(promise:)`.
    public func initiateGracefulShutdown() -> EventLoopFuture<Void> {
      let promise = self.channel.eventLoop.makePromise(of: Void.self)
      self.initiateGracefulShutdown(promise: promise)
      return promise.futureResult
    }

    /// Shutdown the server immediately. Active RPCs and connections will be terminated.
    ///
    /// - Parameter promise: An optional promise passed to the channel's `close(mode:promise:)`.
    public func close(promise: EventLoopPromise<Void>?) {
      self.channel.close(mode: .all, promise: promise)
    }

    /// Shutdown the server immediately. Active RPCs and connections will be terminated.
    ///
    /// - Returns: The future returned by closing the server channel.
    public func close() -> EventLoopFuture<Void> {
      return self.channel.close(mode: .all)
    }
  }
  /// The target a server binds to; an alias of `ConnectionTarget`.
  public typealias BindTarget = ConnectionTarget

  extension Server {
    /// The configuration for a server.
    public struct Configuration {
      /// The target to bind to.
      public var target: BindTarget

      /// The event loop group to run the connection on.
      public var eventLoopGroup: EventLoopGroup

      /// Providers the server should use to handle gRPC requests.
      ///
      /// Providers are stored keyed by their `serviceName`, so the order of the returned array is
      /// unspecified. Service names must be unique: setting providers with duplicate names
      /// triggers a runtime trap (see `Dictionary.init(uniqueKeysWithValues:)`).
      public var serviceProviders: [CallHandlerProvider] {
        get {
          return Array(self.serviceProvidersByName.values)
        }
        set {
          self.serviceProvidersByName = Configuration.makeProvidersByName(newValue)
        }
      }

      /// An error delegate which is called when errors are caught. Provided delegates **must not
      /// maintain a strong reference to this `Server`**. Doing so will cause a retain cycle.
      public var errorDelegate: ServerErrorDelegate?

      /// TLS configuration for this connection. `nil` if TLS is not desired.
      @available(*, deprecated, renamed: "tlsConfiguration")
      public var tls: TLS? {
        get {
          return self.tlsConfiguration?.asDeprecatedServerConfiguration
        }
        set {
          self.tlsConfiguration = newValue.map { GRPCTLSConfiguration(transforming: $0) }
        }
      }

      /// TLS configuration for this connection. `nil` if TLS is not desired.
      public var tlsConfiguration: GRPCTLSConfiguration?

      /// The connection keepalive configuration.
      public var connectionKeepalive = ServerConnectionKeepalive()

      /// The amount of time to wait before closing connections. The idle timeout will start only
      /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
      public var connectionIdleTimeout: TimeAmount = .nanoseconds(.max)

      /// The compression configuration for requests and responses.
      ///
      /// If compression is enabled for the server it may be disabled for responses on any RPC by
      /// setting `compressionEnabled` to `false` on the context of the call.
      ///
      /// Compression may also be disabled at the message-level for streaming responses (i.e. server
      /// streaming and bidirectional streaming RPCs) by setting `compression` to `.disabled`
      /// in `sendResponse(_:compression)`.
      ///
      /// Defaults to `.disabled`.
      public var messageEncoding: ServerMessageEncoding = .disabled

      /// The maximum size in bytes of a message which may be received from a client. Defaults to
      /// 4MB. Must be non-negative.
      public var maximumReceiveMessageLength: Int = 4 * 1024 * 1024 {
        willSet {
          // Zero is accepted by the check, so the message says "non-negative", not "positive".
          precondition(newValue >= 0, "maximumReceiveMessageLength must be non-negative")
        }
      }

      /// The HTTP/2 flow control target window size. Defaults to 65535.
      public var httpTargetWindowSize: Int = 65535

      /// The HTTP/2 max number of concurrent streams. Defaults to 100. Must be non-negative.
      public var httpMaxConcurrentStreams: Int = 100 {
        willSet {
          precondition(newValue >= 0, "httpMaxConcurrentStreams must be non-negative")
        }
      }

      /// The root server logger. Accepted connections will branch from this logger and RPCs on
      /// each connection will use a logger branched from the connections logger. This logger is made
      /// available to service providers via `context`. Defaults to a no-op logger.
      public var logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() })

      /// A channel initializer which will be run after gRPC has initialized each accepted channel.
      /// This may be used to add additional handlers to the pipeline and is intended for debugging.
      /// This is analogous to `NIO.ServerBootstrap.childChannelInitializer`.
      ///
      /// - Warning: The initializer closure may be invoked *multiple times*. More precisely: it will
      ///   be invoked at most once per accepted connection.
      public var debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?

      /// A calculated private cache of the service providers by name.
      ///
      /// This is how gRPC consumes the service providers internally. Caching this as stored data avoids
      /// the need to recalculate this dictionary each time we receive an rpc.
      internal var serviceProvidersByName: [Substring: CallHandlerProvider]

      /// Create a `Configuration` with some pre-defined defaults.
      ///
      /// `maximumReceiveMessageLength` and `httpMaxConcurrentStreams` are not parameters of this
      /// initializer and keep their default values.
      ///
      /// - Parameters:
      ///   - target: The target to bind to.
      ///   - eventLoopGroup: The event loop group to run the server on.
      ///   - serviceProviders: An array of `CallHandlerProvider`s which the server should use
      ///     to handle requests. Service names must be unique.
      ///   - errorDelegate: The error delegate, defaulting to `nil`.
      ///   - tls: TLS configuration, defaulting to `nil`.
      ///   - connectionKeepalive: The keepalive configuration to use.
      ///   - connectionIdleTimeout: The amount of time to wait before closing the connection, this is
      ///     indefinite by default.
      ///   - messageEncoding: Message compression configuration, defaulting to no compression.
      ///   - httpTargetWindowSize: The HTTP/2 flow control target window size.
      ///   - logger: A logger. Defaults to a no-op logger.
      ///   - debugChannelInitializer: A channel initializer which will be called for each connection
      ///     the server accepts after gRPC has initialized the channel. Defaults to `nil`.
      @available(*, deprecated, renamed: "default(target:eventLoopGroup:serviceProviders:)")
      public init(
        target: BindTarget,
        eventLoopGroup: EventLoopGroup,
        serviceProviders: [CallHandlerProvider],
        errorDelegate: ServerErrorDelegate? = nil,
        tls: TLS? = nil,
        connectionKeepalive: ServerConnectionKeepalive = ServerConnectionKeepalive(),
        connectionIdleTimeout: TimeAmount = .nanoseconds(.max),
        messageEncoding: ServerMessageEncoding = .disabled,
        httpTargetWindowSize: Int = 65535,
        logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
        debugChannelInitializer: ((Channel) -> EventLoopFuture<Void>)? = nil
      ) {
        self.target = target
        self.eventLoopGroup = eventLoopGroup
        self.serviceProvidersByName = Configuration.makeProvidersByName(serviceProviders)
        self.errorDelegate = errorDelegate
        self.tlsConfiguration = tls.map { GRPCTLSConfiguration(transforming: $0) }
        self.connectionKeepalive = connectionKeepalive
        self.connectionIdleTimeout = connectionIdleTimeout
        self.messageEncoding = messageEncoding
        self.httpTargetWindowSize = httpTargetWindowSize
        self.logger = logger
        self.debugChannelInitializer = debugChannelInitializer
      }

      /// Creates a configuration in which every option other than the three parameters takes its
      /// declared default value.
      private init(
        eventLoopGroup: EventLoopGroup,
        target: BindTarget,
        serviceProviders: [CallHandlerProvider]
      ) {
        self.eventLoopGroup = eventLoopGroup
        self.target = target
        self.serviceProvidersByName = Configuration.makeProvidersByName(serviceProviders)
      }

      /// Make a new configuration using default values.
      ///
      /// - Parameters:
      ///   - target: The target to bind to.
      ///   - eventLoopGroup: The `EventLoopGroup` the server should run on.
      ///   - serviceProviders: An array of `CallHandlerProvider`s which the server should use
      ///     to handle requests. Service names must be unique.
      /// - Returns: A configuration with default values set.
      public static func `default`(
        target: BindTarget,
        eventLoopGroup: EventLoopGroup,
        serviceProviders: [CallHandlerProvider]
      ) -> Configuration {
        return .init(
          eventLoopGroup: eventLoopGroup,
          target: target,
          serviceProviders: serviceProviders
        )
      }

      /// Indexes `providers` by their `serviceName`.
      ///
      /// Traps if two providers share a service name, as `Dictionary.init(uniqueKeysWithValues:)`
      /// requires unique keys.
      private static func makeProvidersByName(
        _ providers: [CallHandlerProvider]
      ) -> [Substring: CallHandlerProvider] {
        return Dictionary(uniqueKeysWithValues: providers.map { ($0.serviceName, $0) })
      }
    }
  }
  private extension ServerBootstrapProtocol {
    /// Binds the bootstrap to `target` by dispatching on the kind of target wrapped inside it.
    ///
    /// - Parameter target: The target to bind to.
    /// - Returns: The future returned by the matching `bind` call on this bootstrap.
    func bind(to target: BindTarget) -> EventLoopFuture<Channel> {
      switch target.wrapped {
      case .hostAndPort(let hostname, let portNumber):
        return self.bind(host: hostname, port: portNumber)

      case .unixDomainSocket(let socketPath):
        return self.bind(unixDomainSocketPath: socketPath)

      case .socketAddress(let socketAddress):
        return self.bind(to: socketAddress)
      }
    }
  }