GRPCServerTests.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCInProcessTransport
  19. import XCTest
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. final class GRPCServerTests: XCTestCase {
  22. func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) {
  23. let server = InProcessServerTransport()
  24. let client = InProcessClientTransport(server: server)
  25. return (client, server)
  26. }
  27. func withInProcessClientConnectedToServer(
  28. services: [any RegistrableRPCService],
  29. interceptors: [any ServerInterceptor] = [],
  30. _ body: (InProcessClientTransport, GRPCServer) async throws -> Void
  31. ) async throws {
  32. let inProcess = self.makeInProcessPair()
  33. let server = GRPCServer(
  34. transports: [inProcess.server],
  35. services: services,
  36. interceptors: interceptors
  37. )
  38. try await withThrowingTaskGroup(of: Void.self) { group in
  39. group.addTask {
  40. try await server.run()
  41. }
  42. group.addTask {
  43. try await inProcess.client.connect(lazily: true)
  44. }
  45. try await body(inProcess.client, server)
  46. inProcess.client.close()
  47. server.stopListening()
  48. }
  49. }
  50. func testServerHandlesUnary() async throws {
  51. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  52. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  53. try await stream.outbound.write(.metadata([:]))
  54. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  55. stream.outbound.finish()
  56. var responseParts = stream.inbound.makeAsyncIterator()
  57. let metadata = try await responseParts.next()
  58. XCTAssertMetadata(metadata)
  59. let message = try await responseParts.next()
  60. XCTAssertMessage(message) {
  61. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  62. }
  63. let status = try await responseParts.next()
  64. XCTAssertStatus(status) { status, _ in
  65. XCTAssertEqual(status.code, .ok)
  66. }
  67. }
  68. }
  69. }
  70. func testServerHandlesClientStreaming() async throws {
  71. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  72. try await client.withStream(descriptor: BinaryEcho.Methods.collect) { stream in
  73. try await stream.outbound.write(.metadata([:]))
  74. try await stream.outbound.write(.message([3]))
  75. try await stream.outbound.write(.message([1]))
  76. try await stream.outbound.write(.message([4]))
  77. try await stream.outbound.write(.message([1]))
  78. try await stream.outbound.write(.message([5]))
  79. stream.outbound.finish()
  80. var responseParts = stream.inbound.makeAsyncIterator()
  81. let metadata = try await responseParts.next()
  82. XCTAssertMetadata(metadata)
  83. let message = try await responseParts.next()
  84. XCTAssertMessage(message) {
  85. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  86. }
  87. let status = try await responseParts.next()
  88. XCTAssertStatus(status) { status, _ in
  89. XCTAssertEqual(status.code, .ok)
  90. }
  91. }
  92. }
  93. }
  94. func testServerHandlesServerStreaming() async throws {
  95. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  96. try await client.withStream(descriptor: BinaryEcho.Methods.expand) { stream in
  97. try await stream.outbound.write(.metadata([:]))
  98. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  99. stream.outbound.finish()
  100. var responseParts = stream.inbound.makeAsyncIterator()
  101. let metadata = try await responseParts.next()
  102. XCTAssertMetadata(metadata)
  103. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  104. let message = try await responseParts.next()
  105. XCTAssertMessage(message) {
  106. XCTAssertEqual($0, [byte])
  107. }
  108. }
  109. let status = try await responseParts.next()
  110. XCTAssertStatus(status) { status, _ in
  111. XCTAssertEqual(status.code, .ok)
  112. }
  113. }
  114. }
  115. }
  116. func testServerHandlesBidirectionalStreaming() async throws {
  117. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  118. try await client.withStream(descriptor: BinaryEcho.Methods.update) { stream in
  119. try await stream.outbound.write(.metadata([:]))
  120. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  121. try await stream.outbound.write(.message([byte]))
  122. }
  123. stream.outbound.finish()
  124. var responseParts = stream.inbound.makeAsyncIterator()
  125. let metadata = try await responseParts.next()
  126. XCTAssertMetadata(metadata)
  127. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  128. let message = try await responseParts.next()
  129. XCTAssertMessage(message) {
  130. XCTAssertEqual($0, [byte])
  131. }
  132. }
  133. let status = try await responseParts.next()
  134. XCTAssertStatus(status) { status, _ in
  135. XCTAssertEqual(status.code, .ok)
  136. }
  137. }
  138. }
  139. }
  140. func testUnimplementedMethod() async throws {
  141. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  142. try await client.withStream(
  143. descriptor: MethodDescriptor(service: "not", method: "implemented")
  144. ) { stream in
  145. try await stream.outbound.write(.metadata([:]))
  146. stream.outbound.finish()
  147. var responseParts = stream.inbound.makeAsyncIterator()
  148. let status = try await responseParts.next()
  149. XCTAssertStatus(status) { status, _ in
  150. XCTAssertEqual(status.code, .unimplemented)
  151. }
  152. }
  153. }
  154. }
  155. func testMultipleConcurrentRequests() async throws {
  156. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  157. await withThrowingTaskGroup(of: Void.self) { group in
  158. for i in UInt8.min ..< UInt8.max {
  159. group.addTask {
  160. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  161. try await stream.outbound.write(.metadata([:]))
  162. try await stream.outbound.write(.message([i]))
  163. stream.outbound.finish()
  164. var responseParts = stream.inbound.makeAsyncIterator()
  165. let metadata = try await responseParts.next()
  166. XCTAssertMetadata(metadata)
  167. let message = try await responseParts.next()
  168. XCTAssertMessage(message) { XCTAssertEqual($0, [i]) }
  169. let status = try await responseParts.next()
  170. XCTAssertStatus(status) { status, _ in
  171. XCTAssertEqual(status.code, .ok)
  172. }
  173. }
  174. }
  175. }
  176. }
  177. }
  178. }
  179. func testInterceptorsAreAppliedInOrder() async throws {
  180. let counter1 = ManagedAtomic(0)
  181. let counter2 = ManagedAtomic(0)
  182. try await self.withInProcessClientConnectedToServer(
  183. services: [BinaryEcho()],
  184. interceptors: [
  185. .requestCounter(counter1),
  186. .rejectAll(with: RPCError(code: .unavailable, message: "")),
  187. .requestCounter(counter2),
  188. ]
  189. ) { client, _ in
  190. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  191. try await stream.outbound.write(.metadata([:]))
  192. stream.outbound.finish()
  193. let parts = try await stream.inbound.collect()
  194. XCTAssertStatus(parts.first) { status, _ in
  195. XCTAssertEqual(status.code, .unavailable)
  196. }
  197. }
  198. }
  199. XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1)
  200. XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0)
  201. }
  202. func testInterceptorsAreNotAppliedToUnimplementedMethods() async throws {
  203. let counter = ManagedAtomic(0)
  204. try await self.withInProcessClientConnectedToServer(
  205. services: [BinaryEcho()],
  206. interceptors: [.requestCounter(counter)]
  207. ) { client, _ in
  208. try await client.withStream(
  209. descriptor: MethodDescriptor(service: "not", method: "implemented")
  210. ) { stream in
  211. try await stream.outbound.write(.metadata([:]))
  212. stream.outbound.finish()
  213. let parts = try await stream.inbound.collect()
  214. XCTAssertStatus(parts.first) { status, _ in
  215. XCTAssertEqual(status.code, .unimplemented)
  216. }
  217. }
  218. }
  219. XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 0)
  220. }
  221. func testNoNewRPCsAfterServerStopListening() async throws {
  222. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  223. // Run an RPC so we know the server is up.
  224. try await self.doEchoGet(using: client)
  225. // New streams should fail immediately after this.
  226. server.stopListening()
  227. // RPC should fail now.
  228. await XCTAssertThrowsRPCErrorAsync {
  229. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  230. XCTFail("Stream shouldn't be opened")
  231. }
  232. } errorHandler: { error in
  233. XCTAssertEqual(error.code, .failedPrecondition)
  234. }
  235. }
  236. }
  237. func testInFlightRPCsCanContinueAfterServerStopListening() async throws {
  238. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  239. try await client.withStream(descriptor: BinaryEcho.Methods.update) { stream in
  240. try await stream.outbound.write(.metadata([:]))
  241. var iterator = stream.inbound.makeAsyncIterator()
  242. // Don't need to validate the response, just that the server is running.
  243. let metadata = try await iterator.next()
  244. XCTAssertMetadata(metadata)
  245. // New streams should fail immediately after this.
  246. server.stopListening()
  247. try await stream.outbound.write(.message([0]))
  248. stream.outbound.finish()
  249. let message = try await iterator.next()
  250. XCTAssertMessage(message) { XCTAssertEqual($0, [0]) }
  251. let status = try await iterator.next()
  252. XCTAssertStatus(status)
  253. }
  254. }
  255. }
  256. func testCancelRunningServer() async throws {
  257. let inProcess = self.makeInProcessPair()
  258. let task = Task {
  259. let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()])
  260. try await server.run()
  261. }
  262. try await withThrowingTaskGroup(of: Void.self) { group in
  263. group.addTask {
  264. try? await inProcess.client.connect(lazily: true)
  265. }
  266. try await self.doEchoGet(using: inProcess.client)
  267. // The server must be running at this point as an RPC has completed.
  268. task.cancel()
  269. try await task.value
  270. group.cancelAll()
  271. }
  272. }
  273. func testTestRunServerWithNoTransport() async throws {
  274. let server = GRPCServer(transports: [], services: [])
  275. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  276. try await server.run()
  277. } errorHandler: { error in
  278. XCTAssertEqual(error.code, .noTransportsConfigured)
  279. }
  280. }
  281. func testTestRunStoppedServer() async throws {
  282. let server = GRPCServer(transports: [InProcessServerTransport()], services: [])
  283. // Run the server.
  284. let task = Task { try await server.run() }
  285. task.cancel()
  286. try await task.value
  287. // Server is stopped, should throw an error.
  288. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  289. try await server.run()
  290. } errorHandler: { error in
  291. XCTAssertEqual(error.code, .serverIsStopped)
  292. }
  293. }
  294. func testRunServerWhenTransportThrows() async throws {
  295. let server = GRPCServer(transports: [ThrowOnRunServerTransport()], services: [])
  296. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  297. try await server.run()
  298. } errorHandler: { error in
  299. XCTAssertEqual(error.code, .failedToStartTransport)
  300. }
  301. }
  302. func testRunServerDrainsRunningTransportsWhenOneFailsToStart() async throws {
  303. // Register the in process transport first and allow it to come up.
  304. let inProcess = self.makeInProcessPair()
  305. // Register a transport waits for a signal before throwing.
  306. let signal = AsyncStream.makeStream(of: Void.self)
  307. let server = GRPCServer(
  308. transports: [
  309. inProcess.server,
  310. ThrowOnSignalServerTransport(signal: signal.stream),
  311. ],
  312. services: []
  313. )
  314. // Connect the in process client and start an RPC. When the stream is opened signal the
  315. // other transport to throw. This stream should be failed by the server.
  316. await withThrowingTaskGroup(of: Void.self) { group in
  317. group.addTask {
  318. try await inProcess.client.connect(lazily: true)
  319. }
  320. group.addTask {
  321. try await inProcess.client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  322. // The stream is open to the in-process transport. Let the other transport start.
  323. signal.continuation.finish()
  324. try await stream.outbound.write(.metadata([:]))
  325. stream.outbound.finish()
  326. let parts = try await stream.inbound.collect()
  327. XCTAssertStatus(parts.first) { status, _ in
  328. XCTAssertEqual(status.code, .unavailable)
  329. }
  330. }
  331. }
  332. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  333. try await server.run()
  334. } errorHandler: { error in
  335. XCTAssertEqual(error.code, .failedToStartTransport)
  336. }
  337. group.cancelAll()
  338. }
  339. }
  340. private func doEchoGet(using transport: some ClientTransport) async throws {
  341. try await transport.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  342. try await stream.outbound.write(.metadata([:]))
  343. try await stream.outbound.write(.message([0]))
  344. stream.outbound.finish()
  345. // Don't need to validate the response, just that the server is running.
  346. let parts = try await stream.inbound.collect()
  347. XCTAssertEqual(parts.count, 3)
  348. }
  349. }
  350. }