ConnectionTests.swift 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
/*
 * Copyright 2024, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import DequeModule
  17. import GRPCCore
  18. @_spi(Package) @testable import GRPCHTTP2Core
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOHTTP2
  23. import NIOPosix
  24. import XCTest
  25. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  26. final class ConnectionTests: XCTestCase {
  27. func testConnectThenClose() async throws {
  28. try await ConnectionTest.run(connector: .posix()) { context, event in
  29. switch event {
  30. case .connectSucceeded:
  31. context.connection.close()
  32. default:
  33. ()
  34. }
  35. } validateEvents: { _, events in
  36. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  37. }
  38. }
  39. func testConnectThenIdleTimeout() async throws {
  40. try await ConnectionTest.run(connector: .posix(maxIdleTime: .milliseconds(50))) { _, events in
  41. XCTAssertEqual(events, [.connectSucceeded, .closed(.idleTimeout)])
  42. }
  43. }
  44. func testConnectThenKeepaliveTimeout() async throws {
  45. try await ConnectionTest.run(
  46. connector: .posix(
  47. keepaliveTime: .milliseconds(50),
  48. keepaliveTimeout: .milliseconds(10),
  49. keepaliveWithoutCalls: true,
  50. dropPingAcks: true
  51. )
  52. ) { _, events in
  53. XCTAssertEqual(events, [.connectSucceeded, .closed(.keepaliveTimeout)])
  54. }
  55. }
  56. func testGoAwayWhenConnected() async throws {
  57. try await ConnectionTest.run(connector: .posix()) { context, event in
  58. switch event {
  59. case .connectSucceeded:
  60. let goAway = HTTP2Frame(
  61. streamID: .rootStream,
  62. payload: .goAway(
  63. lastStreamID: 0,
  64. errorCode: .noError,
  65. opaqueData: ByteBuffer(string: "Hello!")
  66. )
  67. )
  68. let accepted = try context.server.acceptedChannel
  69. accepted.writeAndFlush(goAway, promise: nil)
  70. default:
  71. ()
  72. }
  73. } validateEvents: { _, events in
  74. XCTAssertEqual(events, [.connectSucceeded, .goingAway(.noError, "Hello!"), .closed(.remote)])
  75. }
  76. }
  77. func testConnectFails() async throws {
  78. let error = RPCError(code: .unimplemented, message: "")
  79. try await ConnectionTest.run(connector: .throwing(error)) { _, events in
  80. XCTAssertEqual(events, [.connectFailed(error)])
  81. }
  82. }
  83. func testConnectFailsOnAcceptedThenClosedTCPConnection() async throws {
  84. try await ConnectionTest.run(connector: .posix(), server: .closeOnAccept) { _, events in
  85. XCTAssertEqual(events.count, 1)
  86. let event = try XCTUnwrap(events.first)
  87. switch event {
  88. case .connectFailed(let error):
  89. XCTAssert(error, as: RPCError.self) { rpcError in
  90. XCTAssertEqual(rpcError.code, .unavailable)
  91. }
  92. default:
  93. XCTFail("Expected '.connectFailed', got '\(event)'")
  94. }
  95. }
  96. }
  97. func testMakeStreamOnActiveConnection() async throws {
  98. try await ConnectionTest.run(connector: .posix()) { context, event in
  99. switch event {
  100. case .connectSucceeded:
  101. let stream = try await context.connection.makeStream(
  102. descriptor: .echoGet,
  103. options: .defaults
  104. )
  105. try await stream.execute { inbound, outbound in
  106. try await outbound.write(.metadata(["foo": "bar", "bar": "baz"]))
  107. try await outbound.write(.message([0, 1, 2]))
  108. outbound.finish()
  109. var parts = [RPCResponsePart]()
  110. for try await part in inbound {
  111. switch part {
  112. case .metadata(let metadata):
  113. // Filter out any transport specific metadata
  114. parts.append(.metadata(Metadata(metadata.suffix(2))))
  115. case .message, .status:
  116. parts.append(part)
  117. }
  118. }
  119. let expected: [RPCResponsePart] = [
  120. .metadata(["foo": "bar", "bar": "baz"]),
  121. .message([0, 1, 2]),
  122. .status(Status(code: .ok, message: ""), [:]),
  123. ]
  124. XCTAssertEqual(parts, expected)
  125. }
  126. context.connection.close()
  127. default:
  128. ()
  129. }
  130. } validateEvents: { _, events in
  131. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  132. }
  133. }
  134. func testMakeStreamOnClosedConnection() async throws {
  135. try await ConnectionTest.run(connector: .posix()) { context, event in
  136. switch event {
  137. case .connectSucceeded:
  138. context.connection.close()
  139. case .closed:
  140. await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
  141. _ = try await context.connection.makeStream(descriptor: .echoGet, options: .defaults)
  142. } errorHandler: { error in
  143. XCTAssertEqual(error.code, .unavailable)
  144. }
  145. default:
  146. ()
  147. }
  148. } validateEvents: { context, events in
  149. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  150. }
  151. }
  152. func testMakeStreamOnNotRunningConnection() async throws {
  153. let connection = Connection(
  154. address: .ipv4(host: "ignored", port: 0),
  155. http2Connector: .never,
  156. defaultCompression: .none,
  157. enabledCompression: .none
  158. )
  159. await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
  160. _ = try await connection.makeStream(descriptor: .echoGet, options: .defaults)
  161. } errorHandler: { error in
  162. XCTAssertEqual(error.code, .unavailable)
  163. }
  164. }
  165. }
  166. extension ClientBootstrap {
  167. func connect<T>(
  168. to address: GRPCHTTP2Core.SocketAddress,
  169. _ configure: @Sendable @escaping (Channel) -> EventLoopFuture<T>
  170. ) async throws -> T {
  171. if let ipv4 = address.ipv4 {
  172. return try await self.connect(
  173. host: ipv4.host,
  174. port: ipv4.port,
  175. channelInitializer: configure
  176. )
  177. } else if let ipv6 = address.ipv6 {
  178. return try await self.connect(
  179. host: ipv6.host,
  180. port: ipv6.port,
  181. channelInitializer: configure
  182. )
  183. } else if let uds = address.unixDomainSocket {
  184. return try await self.connect(
  185. unixDomainSocketPath: uds.path,
  186. channelInitializer: configure
  187. )
  188. } else if let vsock = address.virtualSocket {
  189. return try await self.connect(
  190. to: VsockAddress(
  191. cid: .init(Int(vsock.contextID.rawValue)),
  192. port: .init(Int(vsock.port.rawValue))
  193. ),
  194. channelInitializer: configure
  195. )
  196. } else {
  197. throw RPCError(code: .unimplemented, message: "Unhandled socket address: \(address)")
  198. }
  199. }
  200. }
  201. extension Metadata {
  202. init(_ sequence: some Sequence<Element>) {
  203. var metadata = Metadata()
  204. for (key, value) in sequence {
  205. switch value {
  206. case .string(let value):
  207. metadata.addString(value, forKey: key)
  208. case .binary(let value):
  209. metadata.addBinary(value, forKey: key)
  210. }
  211. }
  212. self = metadata
  213. }
  214. }