|
|
@@ -98,25 +98,7 @@ public final class Server: @unchecked Sendable {
|
|
|
}
|
|
|
|
|
|
#if canImport(NIOSSL)
|
|
|
- // Making a `NIOSSLContext` is expensive, we should only do it once per TLS configuration so
|
|
|
- // we'll do it now, before accepting connections. Unfortunately our API isn't throwing so we'll
|
|
|
- // only surface any error when initializing a child channel.
|
|
|
- //
|
|
|
- // 'nil' means we're not using TLS, or we're using the Network.framework TLS backend. If we're
|
|
|
- // using the Network.framework TLS backend we'll apply the settings just below.
|
|
|
- let sslContext: Result<NIOSSLContext, Error>?
|
|
|
-
|
|
|
- if let tlsConfiguration = configuration.tlsConfiguration {
|
|
|
- do {
|
|
|
- sslContext = try tlsConfiguration.makeNIOSSLContext().map { .success($0) }
|
|
|
- } catch {
|
|
|
- sslContext = .failure(error)
|
|
|
- }
|
|
|
-
|
|
|
- } else {
|
|
|
- // No TLS configuration, no SSL context.
|
|
|
- sslContext = nil
|
|
|
- }
|
|
|
+ let sslContext = Self.makeNIOSSLContext(configuration: configuration)
|
|
|
#endif // canImport(NIOSSL)
|
|
|
|
|
|
#if canImport(Network)
|
|
|
@@ -152,53 +134,10 @@ public final class Server: @unchecked Sendable {
|
|
|
)
|
|
|
// Set the handlers that are applied to the accepted Channels
|
|
|
.childChannelInitializer { channel in
|
|
|
- var configuration = configuration
|
|
|
- configuration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
|
|
|
- configuration.logger.addIPAddressMetadata(
|
|
|
- local: channel.localAddress,
|
|
|
- remote: channel.remoteAddress
|
|
|
- )
|
|
|
-
|
|
|
- do {
|
|
|
- let sync = channel.pipeline.syncOperations
|
|
|
+ Self.configureAcceptedChannel(channel, configuration: configuration) { sync in
|
|
|
#if canImport(NIOSSL)
|
|
|
- if let sslContext = try sslContext?.get() {
|
|
|
- let sslHandler: NIOSSLServerHandler
|
|
|
- if let verify = configuration.tlsConfiguration?.nioSSLCustomVerificationCallback {
|
|
|
- sslHandler = NIOSSLServerHandler(
|
|
|
- context: sslContext,
|
|
|
- customVerificationCallback: verify
|
|
|
- )
|
|
|
- } else {
|
|
|
- sslHandler = NIOSSLServerHandler(context: sslContext)
|
|
|
- }
|
|
|
-
|
|
|
- try sync.addHandler(sslHandler)
|
|
|
- }
|
|
|
+ try Self.addNIOSSLHandler(sslContext, configuration: configuration, sync: sync)
|
|
|
#endif // canImport(NIOSSL)
|
|
|
-
|
|
|
- // Configures the pipeline based on whether the connection uses TLS or not.
|
|
|
- try sync.addHandler(GRPCServerPipelineConfigurator(configuration: configuration))
|
|
|
-
|
|
|
- // Work around the zero length write issue, if needed.
|
|
|
- let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
|
|
|
- group: configuration.eventLoopGroup,
|
|
|
- hasTLS: configuration.tlsConfiguration != nil
|
|
|
- )
|
|
|
- if requiresZeroLengthWorkaround,
|
|
|
- #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
|
|
|
- {
|
|
|
- try sync.addHandler(NIOFilterEmptyWritesHandler())
|
|
|
- }
|
|
|
- } catch {
|
|
|
- return channel.eventLoop.makeFailedFuture(error)
|
|
|
- }
|
|
|
-
|
|
|
- // Run the debug initializer, if there is one.
|
|
|
- if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
|
|
|
- return debugAcceptedChannelInitializer(channel)
|
|
|
- } else {
|
|
|
- return channel.eventLoop.makeSucceededVoidFuture()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -210,11 +149,108 @@ public final class Server: @unchecked Sendable {
|
|
|
)
|
|
|
}
|
|
|
|
|
|
+ #if canImport(NIOSSL)
|
|
|
+ private static func makeNIOSSLContext(
|
|
|
+ configuration: Configuration
|
|
|
+ ) -> Result<NIOSSLContext, Error>? {
|
|
|
+ // Making a `NIOSSLContext` is expensive, we should only do it once per TLS configuration so
|
|
|
+ // we'll do it now, before accepting connections. Unfortunately our API isn't throwing so we'll
|
|
|
+ // only surface any error when initializing a child channel.
|
|
|
+ //
|
|
|
+ // 'nil' means we're not using TLS, or we're using the Network.framework TLS backend. If we're
|
|
|
+ // using the Network.framework TLS backend we'll apply the settings just below.
|
|
|
+ let sslContext: Result<NIOSSLContext, Error>?
|
|
|
+
|
|
|
+ if let tlsConfiguration = configuration.tlsConfiguration {
|
|
|
+ do {
|
|
|
+ sslContext = try tlsConfiguration.makeNIOSSLContext().map { .success($0) }
|
|
|
+ } catch {
|
|
|
+ sslContext = .failure(error)
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // No TLS configuration, no SSL context.
|
|
|
+ sslContext = nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return sslContext
|
|
|
+ }
|
|
|
+
|
|
|
+ private static func addNIOSSLHandler(
|
|
|
+ _ sslContext: Result<NIOSSLContext, Error>?,
|
|
|
+ configuration: Configuration,
|
|
|
+ sync: ChannelPipeline.SynchronousOperations
|
|
|
+ ) throws {
|
|
|
+ if let sslContext = try sslContext?.get() {
|
|
|
+ let sslHandler: NIOSSLServerHandler
|
|
|
+ if let verify = configuration.tlsConfiguration?.nioSSLCustomVerificationCallback {
|
|
|
+ sslHandler = NIOSSLServerHandler(
|
|
|
+ context: sslContext,
|
|
|
+ customVerificationCallback: verify
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ sslHandler = NIOSSLServerHandler(context: sslContext)
|
|
|
+ }
|
|
|
+
|
|
|
+ try sync.addHandler(sslHandler)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ #endif // canImport(NIOSSL)
|
|
|
+
|
|
|
+ private static func configureAcceptedChannel(
|
|
|
+ _ channel: Channel,
|
|
|
+ configuration: Configuration,
|
|
|
+ addNIOSSLIfNecessary: (ChannelPipeline.SynchronousOperations) throws -> Void
|
|
|
+ ) -> EventLoopFuture<Void> {
|
|
|
+ var configuration = configuration
|
|
|
+ configuration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
|
|
|
+ configuration.logger.addIPAddressMetadata(
|
|
|
+ local: channel.localAddress,
|
|
|
+ remote: channel.remoteAddress
|
|
|
+ )
|
|
|
+
|
|
|
+ do {
|
|
|
+ let sync = channel.pipeline.syncOperations
|
|
|
+ try addNIOSSLIfNecessary(sync)
|
|
|
+
|
|
|
+ // Configures the pipeline based on whether the connection uses TLS or not.
|
|
|
+ try sync.addHandler(GRPCServerPipelineConfigurator(configuration: configuration))
|
|
|
+
|
|
|
+ // Work around the zero length write issue, if needed.
|
|
|
+ let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
|
|
|
+ group: configuration.eventLoopGroup,
|
|
|
+ hasTLS: configuration.tlsConfiguration != nil
|
|
|
+ )
|
|
|
+ if requiresZeroLengthWorkaround,
|
|
|
+ #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
|
|
|
+ {
|
|
|
+ try sync.addHandler(NIOFilterEmptyWritesHandler())
|
|
|
+ }
|
|
|
+ } catch {
|
|
|
+ return channel.eventLoop.makeFailedFuture(error)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Run the debug initializer, if there is one.
|
|
|
+ if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
|
|
|
+ return debugAcceptedChannelInitializer(channel)
|
|
|
+ } else {
|
|
|
+ return channel.eventLoop.makeSucceededVoidFuture()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/// Starts a server with the given configuration. See `Server.Configuration` for the options
|
|
|
/// available to configure the server.
|
|
|
public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
|
|
|
- let quiescingHelper = ServerQuiescingHelper(group: configuration.eventLoopGroup)
|
|
|
+ switch configuration.target.wrapped {
|
|
|
+ case .connectedSocket(let handle) where configuration.connectedSocketTargetIsAcceptedConnection:
|
|
|
+ return Self.startServerFromAcceptedConnection(handle: handle, configuration: configuration)
|
|
|
+ case .connectedSocket, .hostAndPort, .unixDomainSocket, .socketAddress, .vsockAddress:
|
|
|
+ return Self.startServer(configuration: configuration)
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ private static func startServer(configuration: Configuration) -> EventLoopFuture<Server> {
|
|
|
+ let quiescingHelper = ServerQuiescingHelper(group: configuration.eventLoopGroup)
|
|
|
return self.makeBootstrap(configuration: configuration)
|
|
|
.serverChannelInitializer { channel in
|
|
|
channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel))
|
|
|
@@ -229,13 +265,53 @@ public final class Server: @unchecked Sendable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static func startServerFromAcceptedConnection(
|
|
|
+ handle: NIOBSDSocket.Handle,
|
|
|
+ configuration: Configuration
|
|
|
+ ) -> EventLoopFuture<Server> {
|
|
|
+ guard let bootstrap = ClientBootstrap(validatingGroup: configuration.eventLoopGroup) else {
|
|
|
+ let status = GRPCStatus(
|
|
|
+ code: .unimplemented,
|
|
|
+ message: """
|
|
|
+ You must use a NIOPosix EventLoopGroup to create a server from an already accepted \
|
|
|
+ socket.
|
|
|
+ """
|
|
|
+ )
|
|
|
+ return configuration.eventLoopGroup.any().makeFailedFuture(status)
|
|
|
+ }
|
|
|
+
|
|
|
+ #if canImport(NIOSSL)
|
|
|
+ let sslContext = Self.makeNIOSSLContext(configuration: configuration)
|
|
|
+ #endif // canImport(NIOSSL)
|
|
|
+
|
|
|
+ return bootstrap.channelInitializer { channel in
|
|
|
+ Self.configureAcceptedChannel(channel, configuration: configuration) { sync in
|
|
|
+ #if canImport(NIOSSL)
|
|
|
+ try Self.addNIOSSLHandler(sslContext, configuration: configuration, sync: sync)
|
|
|
+ #endif // canImport(NIOSSL)
|
|
|
+ }
|
|
|
+ }.withConnectedSocket(handle).map { channel in
|
|
|
+ Server(
|
|
|
+ channel: channel,
|
|
|
+ quiescingHelper: nil,
|
|
|
+ errorDelegate: configuration.errorDelegate
|
|
|
+ )
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// The listening server channel.
|
|
|
+ ///
|
|
|
+ /// If the server was created from an already accepted connection then this channel will
|
|
|
+ /// be for the accepted connection.
|
|
|
public let channel: Channel
|
|
|
- private let quiescingHelper: ServerQuiescingHelper
|
|
|
+
|
|
|
+ /// Quiescing helper. `nil` if `channel` is for an accepted connection.
|
|
|
+ private let quiescingHelper: ServerQuiescingHelper?
|
|
|
private var errorDelegate: ServerErrorDelegate?
|
|
|
|
|
|
private init(
|
|
|
channel: Channel,
|
|
|
- quiescingHelper: ServerQuiescingHelper,
|
|
|
+ quiescingHelper: ServerQuiescingHelper?,
|
|
|
errorDelegate: ServerErrorDelegate?
|
|
|
) {
|
|
|
self.channel = channel
|
|
|
@@ -264,7 +340,13 @@ public final class Server: @unchecked Sendable {
|
|
|
/// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
|
|
|
/// connections will be rejected.
|
|
|
public func initiateGracefulShutdown(promise: EventLoopPromise<Void>?) {
|
|
|
- self.quiescingHelper.initiateShutdown(promise: promise)
|
|
|
+ if let quiescingHelper = self.quiescingHelper {
|
|
|
+ quiescingHelper.initiateShutdown(promise: promise)
|
|
|
+ } else {
|
|
|
+ // No quiescing helper: the channel must be for an already accepted connection.
|
|
|
+ self.channel.closeFuture.cascade(to: promise)
|
|
|
+ self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
|
|
|
@@ -436,6 +518,13 @@ extension Server {
|
|
|
/// CORS configuration for gRPC-Web support.
|
|
|
public var webCORS = Configuration.CORS()
|
|
|
|
|
|
+ /// Indicates whether a `connectedSocket` ``target`` is treated as an accepted connection.
|
|
|
+ ///
|
|
|
+ /// If ``target`` is a `connectedSocket` then this flag indicates whether that socket is for
|
|
|
+ /// an already accepted connection. If the value is `false` then the socket is treated as a
|
|
|
+ /// listener. This value is ignored if ``target`` is any value other than `connectedSocket`.
|
|
|
+ public var connectedSocketTargetIsAcceptedConnection: Bool = false
|
|
|
+
|
|
|
#if canImport(NIOSSL)
|
|
|
/// Create a `Configuration` with some pre-defined defaults.
|
|
|
///
|