GRPCServerTests.swift 15 KB


  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCInProcessTransport
  19. import XCTest
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. final class GRPCServerTests: XCTestCase {
  22. func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) {
  23. let server = InProcessServerTransport()
  24. let client = InProcessClientTransport(server: server)
  25. return (client, server)
  26. }
  27. func withInProcessClientConnectedToServer(
  28. services: [any RegistrableRPCService],
  29. interceptors: [any ServerInterceptor] = [],
  30. _ body: (InProcessClientTransport, GRPCServer) async throws -> Void
  31. ) async throws {
  32. let inProcess = self.makeInProcessPair()
  33. let server = GRPCServer()
  34. server.transports.add(inProcess.server)
  35. for service in services {
  36. server.services.register(service)
  37. }
  38. for interceptor in interceptors {
  39. server.interceptors.add(interceptor)
  40. }
  41. try await withThrowingTaskGroup(of: Void.self) { group in
  42. group.addTask {
  43. try await server.run()
  44. }
  45. group.addTask {
  46. try await inProcess.client.connect(lazily: true)
  47. }
  48. try await body(inProcess.client, server)
  49. inProcess.client.close()
  50. server.stopListening()
  51. }
  52. }
  53. func testServerHandlesUnary() async throws {
  54. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  55. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  56. try await stream.outbound.write(.metadata([:]))
  57. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  58. stream.outbound.finish()
  59. var responseParts = stream.inbound.makeAsyncIterator()
  60. let metadata = try await responseParts.next()
  61. XCTAssertMetadata(metadata)
  62. let message = try await responseParts.next()
  63. XCTAssertMessage(message) {
  64. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  65. }
  66. let status = try await responseParts.next()
  67. XCTAssertStatus(status) { status, _ in
  68. XCTAssertEqual(status.code, .ok)
  69. }
  70. }
  71. }
  72. }
  73. func testServerHandlesClientStreaming() async throws {
  74. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  75. try await client.withStream(descriptor: BinaryEcho.Methods.collect) { stream in
  76. try await stream.outbound.write(.metadata([:]))
  77. try await stream.outbound.write(.message([3]))
  78. try await stream.outbound.write(.message([1]))
  79. try await stream.outbound.write(.message([4]))
  80. try await stream.outbound.write(.message([1]))
  81. try await stream.outbound.write(.message([5]))
  82. stream.outbound.finish()
  83. var responseParts = stream.inbound.makeAsyncIterator()
  84. let metadata = try await responseParts.next()
  85. XCTAssertMetadata(metadata)
  86. let message = try await responseParts.next()
  87. XCTAssertMessage(message) {
  88. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  89. }
  90. let status = try await responseParts.next()
  91. XCTAssertStatus(status) { status, _ in
  92. XCTAssertEqual(status.code, .ok)
  93. }
  94. }
  95. }
  96. }
  97. func testServerHandlesServerStreaming() async throws {
  98. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  99. try await client.withStream(descriptor: BinaryEcho.Methods.expand) { stream in
  100. try await stream.outbound.write(.metadata([:]))
  101. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  102. stream.outbound.finish()
  103. var responseParts = stream.inbound.makeAsyncIterator()
  104. let metadata = try await responseParts.next()
  105. XCTAssertMetadata(metadata)
  106. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  107. let message = try await responseParts.next()
  108. XCTAssertMessage(message) {
  109. XCTAssertEqual($0, [byte])
  110. }
  111. }
  112. let status = try await responseParts.next()
  113. XCTAssertStatus(status) { status, _ in
  114. XCTAssertEqual(status.code, .ok)
  115. }
  116. }
  117. }
  118. }
  119. func testServerHandlesBidirectionalStreaming() async throws {
  120. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  121. try await client.withStream(descriptor: BinaryEcho.Methods.update) { stream in
  122. try await stream.outbound.write(.metadata([:]))
  123. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  124. try await stream.outbound.write(.message([byte]))
  125. }
  126. stream.outbound.finish()
  127. var responseParts = stream.inbound.makeAsyncIterator()
  128. let metadata = try await responseParts.next()
  129. XCTAssertMetadata(metadata)
  130. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  131. let message = try await responseParts.next()
  132. XCTAssertMessage(message) {
  133. XCTAssertEqual($0, [byte])
  134. }
  135. }
  136. let status = try await responseParts.next()
  137. XCTAssertStatus(status) { status, _ in
  138. XCTAssertEqual(status.code, .ok)
  139. }
  140. }
  141. }
  142. }
  143. func testUnimplementedMethod() async throws {
  144. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  145. try await client.withStream(
  146. descriptor: MethodDescriptor(service: "not", method: "implemented")
  147. ) { stream in
  148. try await stream.outbound.write(.metadata([:]))
  149. stream.outbound.finish()
  150. var responseParts = stream.inbound.makeAsyncIterator()
  151. let status = try await responseParts.next()
  152. XCTAssertStatus(status) { status, _ in
  153. XCTAssertEqual(status.code, .unimplemented)
  154. }
  155. }
  156. }
  157. }
  158. func testMultipleConcurrentRequests() async throws {
  159. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  160. await withThrowingTaskGroup(of: Void.self) { group in
  161. for i in UInt8.min ..< UInt8.max {
  162. group.addTask {
  163. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  164. try await stream.outbound.write(.metadata([:]))
  165. try await stream.outbound.write(.message([i]))
  166. stream.outbound.finish()
  167. var responseParts = stream.inbound.makeAsyncIterator()
  168. let metadata = try await responseParts.next()
  169. XCTAssertMetadata(metadata)
  170. let message = try await responseParts.next()
  171. XCTAssertMessage(message) { XCTAssertEqual($0, [i]) }
  172. let status = try await responseParts.next()
  173. XCTAssertStatus(status) { status, _ in
  174. XCTAssertEqual(status.code, .ok)
  175. }
  176. }
  177. }
  178. }
  179. }
  180. }
  181. }
  182. func testInterceptorsAreAppliedInOrder() async throws {
  183. let counter1 = ManagedAtomic(0)
  184. let counter2 = ManagedAtomic(0)
  185. try await self.withInProcessClientConnectedToServer(
  186. services: [BinaryEcho()],
  187. interceptors: [
  188. .requestCounter(counter1),
  189. .rejectAll(with: RPCError(code: .unavailable, message: "")),
  190. .requestCounter(counter2),
  191. ]
  192. ) { client, _ in
  193. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  194. try await stream.outbound.write(.metadata([:]))
  195. stream.outbound.finish()
  196. let parts = try await stream.inbound.collect()
  197. XCTAssertStatus(parts.first) { status, _ in
  198. XCTAssertEqual(status.code, .unavailable)
  199. }
  200. }
  201. }
  202. XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1)
  203. XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0)
  204. }
  205. func testInterceptorsAreNotAppliedToUnimplementedMethods() async throws {
  206. let counter = ManagedAtomic(0)
  207. try await self.withInProcessClientConnectedToServer(
  208. services: [BinaryEcho()],
  209. interceptors: [.requestCounter(counter)]
  210. ) { client, _ in
  211. try await client.withStream(
  212. descriptor: MethodDescriptor(service: "not", method: "implemented")
  213. ) { stream in
  214. try await stream.outbound.write(.metadata([:]))
  215. stream.outbound.finish()
  216. let parts = try await stream.inbound.collect()
  217. XCTAssertStatus(parts.first) { status, _ in
  218. XCTAssertEqual(status.code, .unimplemented)
  219. }
  220. }
  221. }
  222. XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 0)
  223. }
  224. func testNoNewRPCsAfterServerStopListening() async throws {
  225. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  226. // Run an RPC so we know the server is up.
  227. try await self.doEchoGet(using: client)
  228. // New streams should fail immediately after this.
  229. server.stopListening()
  230. // RPC should fail now.
  231. await XCTAssertThrowsRPCErrorAsync {
  232. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  233. XCTFail("Stream shouldn't be opened")
  234. }
  235. } errorHandler: { error in
  236. XCTAssertEqual(error.code, .failedPrecondition)
  237. }
  238. }
  239. }
  240. func testInFlightRPCsCanContinueAfterServerStopListening() async throws {
  241. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  242. try await client.withStream(descriptor: BinaryEcho.Methods.update) { stream in
  243. try await stream.outbound.write(.metadata([:]))
  244. var iterator = stream.inbound.makeAsyncIterator()
  245. // Don't need to validate the response, just that the server is running.
  246. let metadata = try await iterator.next()
  247. XCTAssertMetadata(metadata)
  248. // New streams should fail immediately after this.
  249. server.stopListening()
  250. try await stream.outbound.write(.message([0]))
  251. stream.outbound.finish()
  252. let message = try await iterator.next()
  253. XCTAssertMessage(message) { XCTAssertEqual($0, [0]) }
  254. let status = try await iterator.next()
  255. XCTAssertStatus(status)
  256. }
  257. }
  258. }
  259. func testCancelRunningServer() async throws {
  260. let inProcess = self.makeInProcessPair()
  261. let task = Task {
  262. let server = GRPCServer()
  263. server.services.register(BinaryEcho())
  264. server.transports.add(inProcess.server)
  265. try await server.run()
  266. }
  267. try await withThrowingTaskGroup(of: Void.self) { group in
  268. group.addTask {
  269. try? await inProcess.client.connect(lazily: true)
  270. }
  271. try await self.doEchoGet(using: inProcess.client)
  272. // The server must be running at this point as an RPC has completed.
  273. task.cancel()
  274. try await task.value
  275. group.cancelAll()
  276. }
  277. }
  278. func testTestRunServerWithNoTransport() async throws {
  279. let server = GRPCServer()
  280. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  281. try await server.run()
  282. } errorHandler: { error in
  283. XCTAssertEqual(error.code, .noTransportsConfigured)
  284. }
  285. }
  286. func testTestRunStoppedServer() async throws {
  287. let server = GRPCServer()
  288. server.transports.add(InProcessServerTransport())
  289. // Run the server.
  290. let task = Task { try await server.run() }
  291. task.cancel()
  292. try await task.value
  293. // Server is stopped, should throw an error.
  294. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  295. try await server.run()
  296. } errorHandler: { error in
  297. XCTAssertEqual(error.code, .serverIsStopped)
  298. }
  299. }
  300. func testRunServerWhenTransportThrows() async throws {
  301. let server = GRPCServer()
  302. server.transports.add(ThrowOnRunServerTransport())
  303. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  304. try await server.run()
  305. } errorHandler: { error in
  306. XCTAssertEqual(error.code, .failedToStartTransport)
  307. }
  308. }
  309. func testRunServerDrainsRunningTransportsWhenOneFailsToStart() async throws {
  310. let server = GRPCServer()
  311. // Register the in process transport first and allow it to come up.
  312. let inProcess = self.makeInProcessPair()
  313. server.transports.add(inProcess.server)
  314. // Register a transport waits for a signal before throwing.
  315. let signal = AsyncStream.makeStream(of: Void.self)
  316. server.transports.add(ThrowOnSignalServerTransport(signal: signal.stream))
  317. // Connect the in process client and start an RPC. When the stream is opened signal the
  318. // other transport to throw. This stream should be failed by the server.
  319. await withThrowingTaskGroup(of: Void.self) { group in
  320. group.addTask {
  321. try await inProcess.client.connect(lazily: true)
  322. }
  323. group.addTask {
  324. try await inProcess.client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  325. // The stream is open to the in-process transport. Let the other transport start.
  326. signal.continuation.finish()
  327. try await stream.outbound.write(.metadata([:]))
  328. stream.outbound.finish()
  329. let parts = try await stream.inbound.collect()
  330. XCTAssertStatus(parts.first) { status, _ in
  331. XCTAssertEqual(status.code, .unavailable)
  332. }
  333. }
  334. }
  335. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  336. try await server.run()
  337. } errorHandler: { error in
  338. XCTAssertEqual(error.code, .failedToStartTransport)
  339. }
  340. group.cancelAll()
  341. }
  342. }
  343. func testInterceptorsDescription() async throws {
  344. let server = GRPCServer()
  345. server.interceptors.add(.rejectAll(with: .init(code: .aborted, message: "")))
  346. server.interceptors.add(.requestCounter(.init(0)))
  347. let description = String(describing: server.interceptors)
  348. let expected = #"["RejectAllServerInterceptor", "RequestCountingServerInterceptor"]"#
  349. XCTAssertEqual(description, expected)
  350. }
  351. func testServicesDescription() async throws {
  352. let server = GRPCServer()
  353. let methods: [(String, String)] = [
  354. ("helloworld.Greeter", "SayHello"),
  355. ("echo.Echo", "Foo"),
  356. ("echo.Echo", "Bar"),
  357. ("echo.Echo", "Baz"),
  358. ]
  359. for (service, method) in methods {
  360. let descriptor = MethodDescriptor(service: service, method: method)
  361. server.services.router.registerHandler(
  362. forMethod: descriptor,
  363. deserializer: IdentityDeserializer(),
  364. serializer: IdentitySerializer()
  365. ) { _ in
  366. fatalError("Unreachable")
  367. }
  368. }
  369. let description = String(describing: server.services)
  370. let expected = """
  371. ["echo.Echo/Bar", "echo.Echo/Baz", "echo.Echo/Foo", "helloworld.Greeter/SayHello"]
  372. """
  373. XCTAssertEqual(description, expected)
  374. }
  375. private func doEchoGet(using transport: some ClientTransport) async throws {
  376. try await transport.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  377. try await stream.outbound.write(.metadata([:]))
  378. try await stream.outbound.write(.message([0]))
  379. stream.outbound.finish()
  380. // Don't need to validate the response, just that the server is running.
  381. let parts = try await stream.inbound.collect()
  382. XCTAssertEqual(parts.count, 3)
  383. }
  384. }
  385. }