ConnectionTests.swift 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  17. import GRPCCore
  18. import GRPCHTTP2Core
  19. import NIOCore
  20. import NIOHPACK
  21. import NIOHTTP2
  22. import NIOPosix
  23. import XCTest
  /// Tests covering the lifecycle of a `Connection`: connecting, the different ways it can
  /// close, connect failures, and creating streams in each state.
  @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  final class ConnectionTests: XCTestCase {
    /// Closes the connection as soon as it connects and expects a locally initiated close.
    func testConnectThenClose() async throws {
      try await ConnectionTest.run(connector: .posix()) { context, event in
        guard case .connectSucceeded = event else { return }
        context.connection.close()
      } validateEvents: { _, observed in
        XCTAssertEqual(observed, [.connectSucceeded, .closed(.initiatedLocally)])
      }
    }

    /// Connects with a 50 ms maximum idle time and expects the connection to close with
    /// `.idleTimeout`.
    func testConnectThenIdleTimeout() async throws {
      let connector: ConnectionTest.Connector = .posix(maxIdleTime: .milliseconds(50))
      try await ConnectionTest.run(connector: connector) { _, observed in
        XCTAssertEqual(observed, [.connectSucceeded, .closed(.idleTimeout)])
      }
    }

    /// Connects with keepalive enabled (including without calls) and `dropPingAcks: true`, then
    /// expects the connection to close with `.keepaliveTimeout`.
    func testConnectThenKeepaliveTimeout() async throws {
      let connector: ConnectionTest.Connector = .posix(
        keepaliveTime: .milliseconds(50),
        keepaliveTimeout: .milliseconds(10),
        keepaliveWithoutCalls: true,
        dropPingAcks: true
      )
      try await ConnectionTest.run(connector: connector) { _, observed in
        XCTAssertEqual(observed, [.connectSucceeded, .closed(.keepaliveTimeout)])
      }
    }

    /// Writes a GOAWAY frame from the server's accepted channel once connected and expects a
    /// `.goingAway` event followed by a remote close.
    func testGoAwayWhenConnected() async throws {
      try await ConnectionTest.run(connector: .posix()) { context, event in
        guard case .connectSucceeded = event else { return }

        let frame = HTTP2Frame(
          streamID: .rootStream,
          payload: .goAway(
            lastStreamID: 0,
            errorCode: .noError,
            opaqueData: ByteBuffer(string: "Hello!")
          )
        )
        let serverChannel = try context.server.acceptedChannel
        serverChannel.writeAndFlush(frame, promise: nil)
      } validateEvents: { _, observed in
        XCTAssertEqual(
          observed,
          [.connectSucceeded, .goingAway(.noError, "Hello!"), .closed(.remote)]
        )
      }
    }

    /// Closes the server's accepted channel once connected and expects the connection to close
    /// with an `.unavailable` error while idle.
    func testConnectionDropWhenConnected() async throws {
      try await ConnectionTest.run(connector: .posix()) { context, event in
        guard case .connectSucceeded = event else { return }
        try context.server.acceptedChannel.close(mode: .all, promise: nil)
      } validateEvents: { _, observed in
        let dropped = RPCError(
          code: .unavailable,
          message: "The TCP connection was dropped unexpectedly."
        )
        let anticipated: [Connection.Event] = [
          .connectSucceeded,
          .closed(.error(dropped, wasIdle: true)),
        ]
        XCTAssertEqual(observed, anticipated)
      }
    }

    /// Uses a connector which throws and expects the same error in a single `.connectFailed`
    /// event.
    func testConnectFails() async throws {
      let failure = RPCError(code: .unimplemented, message: "")
      try await ConnectionTest.run(connector: .throwing(failure)) { _, observed in
        XCTAssertEqual(observed, [.connectFailed(failure)])
      }
    }

    /// Runs against a server in `.closeOnAccept` mode and expects exactly one `.connectFailed`
    /// event carrying an `RPCError` with code `.unavailable`.
    func testConnectFailsOnAcceptedThenClosedTCPConnection() async throws {
      try await ConnectionTest.run(connector: .posix(), server: .closeOnAccept) { _, observed in
        XCTAssertEqual(observed.count, 1)
        let only = try XCTUnwrap(observed.first)

        guard case .connectFailed(let failure) = only else {
          XCTFail("Expected '.connectFailed', got '\(only)'")
          return
        }

        XCTAssert(failure, as: RPCError.self) {
          XCTAssertEqual($0.code, .unavailable)
        }
      }
    }

    /// Opens an `echoGet` stream on a connected connection, sends metadata and a message, and
    /// expects the same metadata and message back followed by an OK status.
    func testMakeStreamOnActiveConnection() async throws {
      try await ConnectionTest.run(connector: .posix()) { context, event in
        guard case .connectSucceeded = event else { return }

        let stream = try await context.connection.makeStream(
          descriptor: .echoGet,
          options: .defaults
        )

        try await stream.execute { inbound, outbound in
          try await outbound.write(.metadata(["foo": "bar", "bar": "baz"]))
          try await outbound.write(.message([0, 1, 2]))
          outbound.finish()

          var received: [RPCResponsePart] = []
          for try await part in inbound {
            switch part {
            case .message, .status:
              received.append(part)
            case .metadata(let all):
              // Keep only the final two entries; anything ahead of them is transport
              // specific metadata.
              received.append(.metadata(Metadata(all.suffix(2))))
            }
          }

          let anticipated: [RPCResponsePart] = [
            .metadata(["foo": "bar", "bar": "baz"]),
            .message([0, 1, 2]),
            .status(Status(code: .ok, message: ""), [:]),
          ]
          XCTAssertEqual(received, anticipated)
        }

        context.connection.close()
      } validateEvents: { _, observed in
        XCTAssertEqual(observed, [.connectSucceeded, .closed(.initiatedLocally)])
      }
    }

    /// Closes the connection once connected, then checks that making a stream after the
    /// `.closed` event throws an `RPCError` with code `.unavailable`.
    func testMakeStreamOnClosedConnection() async throws {
      try await ConnectionTest.run(connector: .posix()) { context, event in
        switch event {
        case .connectSucceeded:
          context.connection.close()

        case .closed:
          await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
            _ = try await context.connection.makeStream(descriptor: .echoGet, options: .defaults)
          } errorHandler: {
            XCTAssertEqual($0.code, .unavailable)
          }

        default:
          break
        }
      } validateEvents: { _, observed in
        XCTAssertEqual(observed, [.connectSucceeded, .closed(.initiatedLocally)])
      }
    }

    /// Creates a connection with the `.never` connector, does not start it, and checks that
    /// making a stream throws an `RPCError` with code `.unavailable`.
    func testMakeStreamOnNotRunningConnection() async throws {
      let unstarted = Connection(
        address: .ipv4(host: "ignored", port: 0),
        http2Connector: .never,
        defaultCompression: .none,
        enabledCompression: .none
      )

      await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
        _ = try await unstarted.makeStream(descriptor: .echoGet, options: .defaults)
      } errorHandler: {
        XCTAssertEqual($0.code, .unavailable)
      }
    }
  }
  extension ClientBootstrap {
    /// Connects to a gRPC `SocketAddress` using whichever `ClientBootstrap` connect API matches
    /// the kind of address (IPv4, IPv6, Unix domain socket or virtual socket).
    ///
    /// - Parameters:
    ///   - address: The address to connect to.
    ///   - configure: The channel initializer passed through to the underlying connect call.
    /// - Returns: The value produced by `configure` for the connected channel.
    /// - Throws: An `RPCError` with code `.unimplemented` if `address` is none of the handled
    ///   kinds, or any error thrown by the underlying connect call.
    @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
    func connect<T: Sendable>(
      to address: GRPCHTTP2Core.SocketAddress,
      _ configure: @Sendable @escaping (any Channel) -> EventLoopFuture<T>
    ) async throws -> T {
      if let target = address.ipv4 {
        return try await self.connect(
          host: target.host,
          port: target.port,
          channelInitializer: configure
        )
      }

      if let target = address.ipv6 {
        return try await self.connect(
          host: target.host,
          port: target.port,
          channelInitializer: configure
        )
      }

      if let target = address.unixDomainSocket {
        return try await self.connect(
          unixDomainSocketPath: target.path,
          channelInitializer: configure
        )
      }

      if let target = address.virtualSocket {
        // Translate the gRPC vsock address into its NIO equivalent before connecting.
        let vsockAddress = VsockAddress(
          cid: .init(Int(target.contextID.rawValue)),
          port: .init(Int(target.port.rawValue))
        )
        return try await self.connect(to: vsockAddress, channelInitializer: configure)
      }

      throw RPCError(code: .unimplemented, message: "Unhandled socket address: \(address)")
    }
  }
  extension Metadata {
    /// Creates metadata holding every key-value pair of `sequence`, added in iteration order.
    ///
    /// String values are added with `addString(_:forKey:)` and binary values with
    /// `addBinary(_:forKey:)`.
    ///
    /// - Parameter sequence: The key-value pairs to add.
    init(_ sequence: some Sequence<Element>) {
      self = sequence.reduce(into: Metadata()) { collected, entry in
        let (key, value) = entry
        switch value {
        case .binary(let bytes):
          collected.addBinary(bytes, forKey: key)
        case .string(let text):
          collected.addString(text, forKey: key)
        }
      }
    }
  }