ConnectionTests.swift 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import DequeModule
  17. import GRPCCore
  18. @_spi(Package) @testable import GRPCHTTP2Core
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOHPACK
  22. import NIOHTTP2
  23. import NIOPosix
  24. import XCTest
  25. @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
  26. final class ConnectionTests: XCTestCase {
  27. func testConnectThenClose() async throws {
  28. try await ConnectionTest.run(connector: .posix()) { context, event in
  29. switch event {
  30. case .connectSucceeded:
  31. context.connection.close()
  32. default:
  33. ()
  34. }
  35. } validateEvents: { _, events in
  36. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  37. }
  38. }
  39. func testConnectThenIdleTimeout() async throws {
  40. try await ConnectionTest.run(connector: .posix(maxIdleTime: .milliseconds(50))) { _, events in
  41. XCTAssertEqual(events, [.connectSucceeded, .closed(.idleTimeout)])
  42. }
  43. }
  44. func testConnectThenKeepaliveTimeout() async throws {
  45. try await ConnectionTest.run(
  46. connector: .posix(
  47. keepaliveTime: .milliseconds(50),
  48. keepaliveTimeout: .milliseconds(10),
  49. keepaliveWithoutCalls: true,
  50. dropPingAcks: true
  51. )
  52. ) { _, events in
  53. XCTAssertEqual(events, [.connectSucceeded, .closed(.keepaliveTimeout)])
  54. }
  55. }
  56. func testGoAwayWhenConnected() async throws {
  57. try await ConnectionTest.run(connector: .posix()) { context, event in
  58. switch event {
  59. case .connectSucceeded:
  60. let goAway = HTTP2Frame(
  61. streamID: .rootStream,
  62. payload: .goAway(
  63. lastStreamID: 0,
  64. errorCode: .noError,
  65. opaqueData: ByteBuffer(string: "Hello!")
  66. )
  67. )
  68. let accepted = try context.server.acceptedChannel
  69. accepted.writeAndFlush(goAway, promise: nil)
  70. default:
  71. ()
  72. }
  73. } validateEvents: { _, events in
  74. XCTAssertEqual(events, [.connectSucceeded, .goingAway(.noError, "Hello!"), .closed(.remote)])
  75. }
  76. }
  77. func testConnectFails() async throws {
  78. let error = RPCError(code: .unimplemented, message: "")
  79. try await ConnectionTest.run(connector: .throwing(error)) { _, events in
  80. XCTAssertEqual(events, [.connectFailed(error)])
  81. }
  82. }
  83. func testMakeStreamOnActiveConnection() async throws {
  84. try await ConnectionTest.run(connector: .posix()) { context, event in
  85. switch event {
  86. case .connectSucceeded:
  87. let stream = try await context.connection.makeStream(
  88. descriptor: .echoGet,
  89. options: .defaults
  90. )
  91. try await stream.execute { inbound, outbound in
  92. try await outbound.write(.metadata(["foo": "bar", "bar": "baz"]))
  93. try await outbound.write(.message([0, 1, 2]))
  94. outbound.finish()
  95. var parts = [RPCResponsePart]()
  96. for try await part in inbound {
  97. switch part {
  98. case .metadata(let metadata):
  99. // Filter out any transport specific metadata
  100. parts.append(.metadata(Metadata(metadata.suffix(2))))
  101. case .message, .status:
  102. parts.append(part)
  103. }
  104. }
  105. let expected: [RPCResponsePart] = [
  106. .metadata(["foo": "bar", "bar": "baz"]),
  107. .message([0, 1, 2]),
  108. .status(Status(code: .ok, message: ""), [:]),
  109. ]
  110. XCTAssertEqual(parts, expected)
  111. }
  112. context.connection.close()
  113. default:
  114. ()
  115. }
  116. } validateEvents: { _, events in
  117. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  118. }
  119. }
  120. func testMakeStreamOnClosedConnection() async throws {
  121. try await ConnectionTest.run(connector: .posix()) { context, event in
  122. switch event {
  123. case .connectSucceeded:
  124. context.connection.close()
  125. case .closed:
  126. await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
  127. _ = try await context.connection.makeStream(descriptor: .echoGet, options: .defaults)
  128. } errorHandler: { error in
  129. XCTAssertEqual(error.code, .unavailable)
  130. }
  131. default:
  132. ()
  133. }
  134. } validateEvents: { context, events in
  135. XCTAssertEqual(events, [.connectSucceeded, .closed(.initiatedLocally)])
  136. }
  137. }
  138. func testMakeStreamOnNotRunningConnection() async throws {
  139. let connection = Connection(
  140. address: .ipv4(host: "ignored", port: 0),
  141. http2Connector: .never,
  142. defaultCompression: .none,
  143. enabledCompression: .none
  144. )
  145. await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
  146. _ = try await connection.makeStream(descriptor: .echoGet, options: .defaults)
  147. } errorHandler: { error in
  148. XCTAssertEqual(error.code, .unavailable)
  149. }
  150. }
  151. }
  152. extension ClientBootstrap {
  153. func connect<T>(
  154. to address: GRPCHTTP2Core.SocketAddress,
  155. _ configure: @Sendable @escaping (Channel) -> EventLoopFuture<T>
  156. ) async throws -> T {
  157. if let ipv4 = address.ipv4 {
  158. return try await self.connect(
  159. host: ipv4.host,
  160. port: ipv4.port,
  161. channelInitializer: configure
  162. )
  163. } else if let ipv6 = address.ipv6 {
  164. return try await self.connect(
  165. host: ipv6.host,
  166. port: ipv6.port,
  167. channelInitializer: configure
  168. )
  169. } else if let uds = address.unixDomainSocket {
  170. return try await self.connect(
  171. unixDomainSocketPath: uds.path,
  172. channelInitializer: configure
  173. )
  174. } else if let vsock = address.virtualSocket {
  175. return try await self.connect(
  176. to: VsockAddress(
  177. cid: .init(Int(vsock.contextID.rawValue)),
  178. port: .init(Int(vsock.port.rawValue))
  179. ),
  180. channelInitializer: configure
  181. )
  182. } else {
  183. throw RPCError(code: .unimplemented, message: "Unhandled socket address: \(address)")
  184. }
  185. }
  186. }
  187. extension Metadata {
  188. init(_ sequence: some Sequence<Element>) {
  189. var metadata = Metadata()
  190. for (key, value) in sequence {
  191. switch value {
  192. case .string(let value):
  193. metadata.addString(value, forKey: key)
  194. case .binary(let value):
  195. metadata.addBinary(value, forKey: key)
  196. }
  197. }
  198. self = metadata
  199. }
  200. }