2
0

AcceptedServerTests.swift 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. /*
  2. * Copyright 2025, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import GRPCSampleData
  20. import Logging
  21. import NIOConcurrencyHelpers
  22. import NIOPosix
  23. import XCTest
  24. #if canImport(Darwin)
  25. import Darwin
  26. private let sys_bind = Darwin.bind
  27. private let sys_listen = Darwin.listen
  28. private let sys_close = Darwin.close
  29. private let sys_accept = Darwin.accept
  30. private let sys_strerror = Darwin.strerror
  31. #elseif canImport(Glibc)
  32. import Glibc
  33. private let sys_bind = Glibc.bind
  34. private let sys_listen = Glibc.listen
  35. private let sys_close = Glibc.close
  36. private let sys_accept = Glibc.accept
  37. private let sys_strerror = Glibc.strerror
  38. #endif
  39. final class AcceptedServerTests: GRPCTestCase {
  40. private func withListener<Result>(
  41. _ handle: (ListeningServer) throws -> Result
  42. ) throws -> Result {
  43. let server = try ListeningServer.bind(logger: self.logger)
  44. do {
  45. return try handle(server)
  46. } catch {
  47. server.close()
  48. throw error
  49. }
  50. }
  51. func testBasicCommunication() throws {
  52. try self.withListener { listener in
  53. let client = try GRPCChannelPool.with(
  54. target: .host(listener.host, port: listener.port),
  55. transportSecurity: .plaintext,
  56. eventLoopGroup: .singletonMultiThreadedEventLoopGroup.next()
  57. )
  58. defer {
  59. try? client.close().wait()
  60. }
  61. // Start an RPC to trigger a connect.
  62. let echo = Echo_EchoNIOClient(channel: client)
  63. let response = echo.get(.with { $0.text = "Hello!" })
  64. // Now accept a connection and start the server.
  65. let acceptedFD = try listener.accept()
  66. let server = try Server.insecure(group: .singletonMultiThreadedEventLoopGroup)
  67. .withLogger(self.serverLogger)
  68. .withServiceProviders([EchoProvider()])
  69. .fromAcceptedConnection(takingOwnershipOf: acceptedFD)
  70. .wait()
  71. defer { try? server.close().wait() }
  72. XCTAssertEqual(try response.response.wait().text, "Swift echo get: Hello!")
  73. }
  74. }
  75. #if canImport(NIOSSL)
  76. func testBasicCommunicationWithTLS() throws {
  77. try self.withListener { listener in
  78. let client = try GRPCChannelPool.with(
  79. target: .host(listener.host, port: listener.port),
  80. transportSecurity: .tls(
  81. .makeClientConfigurationBackedByNIOSSL(
  82. trustRoots: .certificates([SampleCertificate.ca.certificate]),
  83. hostnameOverride: "localhost"
  84. )
  85. ),
  86. eventLoopGroup: .singletonMultiThreadedEventLoopGroup.next()
  87. )
  88. defer {
  89. try? client.close().wait()
  90. }
  91. // Start an RPC to trigger a connect.
  92. let echo = Echo_EchoNIOClient(channel: client)
  93. let response = echo.get(.with { $0.text = "Hello!" })
  94. // Now accept a connection and start the server.
  95. let acceptedFD = try listener.accept()
  96. let server = try Server.usingTLSBackedByNIOSSL(
  97. on: .singletonMultiThreadedEventLoopGroup,
  98. certificateChain: [SampleCertificate.server.certificate],
  99. privateKey: SamplePrivateKey.server
  100. )
  101. .withTLS(trustRoots: .certificates([SampleCertificate.ca.certificate]))
  102. .withLogger(self.serverLogger)
  103. .withServiceProviders([EchoProvider()])
  104. .fromAcceptedConnection(takingOwnershipOf: acceptedFD)
  105. .wait()
  106. defer { try? server.close().wait() }
  107. XCTAssertEqual(try response.response.wait().text, "Swift echo get: Hello!")
  108. }
  109. }
  110. #endif // canImport(NIOSSL)
  111. func testGracefulShutdownOfServer() throws {
  112. try self.withListener { listener in
  113. let group = MultiThreadedEventLoopGroup.singleton
  114. let client = try GRPCChannelPool.with(
  115. target: .host(listener.host, port: listener.port),
  116. transportSecurity: .plaintext,
  117. eventLoopGroup: group.next()
  118. )
  119. defer {
  120. try? client.close().wait()
  121. }
  122. // Start an RPC to trigger a connect.
  123. let echo = Echo_EchoNIOClient(channel: client)
  124. let messages = NIOLockedValueBox<[String]>([])
  125. let update = echo.update { reply in
  126. messages.withLockedValue({ $0.append(reply.text) })
  127. }
  128. // Now accept a connection and start the server.
  129. let acceptedFD = try listener.accept()
  130. let server = try Server.insecure(group: group)
  131. .withLogger(self.serverLogger)
  132. .withServiceProviders([EchoProvider()])
  133. .fromAcceptedConnection(takingOwnershipOf: acceptedFD)
  134. .wait()
  135. defer { try? server.close().wait() }
  136. // Initial metadata indicates both peers know about the RPC
  137. XCTAssertNoThrow(try update.initialMetadata.wait())
  138. // Begin graceful shutdown; 'update' can complete, new RPCs should fail.
  139. let shutdown = server.initiateGracefulShutdown()
  140. // Start a new RPC, it should fail.
  141. let getResponse = echo.get(.with { $0.text = "Bye!" })
  142. XCTAssertThrowsError(try getResponse.response.wait())
  143. // Update should still work.
  144. update.sendMessage(.with { $0.text = "Hello!" }, promise: nil)
  145. update.sendEnd(promise: nil)
  146. XCTAssertEqual(try update.status.wait().code, .ok)
  147. XCTAssertEqual(messages.withLockedValue { $0 }, ["Swift echo update (0): Hello!"])
  148. XCTAssertNoThrow(try shutdown.wait())
  149. }
  150. }
  151. }
  152. struct ListeningServer {
  153. private var fd: Int32
  154. let port: Int
  155. var host: String { "127.0.0.1" }
  156. var logger: Logger
  157. private init(fd: Int32, port: Int, logger: Logger) {
  158. self.fd = fd
  159. self.port = port
  160. self.logger = logger
  161. }
  162. func accept() throws(SocketError) -> CInt {
  163. self.logger.debug("Accepting new client connection")
  164. let fd = try Self.acceptConnection(on: self.fd)
  165. self.logger.debug("Accepted new connection", metadata: ["fd": "\(fd)"])
  166. return fd
  167. }
  168. func close() {
  169. self.logger.debug("Closing listener socket")
  170. _ = sys_close(self.fd)
  171. }
  172. static func bind(logger: Logger) throws(SocketError) -> Self {
  173. let fd = try Self.makeListeningSocket()
  174. let port = try Self.getListeningPort(for: fd)
  175. let server = ListeningServer(fd: fd, port: port, logger: logger)
  176. logger.info(
  177. "Opened listening socket",
  178. metadata: ["addr": "\(server.host):\(server.port)", "fd": "\(fd)"]
  179. )
  180. return server
  181. }
  182. enum SocketError: Error {
  183. case creationFailed
  184. case bindFailed
  185. case listenFailed
  186. case acceptFailed(String)
  187. case getsocknameFailed
  188. }
  189. private static func makeListeningSocket() throws(SocketError) -> CInt {
  190. #if canImport(Darwin)
  191. let sockfd = socket(AF_INET, SOCK_STREAM, 0)
  192. #elseif canImport(Glibc)
  193. let sockfd = socket(AF_INET, CInt(SOCK_STREAM.rawValue), 0)
  194. #else
  195. fatalError("Unsupported libc")
  196. #endif
  197. if sockfd == -1 {
  198. throw .creationFailed
  199. }
  200. // Allow address reuse
  201. var yes = 1
  202. setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, socklen_t(MemoryLayout<Int>.size))
  203. var addr = sockaddr_in()
  204. addr.sin_family = sa_family_t(AF_INET)
  205. addr.sin_port = 0
  206. addr.sin_addr.s_addr = inet_addr("127.0.0.1")
  207. let bindResult = withUnsafePointer(to: &addr) {
  208. $0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
  209. sys_bind(sockfd, $0, socklen_t(MemoryLayout<sockaddr_in>.size))
  210. }
  211. }
  212. if bindResult == -1 {
  213. _ = sys_close(sockfd)
  214. throw .bindFailed
  215. }
  216. if sys_listen(sockfd, 5) == -1 {
  217. _ = sys_close(sockfd)
  218. throw .listenFailed
  219. }
  220. return sockfd
  221. }
  222. private static func getListeningPort(
  223. for listener: CInt,
  224. ) throws(SocketError) -> Int {
  225. var address = sockaddr_in()
  226. var addressLength = socklen_t(MemoryLayout<sockaddr_in>.size)
  227. let getsocknameResult = withUnsafeMutablePointer(to: &address) {
  228. $0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
  229. getsockname(listener, $0, &addressLength)
  230. }
  231. }
  232. if getsocknameResult == 0 {
  233. return Int(UInt16(bigEndian: address.sin_port))
  234. } else {
  235. _ = sys_close(listener)
  236. throw .getsocknameFailed
  237. }
  238. }
  239. private static func acceptConnection(
  240. on listener: CInt,
  241. ) throws(SocketError) -> CInt {
  242. var clientAddress = sockaddr_in()
  243. var clientAddressLength = socklen_t(MemoryLayout<sockaddr_in>.size)
  244. let clientSocket = withUnsafeMutablePointer(to: &clientAddress) {
  245. $0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
  246. sys_accept(listener, $0, &clientAddressLength)
  247. }
  248. }
  249. if clientSocket == -1 {
  250. throw .acceptFailed(String(cString: sys_strerror(errno)!))
  251. } else {
  252. return clientSocket
  253. }
  254. }
  255. }