Server.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import NIOSSL
  21. import Logging
  22. /// Wrapper object to manage the lifecycle of a gRPC server.
  23. ///
  24. /// The pipeline is configured in three stages detailed below. Note: handlers marked with
  25. /// a '*' are responsible for handling errors.
  26. ///
  27. /// 1. Initial stage, prior to HTTP protocol detection.
  28. ///
  29. /// ┌───────────────────────────┐
  30. /// │ HTTPProtocolSwitcher* │
  31. /// └─▲───────────────────────┬─┘
  32. /// ByteBuffer│ │ByteBuffer
  33. /// ┌─┴───────────────────────▼─┐
  34. /// │ NIOSSLHandler │
  35. /// └─▲───────────────────────┬─┘
  36. /// ByteBuffer│ │ByteBuffer
  37. /// │ ▼
  38. ///
  39. /// The `NIOSSLHandler` is optional and depends on how the framework user has configured
  40. /// their server. The `HTTPProtocolSwitcher` detects which HTTP version is being used and
  41. /// configures the pipeline accordingly.
  42. ///
  43. /// 2. HTTP version detected. "HTTP Handlers" depends on the HTTP version determined by
  44. /// `HTTPProtocolSwitcher`. All of these handlers are provided by NIO except for the
  45. /// `WebCORSHandler` which is used for HTTP/1.
  46. ///
  47. /// ┌─────────────────────────────────┐
  48. /// │ GRPCServerRequestRoutingHandler │
  49. /// └─▲─────────────────────────────┬─┘
  50. /// HTTPServerRequestPart│ │HTTPServerResponsePart
  51. /// ┌─┴─────────────────────────────▼─┐
  52. /// │ HTTP Handlers │
  53. /// └─▲─────────────────────────────┬─┘
  54. /// ByteBuffer│ │ByteBuffer
  55. /// ┌─┴─────────────────────────────▼─┐
  56. /// │ NIOSSLHandler │
  57. /// └─▲─────────────────────────────┬─┘
  58. /// ByteBuffer│ │ByteBuffer
  59. /// │ ▼
  60. ///
  61. /// The `GRPCServerRequestRoutingHandler` resolves the request head and configures the rest of
  62. /// the pipeline based on the RPC call being made.
  63. ///
  64. /// 3. The call has been resolved and is a function that this server can handle. Responses are
  65. /// written into `BaseCallHandler` by a user-implemented `CallHandlerProvider`.
  66. ///
  67. /// ┌─────────────────────────────────┐
  68. /// │ BaseCallHandler* │
  69. /// └─▲─────────────────────────────┬─┘
  70. /// GRPCServerRequestPart<T1>│ │GRPCServerResponsePart<T2>
  71. /// ┌─┴─────────────────────────────▼─┐
  72. /// │ HTTP1ToGRPCServerCodec │
  73. /// └─▲─────────────────────────────┬─┘
  74. /// HTTPServerRequestPart│ │HTTPServerResponsePart
  75. /// ┌─┴─────────────────────────────▼─┐
  76. /// │ HTTP Handlers │
  77. /// └─▲─────────────────────────────┬─┘
  78. /// ByteBuffer│ │ByteBuffer
  79. /// ┌─┴─────────────────────────────▼─┐
  80. /// │ NIOSSLHandler │
  81. /// └─▲─────────────────────────────┬─┘
  82. /// ByteBuffer│ │ByteBuffer
  83. /// │ ▼
  84. ///
  85. public final class Server {
  86. /// Makes and configures a `ServerBootstrap` using the provided configuration.
  87. public class func makeBootstrap(configuration: Configuration) -> ServerBootstrapProtocol {
  88. let bootstrap = PlatformSupport.makeServerBootstrap(group: configuration.eventLoopGroup)
  89. // Backlog is only available on `ServerBootstrap`.
  90. if bootstrap is ServerBootstrap {
  91. // Specify a backlog to avoid overloading the server.
  92. _ = bootstrap.serverChannelOption(ChannelOptions.backlog, value: 256)
  93. }
  94. return bootstrap
  95. // Enable `SO_REUSEADDR` to avoid "address already in use" error.
  96. .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
  97. // Set the handlers that are applied to the accepted Channels
  98. .childChannelInitializer { channel in
  99. let protocolSwitcher = HTTPProtocolSwitcher(
  100. errorDelegate: configuration.errorDelegate,
  101. httpTargetWindowSize: configuration.httpTargetWindowSize,
  102. keepAlive: configuration.connectionKeepalive,
  103. idleTimeout: configuration.connectionIdleTimeout
  104. ) { channel -> EventLoopFuture<Void> in
  105. let logger = Logger(subsystem: .serverChannelCall, metadata: [MetadataKey.requestID: "\(UUID())"])
  106. let handler = GRPCServerRequestRoutingHandler(
  107. servicesByName: configuration.serviceProvidersByName,
  108. encoding: configuration.messageEncoding,
  109. errorDelegate: configuration.errorDelegate,
  110. logger: logger
  111. )
  112. return channel.pipeline.addHandler(handler)
  113. }
  114. if let tls = configuration.tls {
  115. return channel.configureTLS(configuration: tls).flatMap {
  116. channel.pipeline.addHandler(protocolSwitcher)
  117. }
  118. } else {
  119. return channel.pipeline.addHandler(protocolSwitcher)
  120. }
  121. }
  122. // Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels
  123. .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
  124. .childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
  125. }
  126. /// Starts a server with the given configuration. See `Server.Configuration` for the options
  127. /// available to configure the server.
  128. public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
  129. return makeBootstrap(configuration: configuration)
  130. .bind(to: configuration.target)
  131. .map { channel in
  132. Server(channel: channel, errorDelegate: configuration.errorDelegate)
  133. }
  134. }
  135. public let channel: Channel
  136. private var errorDelegate: ServerErrorDelegate?
  137. private init(channel: Channel, errorDelegate: ServerErrorDelegate?) {
  138. self.channel = channel
  139. // Maintain a strong reference to ensure it lives as long as the server.
  140. self.errorDelegate = errorDelegate
  141. // If we have an error delegate, add a server channel error handler as well. We don't need to wait for the handler to
  142. // be added.
  143. if let errorDelegate = errorDelegate {
  144. _ = channel.pipeline.addHandler(ServerChannelErrorHandler(errorDelegate: errorDelegate))
  145. }
  146. // nil out errorDelegate to avoid retain cycles.
  147. onClose.whenComplete { _ in
  148. self.errorDelegate = nil
  149. }
  150. }
  151. /// Fired when the server shuts down.
  152. public var onClose: EventLoopFuture<Void> {
  153. return channel.closeFuture
  154. }
  155. /// Shut down the server; this should be called to avoid leaking resources.
  156. public func close() -> EventLoopFuture<Void> {
  157. return channel.close(mode: .all)
  158. }
  159. }
  160. public typealias BindTarget = ConnectionTarget
  161. extension Server {
  162. /// The configuration for a server.
  163. public struct Configuration {
  164. /// The target to bind to.
  165. public var target: BindTarget
  166. /// The event loop group to run the connection on.
  167. public var eventLoopGroup: EventLoopGroup
  168. /// Providers the server should use to handle gRPC requests.
  169. public var serviceProviders: [CallHandlerProvider]
  170. /// An error delegate which is called when errors are caught. Provided delegates **must not
  171. /// maintain a strong reference to this `Server`**. Doing so will cause a retain cycle.
  172. public var errorDelegate: ServerErrorDelegate?
  173. /// TLS configuration for this connection. `nil` if TLS is not desired.
  174. public var tls: TLS?
  175. /// The connection keepalive configuration.
  176. public var connectionKeepalive: ServerConnectionKeepalive
  177. /// The amount of time to wait before closing connections. The idle timeout will start only
  178. /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
  179. public var connectionIdleTimeout: TimeAmount
  180. /// The compression configuration for requests and responses.
  181. ///
  182. /// If compression is enabled for the server it may be disabled for responses on any RPC by
  183. /// setting `compressionEnabled` to `false` on the context of the call.
  184. ///
  185. /// Compression may also be disabled at the message-level for streaming responses (i.e. server
  186. /// streaming and bidirectional streaming RPCs) by passing setting `compression` to `.disabled`
  187. /// in `sendResponse(_:compression)`.
  188. public var messageEncoding: ServerMessageEncoding
  189. /// The HTTP/2 flow control target window size.
  190. public var httpTargetWindowSize: Int
  191. /// Create a `Configuration` with some pre-defined defaults.
  192. ///
  193. /// - Parameter target: The target to bind to.
  194. /// - Parameter eventLoopGroup: The event loop group to run the server on.
  195. /// - Parameter serviceProviders: An array of `CallHandlerProvider`s which the server should use
  196. /// to handle requests.
  197. /// - Parameter errorDelegate: The error delegate, defaulting to a logging delegate.
  198. /// - Parameter tls: TLS configuration, defaulting to `nil`.
  199. /// - Parameter connectionKeepalive: The keepalive configuration to use.
  200. /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the connection, defaulting to 5 minutes.
  201. /// - Parameter messageEncoding: Message compression configuration, defaulting to no compression.
  202. /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size.
  203. public init(
  204. target: BindTarget,
  205. eventLoopGroup: EventLoopGroup,
  206. serviceProviders: [CallHandlerProvider],
  207. errorDelegate: ServerErrorDelegate? = LoggingServerErrorDelegate.shared,
  208. tls: TLS? = nil,
  209. connectionKeepalive: ServerConnectionKeepalive = ServerConnectionKeepalive(),
  210. connectionIdleTimeout: TimeAmount = .minutes(5),
  211. messageEncoding: ServerMessageEncoding = .disabled,
  212. httpTargetWindowSize: Int = 65535
  213. ) {
  214. self.target = target
  215. self.eventLoopGroup = eventLoopGroup
  216. self.serviceProviders = serviceProviders
  217. self.errorDelegate = errorDelegate
  218. self.tls = tls
  219. self.connectionKeepalive = connectionKeepalive
  220. self.connectionIdleTimeout = connectionIdleTimeout
  221. self.messageEncoding = messageEncoding
  222. self.httpTargetWindowSize = httpTargetWindowSize
  223. }
  224. }
  225. }
  226. fileprivate extension Server.Configuration {
  227. var serviceProvidersByName: [String: CallHandlerProvider] {
  228. return Dictionary(uniqueKeysWithValues: self.serviceProviders.map { ($0.serviceName, $0) })
  229. }
  230. }
  231. fileprivate extension Channel {
  232. /// Configure an SSL handler on the channel.
  233. ///
  234. /// - Parameters:
  235. /// - configuration: The configuration to use when creating the handler.
  236. /// - Returns: A future which will be succeeded when the pipeline has been configured.
  237. func configureTLS(configuration: Server.Configuration.TLS) -> EventLoopFuture<Void> {
  238. do {
  239. let context = try NIOSSLContext(configuration: configuration.configuration)
  240. return self.pipeline.addHandler(NIOSSLServerHandler(context: context))
  241. } catch {
  242. return self.pipeline.eventLoop.makeFailedFuture(error)
  243. }
  244. }
  245. }
  246. fileprivate extension ServerBootstrapProtocol {
  247. func bind(to target: BindTarget) -> EventLoopFuture<Channel> {
  248. switch target.wrapped {
  249. case .hostAndPort(let host, let port):
  250. return self.bind(host: host, port: port)
  251. case .unixDomainSocket(let path):
  252. return self.bind(unixDomainSocketPath: path)
  253. case .socketAddress(let address):
  254. return self.bind(to: address)
  255. }
  256. }
  257. }