| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262 |
- /*
- * Copyright 2020, gRPC Authors All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import EchoModel
- @testable import GRPC
- import Logging
- import NIO
- import NIOHTTP2
- import XCTest
- class ConnectionManagerTests: GRPCTestCase {
- private let loop = EmbeddedEventLoop()
- private let recorder = RecordingConnectivityDelegate()
- private var monitor: ConnectivityStateMonitor!
- private var defaultConfiguration: ClientConnection.Configuration {
- var configuration = ClientConnection.Configuration.default(
- target: .unixDomainSocket("/ignored"),
- eventLoopGroup: self.loop
- )
- configuration.connectionBackoff = nil
- configuration.backgroundActivityLogger = self.clientLogger
- return configuration
- }
- override func setUp() {
- super.setUp()
- self.monitor = ConnectivityStateMonitor(delegate: self.recorder, queue: nil)
- }
- override func tearDown() {
- XCTAssertNoThrow(try self.loop.syncShutdownGracefully())
- super.tearDown()
- }
- private func makeConnectionManager(
- configuration config: ClientConnection.Configuration? = nil,
- channelProvider: ((ConnectionManager, EventLoop) -> EventLoopFuture<Channel>)? = nil
- ) -> ConnectionManager {
- let configuration = config ?? self.defaultConfiguration
- return ConnectionManager(
- configuration: configuration,
- channelProvider: channelProvider.map { HookedChannelProvider($0) },
- connectivityDelegate: self.monitor,
- logger: self.logger
- )
- }
- private func waitForStateChange<Result>(
- from: ConnectivityState,
- to: ConnectivityState,
- timeout: DispatchTimeInterval = .seconds(1),
- file: StaticString = #file,
- line: UInt = #line,
- body: () throws -> Result
- ) rethrows -> Result {
- self.recorder.expectChange {
- XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
- }
- let result = try body()
- self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
- return result
- }
- private func waitForStateChanges<Result>(
- _ changes: [Change],
- timeout: DispatchTimeInterval = .seconds(1),
- file: StaticString = #file,
- line: UInt = #line,
- body: () throws -> Result
- ) rethrows -> Result {
- self.recorder.expectChanges(changes.count) {
- XCTAssertEqual($0, changes)
- }
- let result = try body()
- self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
- return result
- }
- }
- extension ConnectionManagerTests {
- func testIdleShutdown() throws {
- let manager = self.makeConnectionManager()
- try self.waitForStateChange(from: .idle, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- // Getting a multiplexer should fail.
- let multiplexer = manager.getHTTP2Multiplexer()
- self.loop.run()
- XCTAssertThrowsError(try multiplexer.wait())
- }
- func testConnectFromIdleFailsWithNoReconnect() {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let channel = manager.getHTTP2Multiplexer()
- self.loop.run()
- return channel
- }
- self.waitForStateChange(from: .connecting, to: .shutdown) {
- channelPromise.fail(DoomedChannelError())
- }
- XCTAssertThrowsError(try multiplexer.wait()) {
- XCTAssertTrue($0 is DoomedChannelError)
- }
- }
- func testConnectAndDisconnect() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- // Start the connection.
- self.waitForStateChange(from: .idle, to: .connecting) {
- _ = manager.getHTTP2Multiplexer()
- self.loop.run()
- }
- // Setup the real channel and activate it.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // Write a settings frame on the root stream; this'll make the channel 'ready'.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- }
- // Close the channel.
- try self.waitForStateChange(from: .ready, to: .shutdown) {
- // Now the channel should be available: shut it down.
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testConnectAndIdle() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- // Start the connection.
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Setup the channel.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // Write a settings frame on the root stream; this'll make the channel 'ready'.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- // Wait for the multiplexer, it _must_ be ready now.
- XCTAssertNoThrow(try readyChannelMux.wait())
- }
- // Go idle. This will shutdown the channel.
- try self.waitForStateChange(from: .ready, to: .idle) {
- self.loop.advanceTime(by: .minutes(5))
- XCTAssertNoThrow(try channel.closeFuture.wait())
- }
- // Now shutdown.
- try self.waitForStateChange(from: .idle, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testIdleTimeoutWhenThereAreActiveStreams() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- // Start the connection.
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Setup the channel.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // Write a settings frame on the root stream; this'll make the channel 'ready'.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
- XCTAssertNoThrow(try readyChannelMux.wait())
- }
- // "create" a stream; the details don't matter here.
- let streamCreated = NIOHTTP2StreamCreatedEvent(
- streamID: 1,
- localInitialWindowSize: nil,
- remoteInitialWindowSize: nil
- )
- channel.pipeline.fireUserInboundEventTriggered(streamCreated)
- // Wait for the idle timeout: this should _not_ cause the channel to idle.
- self.loop.advanceTime(by: .minutes(5))
- // Now we're going to close the stream and wait for an idle timeout and then shutdown.
- self.waitForStateChange(from: .ready, to: .idle) {
- // Close the stream.
- let streamClosed = StreamClosedEvent(streamID: 1, reason: nil)
- channel.pipeline.fireUserInboundEventTriggered(streamClosed)
- // ... wait for the idle timeout,
- self.loop.advanceTime(by: .minutes(5))
- }
- // Now shutdown.
- try self.waitForStateChange(from: .idle, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testConnectAndThenBecomeInactive() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Setup the channel.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- try self.waitForStateChange(from: .connecting, to: .shutdown) {
- // Okay: now close the channel; the `readyChannel` future has not been completed yet.
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- // We failed to get a channel and we don't have reconnect configured: we should be shutdown and
- // the `readyChannelMux` should error.
- XCTAssertThrowsError(try readyChannelMux.wait())
- }
- func testConnectOnSecondAttempt() throws {
- let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
- let channelFutures: [EventLoopFuture<Channel>] = [
- self.loop.makeFailedFuture(DoomedChannelError()),
- channelPromise.futureResult,
- ]
- var channelFutureIterator = channelFutures.makeIterator()
- var configuration = self.defaultConfiguration
- configuration.connectionBackoff = .oneSecondFixed
- let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
- guard let next = channelFutureIterator.next() else {
- XCTFail("Too many channels requested")
- return self.loop.makeFailedFuture(DoomedChannelError())
- }
- return next
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- ]) {
- // Get a HTTP/2 stream multiplexer.
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Get a HTTP/2 stream mux from the manager - it is a future for the one we made earlier.
- let anotherReadyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- // Move time forwards by a second to start the next connection attempt.
- self.waitForStateChange(from: .transientFailure, to: .connecting) {
- self.loop.advanceTime(by: .seconds(1))
- }
- // Setup the actual channel and complete the promise.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // Write a SETTINGS frame on the root stream.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- }
- // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
- XCTAssertNoThrow(try readyChannelMux.wait())
- XCTAssertNoThrow(try anotherReadyChannelMux.wait())
- // Now shutdown.
- try self.waitForStateChange(from: .ready, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testShutdownWhileConnecting() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Now shutdown.
- try self.waitForStateChange(from: .connecting, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- // The multiplexer we were requesting should fail.
- XCTAssertThrowsError(try readyChannelMux.wait())
- // We still have our channel promise to fulfil: if it succeeds then it too should be closed.
- channelPromise.succeed(EmbeddedChannel(loop: self.loop))
- let channel = try channelPromise.futureResult.wait()
- self.loop.run()
- XCTAssertNoThrow(try channel.closeFuture.wait())
- }
- func testShutdownWhileTransientFailure() throws {
- var configuration = self.defaultConfiguration
- configuration.connectionBackoff = .oneSecondFixed
- let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
- self.loop.makeFailedFuture(DoomedChannelError())
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChanges([
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- ]) {
- // Get a HTTP/2 stream multiplexer.
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Now shutdown.
- try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- // The HTTP/2 stream mux we were requesting should fail.
- XCTAssertThrowsError(try readyChannelMux.wait())
- }
- func testShutdownWhileActive() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Prepare the channel
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // (No state change expected here: active is an internal state.)
- // Now shutdown.
- try self.waitForStateChange(from: .connecting, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- // The HTTP/2 stream multiplexer we were requesting should fail.
- XCTAssertThrowsError(try readyChannelMux.wait())
- }
- func testShutdownWhileShutdown() throws {
- let manager = self.makeConnectionManager()
- try self.waitForStateChange(from: .idle, to: .shutdown) {
- let firstShutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try firstShutdown.wait())
- }
- let secondShutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try secondShutdown.wait())
- }
- func testTransientFailureWhileActive() throws {
- var configuration = self.defaultConfiguration
- configuration.connectionBackoff = .oneSecondFixed
- let channelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
- let channelFutures: [EventLoopFuture<Channel>] = [
- channelPromise.futureResult,
- self.loop.makeFailedFuture(DoomedChannelError()),
- ]
- var channelFutureIterator = channelFutures.makeIterator()
- let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
- guard let next = channelFutureIterator.next() else {
- XCTFail("Too many channels requested")
- return self.loop.makeFailedFuture(DoomedChannelError())
- }
- return next
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Prepare the channel
- let firstChannel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: firstChannel,
- inboundStreamInitializer: nil
- )
- try firstChannel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(firstChannel)
- XCTAssertNoThrow(
- try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // (No state change expected here: active is an internal state.)
- // Close the channel (simulate e.g. TLS handshake failed)
- try self.waitForStateChange(from: .connecting, to: .transientFailure) {
- XCTAssertNoThrow(try firstChannel.close().wait())
- }
- // Start connecting again.
- self.waitForStateChanges([
- Change(from: .transientFailure, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- ]) {
- self.loop.advanceTime(by: .seconds(1))
- }
- // Now shutdown
- try self.waitForStateChange(from: .transientFailure, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- // The channel never came up: it should be throw.
- XCTAssertThrowsError(try readyChannelMux.wait())
- }
- func testTransientFailureWhileReady() throws {
- var configuration = self.defaultConfiguration
- configuration.connectionBackoff = .oneSecondFixed
- let firstChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
- let secondChannelPromise: EventLoopPromise<Channel> = self.loop.makePromise()
- let channelFutures: [EventLoopFuture<Channel>] = [
- firstChannelPromise.futureResult,
- secondChannelPromise.futureResult,
- ]
- var channelFutureIterator = channelFutures.makeIterator()
- let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
- guard let next = channelFutureIterator.next() else {
- XCTFail("Too many channels requested")
- return self.loop.makeFailedFuture(DoomedChannelError())
- }
- return next
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Prepare the first channel
- let firstChannel = EmbeddedChannel(loop: self.loop)
- let firstH2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: firstChannel,
- inboundStreamInitializer: nil
- )
- try firstChannel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: firstH2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- firstChannelPromise.succeed(firstChannel)
- XCTAssertNoThrow(
- try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // Write a SETTINGS frame on the root stream.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try firstChannel.writeInbound(frame))
- }
- // Channel should now be ready.
- XCTAssertNoThrow(try readyChannelMux.wait())
- // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
- let streamCreated = NIOHTTP2StreamCreatedEvent(
- streamID: 1,
- localInitialWindowSize: nil,
- remoteInitialWindowSize: nil
- )
- firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
- try self.waitForStateChange(from: .ready, to: .transientFailure) {
- XCTAssertNoThrow(try firstChannel.close().wait())
- }
- // Run to start connecting again.
- self.waitForStateChange(from: .transientFailure, to: .connecting) {
- self.loop.advanceTime(by: .seconds(1))
- }
- // Prepare the second channel
- let secondChannel = EmbeddedChannel(loop: self.loop)
- let secondH2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: secondChannel,
- inboundStreamInitializer: nil
- )
- try secondChannel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: secondH2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- secondChannelPromise.succeed(secondChannel)
- XCTAssertNoThrow(
- try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- // Write a SETTINGS frame on the root stream.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try secondChannel.writeInbound(frame))
- }
- // Now shutdown
- try self.waitForStateChange(from: .ready, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testGoAwayWhenReady() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self
- .waitForStateChange(from: .idle, to: .connecting) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Setup the channel.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- try channel.pipeline.addHandler(
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- )
- ).wait()
- channelPromise.succeed(channel)
- XCTAssertNoThrow(
- try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored"))
- .wait()
- )
- try self.waitForStateChange(from: .connecting, to: .ready) {
- // Write a SETTINGS frame on the root stream.
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- }
- // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now.
- XCTAssertNoThrow(try readyChannelMux.wait())
- // Send a GO_AWAY; the details don't matter. This will cause the connection to go idle and the
- // channel to close.
- try self.waitForStateChange(from: .ready, to: .idle) {
- let goAway = HTTP2Frame(
- streamID: .rootStream,
- payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil)
- )
- XCTAssertNoThrow(try channel.writeInbound(goAway))
- }
- self.loop.run()
- XCTAssertNoThrow(try channel.closeFuture.wait())
- // Now shutdown
- try self.waitForStateChange(from: .idle, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testDoomedOptimisticChannelFromIdle() {
- var configuration = self.defaultConfiguration
- configuration.callStartBehavior = .fastFailure
- let manager = ConnectionManager(
- configuration: configuration,
- channelProvider: HookedChannelProvider { _, loop in
- return loop.makeFailedFuture(DoomedChannelError())
- },
- connectivityDelegate: nil,
- logger: self.logger
- )
- let candidate = manager.getHTTP2Multiplexer()
- self.loop.run()
- XCTAssertThrowsError(try candidate.wait())
- }
- func testDoomedOptimisticChannelFromConnecting() throws {
- var configuration = self.defaultConfiguration
- configuration.callStartBehavior = .fastFailure
- let promise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return promise.futureResult
- }
- self.waitForStateChange(from: .idle, to: .connecting) {
- // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
- _ = manager.getHTTP2Multiplexer()
- self.loop.run()
- }
- // We're connecting: get an optimistic HTTP/2 stream multiplexer - this was selected in config.
- let optimisticChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- // Fail the promise.
- promise.fail(DoomedChannelError())
- XCTAssertThrowsError(try optimisticChannelMux.wait())
- }
- func testOptimisticChannelFromTransientFailure() throws {
- var configuration = self.defaultConfiguration
- configuration.callStartBehavior = .fastFailure
- configuration.connectionBackoff = ConnectionBackoff()
- let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
- self.loop.makeFailedFuture(DoomedChannelError())
- }
- self.waitForStateChanges([
- Change(from: .idle, to: .connecting),
- Change(from: .connecting, to: .transientFailure),
- ]) {
- // Trigger channel creation, and a connection attempt, we don't care about the HTTP/2 stream multiplexer.
- _ = manager.getHTTP2Multiplexer()
- self.loop.run()
- }
- // Now we're sitting in transient failure. Get a HTTP/2 stream mux optimistically - selected in config.
- let optimisticChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- XCTAssertThrowsError(try optimisticChannelMux.wait()) { error in
- XCTAssertTrue(error is DoomedChannelError)
- }
- }
- func testOptimisticChannelFromShutdown() throws {
- var configuration = self.defaultConfiguration
- configuration.callStartBehavior = .fastFailure
- let manager = self.makeConnectionManager { _, _ in
- return self.loop.makeFailedFuture(DoomedChannelError())
- }
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- // Get a channel optimistically. It'll fail, obviously.
- let channelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- XCTAssertThrowsError(try channelMux.wait())
- }
- func testForceIdleAfterInactive() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- // Start the connection.
- let readyChannelMux: EventLoopFuture<HTTP2StreamMultiplexer> = self.waitForStateChange(
- from: .idle,
- to: .connecting
- ) {
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Setup the real channel and activate it.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- XCTAssertNoThrow(try channel.pipeline.addHandlers([
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- ),
- ]).wait())
- channelPromise.succeed(channel)
- self.loop.run()
- let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
- XCTAssertNoThrow(try connect.wait())
- // Write a SETTINGS frame on the root stream.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- }
- // The channel should now be ready.
- XCTAssertNoThrow(try readyChannelMux.wait())
- // Now drop the connection.
- try self.waitForStateChange(from: .ready, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testCloseWithoutActiveRPCs() throws {
- let channelPromise = self.loop.makePromise(of: Channel.self)
- let manager = self.makeConnectionManager { _, _ in
- return channelPromise.futureResult
- }
- // Start the connection.
- let readyChannelMux = self.waitForStateChange(
- from: .idle,
- to: .connecting
- ) { () -> EventLoopFuture<HTTP2StreamMultiplexer> in
- let readyChannelMux = manager.getHTTP2Multiplexer()
- self.loop.run()
- return readyChannelMux
- }
- // Setup the actual channel and activate it.
- let channel = EmbeddedChannel(loop: self.loop)
- let h2mux = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- XCTAssertNoThrow(try channel.pipeline.addHandlers([
- GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: h2mux,
- idleTimeout: .minutes(5),
- keepalive: .init(),
- logger: self.logger
- ),
- ]).wait())
- channelPromise.succeed(channel)
- self.loop.run()
- let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
- XCTAssertNoThrow(try connect.wait())
- // "ready" the connection.
- try self.waitForStateChange(from: .connecting, to: .ready) {
- let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
- XCTAssertNoThrow(try channel.writeInbound(frame))
- }
- // The HTTP/2 stream multiplexer should now be ready.
- XCTAssertNoThrow(try readyChannelMux.wait())
- // Close the channel. There are no active RPCs so we should idle rather than be in the transient
- // failure state.
- self.waitForStateChange(from: .ready, to: .idle) {
- channel.pipeline.fireChannelInactive()
- }
- }
- func testIdleErrorDoesNothing() throws {
- let manager = self.makeConnectionManager()
- // Dropping an error on this manager should be fine.
- manager.channelError(DoomedChannelError())
- // Shutting down is then safe.
- try self.waitForStateChange(from: .idle, to: .shutdown) {
- let shutdown = manager.shutdown()
- self.loop.run()
- XCTAssertNoThrow(try shutdown.wait())
- }
- }
- func testHTTP2Delegates() throws {
- let channel = EmbeddedChannel(loop: self.loop)
- defer {
- XCTAssertNoThrow(try channel.finish())
- }
- let multiplexer = HTTP2StreamMultiplexer(
- mode: .client,
- channel: channel,
- inboundStreamInitializer: nil
- )
- class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
- var streamsClosed = 0
- var maxConcurrentStreams = 0
- func streamClosed(_ connectionManager: ConnectionManager) {
- self.streamsClosed += 1
- }
- func receivedSettingsMaxConcurrentStreams(
- _ connectionManager: ConnectionManager,
- maxConcurrentStreams: Int
- ) {
- self.maxConcurrentStreams = maxConcurrentStreams
- }
- }
- let http2 = HTTP2Delegate()
- let manager = ConnectionManager(
- eventLoop: self.loop,
- channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture<Channel> in
- let idleHandler = GRPCIdleHandler(
- connectionManager: manager,
- multiplexer: multiplexer,
- idleTimeout: .minutes(5),
- keepalive: ClientConnectionKeepalive(),
- logger: self.logger
- )
- // We're going to cheat a bit by not putting the multiplexer in the channel. This allows
- // us to just fire stream created/closed events into the channel.
- do {
- try channel.pipeline.syncOperations.addHandler(idleHandler)
- } catch {
- return eventLoop.makeFailedFuture(error)
- }
- return eventLoop.makeSucceededFuture(channel)
- },
- callStartBehavior: .waitsForConnectivity,
- connectionBackoff: ConnectionBackoff(),
- connectivityDelegate: nil,
- http2Delegate: http2,
- logger: self.logger
- )
- // Start connecting.
- let futureMultiplexer = manager.getHTTP2Multiplexer()
- self.loop.run()
- // Do the actual connecting.
- XCTAssertNoThrow(try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")))
- // The channel isn't ready until it's seen a SETTINGS frame.
- func makeSettingsFrame(maxConcurrentStreams: Int) -> HTTP2Frame {
- let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)]
- return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings)))
- }
- XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42)))
- // We're ready now so the future multiplexer will resolve and we'll have seen an update to
- // max concurrent streams.
- XCTAssertNoThrow(try futureMultiplexer.wait())
- XCTAssertEqual(http2.maxConcurrentStreams, 42)
- XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13)))
- XCTAssertEqual(http2.maxConcurrentStreams, 13)
- // Open some streams.
- for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
- let streamCreated = NIOHTTP2StreamCreatedEvent(
- streamID: streamID,
- localInitialWindowSize: nil,
- remoteInitialWindowSize: nil
- )
- channel.pipeline.fireUserInboundEventTriggered(streamCreated)
- }
- // ... and then close them.
- for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) {
- let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil)
- channel.pipeline.fireUserInboundEventTriggered(streamClosed)
- }
- XCTAssertEqual(http2.streamsClosed, 4)
- }
- }
- internal struct Change: Hashable, CustomStringConvertible {
- var from: ConnectivityState
- var to: ConnectivityState
- var description: String {
- return "\(self.from) → \(self.to)"
- }
- }
- internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
- private let serialQueue = DispatchQueue(label: "io.grpc.testing")
- private let semaphore = DispatchSemaphore(value: 0)
- private var expectation: Expectation = .noExpectation
- private let quiescingSemaphore = DispatchSemaphore(value: 0)
- private enum Expectation {
- /// We have no expectation of any changes. We'll just ignore any changes.
- case noExpectation
- /// We expect one change.
- case one((Change) -> Void)
- /// We expect 'count' changes.
- case some(count: Int, recorded: [Change], ([Change]) -> Void)
- var count: Int {
- switch self {
- case .noExpectation:
- return 0
- case .one:
- return 1
- case let .some(count, _, _):
- return count
- }
- }
- }
- func connectivityStateDidChange(from oldState: ConnectivityState,
- to newState: ConnectivityState) {
- self.serialQueue.async {
- switch self.expectation {
- case let .one(verify):
- // We don't care about future changes.
- self.expectation = .noExpectation
- // Verify and notify.
- verify(Change(from: oldState, to: newState))
- self.semaphore.signal()
- case .some(let count, var recorded, let verify):
- recorded.append(Change(from: oldState, to: newState))
- if recorded.count == count {
- // We don't care about future changes.
- self.expectation = .noExpectation
- // Verify and notify.
- verify(recorded)
- self.semaphore.signal()
- } else {
- // Still need more responses.
- self.expectation = .some(count: count, recorded: recorded, verify)
- }
- case .noExpectation:
- // Ignore any changes.
- ()
- }
- }
- }
- func connectionStartedQuiescing() {
- self.serialQueue.async {
- self.quiescingSemaphore.signal()
- }
- }
- func expectChanges(_ count: Int, verify: @escaping ([Change]) -> Void) {
- self.serialQueue.async {
- self.expectation = .some(count: count, recorded: [], verify)
- }
- }
- func expectChange(verify: @escaping (Change) -> Void) {
- self.serialQueue.async {
- self.expectation = .one(verify)
- }
- }
- func waitForExpectedChanges(
- timeout: DispatchTimeInterval,
- file: StaticString = #file,
- line: UInt = #line
- ) {
- let result = self.semaphore.wait(timeout: .now() + timeout)
- switch result {
- case .success:
- ()
- case .timedOut:
- XCTFail(
- "Timed out before verifying \(self.expectation.count) change(s)",
- file: file, line: line
- )
- }
- }
- func waitForQuiescing(timeout: DispatchTimeInterval) {
- let result = self.quiescingSemaphore.wait(timeout: .now() + timeout)
- switch result {
- case .success:
- ()
- case .timedOut:
- XCTFail("Timed out waiting for connection to start quiescing")
- }
- }
- }
- private extension ConnectionBackoff {
- static let oneSecondFixed = ConnectionBackoff(
- initialBackoff: 1.0,
- maximumBackoff: 1.0,
- multiplier: 1.0,
- jitter: 0.0
- )
- }
- private struct DoomedChannelError: Error {}
- internal struct HookedChannelProvider: ConnectionManagerChannelProvider {
- internal var provider: (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
- init(_ provider: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>) {
- self.provider = provider
- }
- func makeChannel(
- managedBy connectionManager: ConnectionManager,
- onEventLoop eventLoop: EventLoop,
- connectTimeout: TimeAmount?,
- logger: Logger
- ) -> EventLoopFuture<Channel> {
- return self.provider(connectionManager, eventLoop)
- }
- }
|