|
|
@@ -41,26 +41,30 @@ class ConnectionManagerTests: GRPCTestCase {
|
|
|
from: ConnectivityState,
|
|
|
to: ConnectivityState,
|
|
|
timeout: DispatchTimeInterval = .seconds(1),
|
|
|
+ file: StaticString = #file,
|
|
|
+ line: UInt = #line,
|
|
|
body: () throws -> Result
|
|
|
) rethrows -> Result {
|
|
|
self.recorder.expectChange {
|
|
|
- XCTAssertEqual($0, Change(from: from, to: to))
|
|
|
+ XCTAssertEqual($0, Change(from: from, to: to), file: file, line: line)
|
|
|
}
|
|
|
let result = try body()
|
|
|
- self.recorder.waitForExpectedChanges(timeout: timeout)
|
|
|
+ self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
private func waitForStateChanges<Result>(
|
|
|
_ changes: [Change],
|
|
|
timeout: DispatchTimeInterval = .seconds(1),
|
|
|
+ file: StaticString = #file,
|
|
|
+ line: UInt = #line,
|
|
|
body: () throws -> Result
|
|
|
) rethrows -> Result {
|
|
|
self.recorder.expectChanges(changes.count) {
|
|
|
XCTAssertEqual($0, changes)
|
|
|
}
|
|
|
let result = try body()
|
|
|
- self.recorder.waitForExpectedChanges(timeout: timeout)
|
|
|
+ self.recorder.waitForExpectedChanges(timeout: timeout, file: file, line: line)
|
|
|
return result
|
|
|
}
|
|
|
}
|
|
|
@@ -115,12 +119,10 @@ extension ConnectionManagerTests {
|
|
|
}
|
|
|
|
|
|
// Start the connection.
|
|
|
- let readyChannel: EventLoopFuture<Channel> = self
|
|
|
- .waitForStateChange(from: .idle, to: .connecting) {
|
|
|
- let readyChannel = manager.getChannel()
|
|
|
- self.loop.run()
|
|
|
- return readyChannel
|
|
|
- }
|
|
|
+ self.waitForStateChange(from: .idle, to: .connecting) {
|
|
|
+ _ = manager.getChannel()
|
|
|
+ self.loop.run()
|
|
|
+ }
|
|
|
|
|
|
// Setup the real channel and activate it.
|
|
|
let channel = EmbeddedChannel(
|
|
|
@@ -141,8 +143,10 @@ extension ConnectionManagerTests {
|
|
|
|
|
|
// Close the channel.
|
|
|
try self.waitForStateChange(from: .ready, to: .shutdown) {
|
|
|
- // Now the channel should be available: shut it down,
|
|
|
- XCTAssertNoThrow(try readyChannel.flatMap { $0.close(mode: .all) }.wait())
|
|
|
+ // Now the channel should be available: shut it down.
|
|
|
+ let shutdown = manager.shutdown()
|
|
|
+ self.loop.run()
|
|
|
+ XCTAssertNoThrow(try shutdown.wait())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -289,7 +293,9 @@ extension ConnectionManagerTests {
|
|
|
|
|
|
try self.waitForStateChange(from: .connecting, to: .shutdown) {
|
|
|
// Okay: now close the channel; the `readyChannel` future has not been completed yet.
|
|
|
- XCTAssertNoThrow(try channel.close(mode: .all).wait())
|
|
|
+ let shutdown = manager.shutdown()
|
|
|
+ self.loop.run()
|
|
|
+ XCTAssertNoThrow(try shutdown.wait())
|
|
|
}
|
|
|
|
|
|
// We failed to get a channel and we don't have reconnect configured: we should be shutdown and
|
|
|
@@ -590,7 +596,14 @@ extension ConnectionManagerTests {
|
|
|
// Channel should now be ready.
|
|
|
XCTAssertNoThrow(try readyChannel.wait())
|
|
|
|
|
|
- // Kill the first channel.
|
|
|
+ // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle.
|
|
|
+ let streamCreated = NIOHTTP2StreamCreatedEvent(
|
|
|
+ streamID: 1,
|
|
|
+ localInitialWindowSize: nil,
|
|
|
+ remoteInitialWindowSize: nil
|
|
|
+ )
|
|
|
+ firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated)
|
|
|
+
|
|
|
try self.waitForStateChange(from: .ready, to: .transientFailure) {
|
|
|
XCTAssertNoThrow(try firstChannel.close().wait())
|
|
|
}
|
|
|
@@ -865,14 +878,62 @@ extension ConnectionManagerTests {
|
|
|
XCTAssertNoThrow(try readyChannel.wait())
|
|
|
|
|
|
// Now drop the connection.
|
|
|
- self.waitForStateChange(from: .ready, to: .shutdown) {
|
|
|
- channel.pipeline.fireChannelInactive()
|
|
|
+ try self.waitForStateChange(from: .ready, to: .shutdown) {
|
|
|
+ let shutdown = manager.shutdown()
|
|
|
+ self.loop.run()
|
|
|
+ XCTAssertNoThrow(try shutdown.wait())
|
|
|
}
|
|
|
|
|
|
// Fire a connection idled event, i.e. keepalive timeout has fired. This should be a no-op.
|
|
|
// Previously this would hit a precondition failure.
|
|
|
channel.pipeline.fireUserInboundEventTriggered(ConnectionIdledEvent())
|
|
|
}
|
|
|
+
|
|
|
+ func testCloseWithoutActiveRPCs() throws {
|
|
|
+ let channelPromise = self.loop.makePromise(of: Channel.self)
|
|
|
+ let manager = ConnectionManager.testingOnly(
|
|
|
+ configuration: self.defaultConfiguration,
|
|
|
+ logger: self.logger
|
|
|
+ ) {
|
|
|
+ channelPromise.futureResult
|
|
|
+ }
|
|
|
+
|
|
|
+ // Start the connection.
|
|
|
+ let readyChannel = self.waitForStateChange(
|
|
|
+ from: .idle,
|
|
|
+ to: .connecting
|
|
|
+ ) { () -> EventLoopFuture<Channel> in
|
|
|
+ let readyChannel = manager.getChannel()
|
|
|
+ self.loop.run()
|
|
|
+ return readyChannel
|
|
|
+ }
|
|
|
+
|
|
|
+ // Setup the actual channel and activate it.
|
|
|
+ let channel = EmbeddedChannel(loop: self.loop)
|
|
|
+ XCTAssertNoThrow(try channel.pipeline.addHandlers([
|
|
|
+ GRPCIdleHandler(mode: .client(manager), logger: manager.logger),
|
|
|
+ ]).wait())
|
|
|
+ channelPromise.succeed(channel)
|
|
|
+ self.loop.run()
|
|
|
+
|
|
|
+ let connect = channel.connect(to: try SocketAddress(unixDomainSocketPath: "/ignored"))
|
|
|
+ XCTAssertNoThrow(try connect.wait())
|
|
|
+
|
|
|
+ // "ready" the connection.
|
|
|
+ try self.waitForStateChange(from: .connecting, to: .ready) {
|
|
|
+ let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
|
|
|
+ XCTAssertNoThrow(try channel.writeInbound(frame))
|
|
|
+ }
|
|
|
+
|
|
|
+ // The channel should now be ready.
|
|
|
+ XCTAssertNoThrow(try readyChannel.wait())
|
|
|
+
|
|
|
+ // Close the channel. There are no active RPCs so we should idle rather than be in the transient
|
|
|
+ // failure state.
|
|
|
+ self.waitForStateChange(from: .ready, to: .idle) {
|
|
|
+ channel.pipeline.fireChannelInactive()
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
internal struct Change: Hashable, CustomStringConvertible {
|
|
|
@@ -956,13 +1017,20 @@ internal class RecordingConnectivityDelegate: ConnectivityStateDelegate {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- func waitForExpectedChanges(timeout: DispatchTimeInterval) {
|
|
|
+ func waitForExpectedChanges(
|
|
|
+ timeout: DispatchTimeInterval,
|
|
|
+ file: StaticString = #file,
|
|
|
+ line: UInt = #line
|
|
|
+ ) {
|
|
|
let result = self.semaphore.wait(timeout: .now() + timeout)
|
|
|
switch result {
|
|
|
case .success:
|
|
|
()
|
|
|
case .timedOut:
|
|
|
- XCTFail("Timed out before verifying \(self.expectation.count) change(s)")
|
|
|
+ XCTFail(
|
|
|
+ "Timed out before verifying \(self.expectation.count) change(s)",
|
|
|
+ file: file, line: line
|
|
|
+ )
|
|
|
}
|
|
|
}
|
|
|
}
|