ConnectionTests.swift 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  17. import GRPCCore
  18. @_spi(Package) @testable import GRPCHTTP2Core
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOHTTP2
  23. import NIOPosix
  24. import XCTest
  25. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  26. final class ConnectionTests: XCTestCase {
  27. func testConnectThenClose() async throws {
  28. try await ConnectionTest.run(connector: .posix()) { context, event in
  29. switch event {
  30. case .connectSucceeded:
  31. context.connection.close()
  32. default:
  33. ()
  34. }
  35. } validateEvents: { _, events in
  36. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  37. }
  38. }
  39. func testConnectThenIdleTimeout() async throws {
  40. try await ConnectionTest.run(connector: .posix(maxIdleTime: .milliseconds(50))) { _, events in
  41. XCTAssertEqual(events, [.connectSucceeded, .closed(.idleTimeout)])
  42. }
  43. }
  44. func testConnectThenKeepaliveTimeout() async throws {
  45. try await ConnectionTest.run(
  46. connector: .posix(
  47. keepaliveTime: .milliseconds(50),
  48. keepaliveTimeout: .milliseconds(10),
  49. keepaliveWithoutCalls: true,
  50. dropPingAcks: true
  51. )
  52. ) { _, events in
  53. XCTAssertEqual(events, [.connectSucceeded, .closed(.keepaliveTimeout)])
  54. }
  55. }
  56. func testGoAwayWhenConnected() async throws {
  57. try await ConnectionTest.run(connector: .posix()) { context, event in
  58. switch event {
  59. case .connectSucceeded:
  60. let goAway = HTTP2Frame(
  61. streamID: .rootStream,
  62. payload: .goAway(
  63. lastStreamID: 0,
  64. errorCode: .noError,
  65. opaqueData: ByteBuffer(string: "Hello!")
  66. )
  67. )
  68. let accepted = try context.server.acceptedChannel
  69. accepted.writeAndFlush(goAway, promise: nil)
  70. default:
  71. ()
  72. }
  73. } validateEvents: { _, events in
  74. XCTAssertEqual(events, [.connectSucceeded, .goingAway(.noError, "Hello!"), .closed(.remote)])
  75. }
  76. }
  77. func testConnectFails() async throws {
  78. let error = RPCError(code: .unimplemented, message: "")
  79. try await ConnectionTest.run(connector: .throwing(error)) { _, events in
  80. XCTAssertEqual(events, [.connectFailed(error)])
  81. }
  82. }
  83. func testConnectFailsOnAcceptedThenClosedTCPConnection() async throws {
  84. try await ConnectionTest.run(connector: .posix(), server: .closeOnAccept) { _, events in
  85. XCTAssertEqual(events.count, 1)
  86. let event = try XCTUnwrap(events.first)
  87. switch event {
  88. case .connectFailed(let error):
  89. XCTAssert(error, as: RPCError.self) { rpcError in
  90. XCTAssertEqual(rpcError.code, .unavailable)
  91. }
  92. default:
  93. XCTFail("Expected '.connectFailed', got '\(event)'")
  94. }
  95. }
  96. }
  97. func testMakeStreamOnActiveConnection() async throws {
  98. try await ConnectionTest.run(connector: .posix()) { context, event in
  99. switch event {
  100. case .connectSucceeded:
  101. let stream = try await context.connection.makeStream(
  102. descriptor: .echoGet,
  103. options: .defaults
  104. )
  105. try await stream.execute { inbound, outbound in
  106. try await outbound.write(.metadata(["foo": "bar", "bar": "baz"]))
  107. try await outbound.write(.message([0, 1, 2]))
  108. outbound.finish()
  109. var parts = [RPCResponsePart]()
  110. for try await part in inbound {
  111. switch part {
  112. case .metadata(let metadata):
  113. // Filter out any transport specific metadata
  114. parts.append(.metadata(Metadata(metadata.suffix(2))))
  115. case .message, .status:
  116. parts.append(part)
  117. }
  118. }
  119. let expected: [RPCResponsePart] = [
  120. .metadata(["foo": "bar", "bar": "baz"]),
  121. .message([0, 1, 2]),
  122. .status(Status(code: .ok, message: ""), [:]),
  123. ]
  124. XCTAssertEqual(parts, expected)
  125. }
  126. context.connection.close()
  127. default:
  128. ()
  129. }
  130. } validateEvents: { _, events in
  131. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  132. }
  133. }
  134. func testMakeStreamOnClosedConnection() async throws {
  135. try await ConnectionTest.run(connector: .posix()) { context, event in
  136. switch event {
  137. case .connectSucceeded:
  138. context.connection.close()
  139. case .closed:
  140. await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
  141. _ = try await context.connection.makeStream(descriptor: .echoGet, options: .defaults)
  142. } errorHandler: { error in
  143. XCTAssertEqual(error.code, .unavailable)
  144. }
  145. default:
  146. ()
  147. }
  148. } validateEvents: { context, events in
  149. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  150. }
  151. }
  152. func testMakeStreamOnNotRunningConnection() async throws {
  153. let connection = Connection(
  154. address: .ipv4(host: "ignored", port: 0),
  155. http2Connector: .never,
  156. defaultCompression: .none,
  157. enabledCompression: .none
  158. )
  159. await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
  160. _ = try await connection.makeStream(descriptor: .echoGet, options: .defaults)
  161. } errorHandler: { error in
  162. XCTAssertEqual(error.code, .unavailable)
  163. }
  164. }
  165. }
  166. extension ClientBootstrap {
  167. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  168. func connect<T>(
  169. to address: GRPCHTTP2Core.SocketAddress,
  170. _ configure: @Sendable @escaping (Channel) -> EventLoopFuture<T>
  171. ) async throws -> T {
  172. if let ipv4 = address.ipv4 {
  173. return try await self.connect(
  174. host: ipv4.host,
  175. port: ipv4.port,
  176. channelInitializer: configure
  177. )
  178. } else if let ipv6 = address.ipv6 {
  179. return try await self.connect(
  180. host: ipv6.host,
  181. port: ipv6.port,
  182. channelInitializer: configure
  183. )
  184. } else if let uds = address.unixDomainSocket {
  185. return try await self.connect(
  186. unixDomainSocketPath: uds.path,
  187. channelInitializer: configure
  188. )
  189. } else if let vsock = address.virtualSocket {
  190. return try await self.connect(
  191. to: VsockAddress(
  192. cid: .init(Int(vsock.contextID.rawValue)),
  193. port: .init(Int(vsock.port.rawValue))
  194. ),
  195. channelInitializer: configure
  196. )
  197. } else {
  198. throw RPCError(code: .unimplemented, message: "Unhandled socket address: \(address)")
  199. }
  200. }
  201. }
  202. extension Metadata {
  203. init(_ sequence: some Sequence<Element>) {
  204. var metadata = Metadata()
  205. for (key, value) in sequence {
  206. switch value {
  207. case .string(let value):
  208. metadata.addString(value, forKey: key)
  209. case .binary(let value):
  210. metadata.addBinary(value, forKey: key)
  211. }
  212. }
  213. self = metadata
  214. }
  215. }