GRPCServerTests.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Atomics
  17. import GRPCCore
  18. import GRPCInProcessTransport
  19. import XCTest
  20. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
  21. final class GRPCServerTests: XCTestCase {
  22. func withInProcessClientConnectedToServer(
  23. services: [any RegistrableRPCService],
  24. interceptors: [any ServerInterceptor] = [],
  25. _ body: (InProcessClientTransport, GRPCServer) async throws -> Void
  26. ) async throws {
  27. let inProcess = InProcessTransport.makePair()
  28. let server = GRPCServer(
  29. transports: [inProcess.server],
  30. services: services,
  31. interceptors: interceptors
  32. )
  33. try await withThrowingTaskGroup(of: Void.self) { group in
  34. group.addTask {
  35. try await server.run()
  36. }
  37. group.addTask {
  38. try await inProcess.client.connect(lazily: true)
  39. }
  40. try await body(inProcess.client, server)
  41. inProcess.client.close()
  42. server.stopListening()
  43. }
  44. }
  45. func testServerHandlesUnary() async throws {
  46. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  47. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  48. try await stream.outbound.write(.metadata([:]))
  49. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  50. stream.outbound.finish()
  51. var responseParts = stream.inbound.makeAsyncIterator()
  52. let metadata = try await responseParts.next()
  53. XCTAssertMetadata(metadata)
  54. let message = try await responseParts.next()
  55. XCTAssertMessage(message) {
  56. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  57. }
  58. let status = try await responseParts.next()
  59. XCTAssertStatus(status) { status, _ in
  60. XCTAssertEqual(status.code, .ok)
  61. }
  62. }
  63. }
  64. }
  65. func testServerHandlesClientStreaming() async throws {
  66. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  67. try await client.withStream(descriptor: BinaryEcho.Methods.collect) { stream in
  68. try await stream.outbound.write(.metadata([:]))
  69. try await stream.outbound.write(.message([3]))
  70. try await stream.outbound.write(.message([1]))
  71. try await stream.outbound.write(.message([4]))
  72. try await stream.outbound.write(.message([1]))
  73. try await stream.outbound.write(.message([5]))
  74. stream.outbound.finish()
  75. var responseParts = stream.inbound.makeAsyncIterator()
  76. let metadata = try await responseParts.next()
  77. XCTAssertMetadata(metadata)
  78. let message = try await responseParts.next()
  79. XCTAssertMessage(message) {
  80. XCTAssertEqual($0, [3, 1, 4, 1, 5])
  81. }
  82. let status = try await responseParts.next()
  83. XCTAssertStatus(status) { status, _ in
  84. XCTAssertEqual(status.code, .ok)
  85. }
  86. }
  87. }
  88. }
  89. func testServerHandlesServerStreaming() async throws {
  90. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  91. try await client.withStream(descriptor: BinaryEcho.Methods.expand) { stream in
  92. try await stream.outbound.write(.metadata([:]))
  93. try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
  94. stream.outbound.finish()
  95. var responseParts = stream.inbound.makeAsyncIterator()
  96. let metadata = try await responseParts.next()
  97. XCTAssertMetadata(metadata)
  98. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  99. let message = try await responseParts.next()
  100. XCTAssertMessage(message) {
  101. XCTAssertEqual($0, [byte])
  102. }
  103. }
  104. let status = try await responseParts.next()
  105. XCTAssertStatus(status) { status, _ in
  106. XCTAssertEqual(status.code, .ok)
  107. }
  108. }
  109. }
  110. }
  111. func testServerHandlesBidirectionalStreaming() async throws {
  112. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  113. try await client.withStream(descriptor: BinaryEcho.Methods.update) { stream in
  114. try await stream.outbound.write(.metadata([:]))
  115. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  116. try await stream.outbound.write(.message([byte]))
  117. }
  118. stream.outbound.finish()
  119. var responseParts = stream.inbound.makeAsyncIterator()
  120. let metadata = try await responseParts.next()
  121. XCTAssertMetadata(metadata)
  122. for byte in [3, 1, 4, 1, 5] as [UInt8] {
  123. let message = try await responseParts.next()
  124. XCTAssertMessage(message) {
  125. XCTAssertEqual($0, [byte])
  126. }
  127. }
  128. let status = try await responseParts.next()
  129. XCTAssertStatus(status) { status, _ in
  130. XCTAssertEqual(status.code, .ok)
  131. }
  132. }
  133. }
  134. }
  135. func testUnimplementedMethod() async throws {
  136. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  137. try await client.withStream(
  138. descriptor: MethodDescriptor(service: "not", method: "implemented")
  139. ) { stream in
  140. try await stream.outbound.write(.metadata([:]))
  141. stream.outbound.finish()
  142. var responseParts = stream.inbound.makeAsyncIterator()
  143. let status = try await responseParts.next()
  144. XCTAssertStatus(status) { status, _ in
  145. XCTAssertEqual(status.code, .unimplemented)
  146. }
  147. }
  148. }
  149. }
  150. func testMultipleConcurrentRequests() async throws {
  151. try await self.withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, _ in
  152. await withThrowingTaskGroup(of: Void.self) { group in
  153. for i in UInt8.min ..< UInt8.max {
  154. group.addTask {
  155. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  156. try await stream.outbound.write(.metadata([:]))
  157. try await stream.outbound.write(.message([i]))
  158. stream.outbound.finish()
  159. var responseParts = stream.inbound.makeAsyncIterator()
  160. let metadata = try await responseParts.next()
  161. XCTAssertMetadata(metadata)
  162. let message = try await responseParts.next()
  163. XCTAssertMessage(message) { XCTAssertEqual($0, [i]) }
  164. let status = try await responseParts.next()
  165. XCTAssertStatus(status) { status, _ in
  166. XCTAssertEqual(status.code, .ok)
  167. }
  168. }
  169. }
  170. }
  171. }
  172. }
  173. }
  174. func testInterceptorsAreAppliedInOrder() async throws {
  175. let counter1 = ManagedAtomic(0)
  176. let counter2 = ManagedAtomic(0)
  177. try await self.withInProcessClientConnectedToServer(
  178. services: [BinaryEcho()],
  179. interceptors: [
  180. .requestCounter(counter1),
  181. .rejectAll(with: RPCError(code: .unavailable, message: "")),
  182. .requestCounter(counter2),
  183. ]
  184. ) { client, _ in
  185. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  186. try await stream.outbound.write(.metadata([:]))
  187. stream.outbound.finish()
  188. let parts = try await stream.inbound.collect()
  189. XCTAssertStatus(parts.first) { status, _ in
  190. XCTAssertEqual(status.code, .unavailable)
  191. }
  192. }
  193. }
  194. XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1)
  195. XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0)
  196. }
  197. func testInterceptorsAreNotAppliedToUnimplementedMethods() async throws {
  198. let counter = ManagedAtomic(0)
  199. try await self.withInProcessClientConnectedToServer(
  200. services: [BinaryEcho()],
  201. interceptors: [.requestCounter(counter)]
  202. ) { client, _ in
  203. try await client.withStream(
  204. descriptor: MethodDescriptor(service: "not", method: "implemented")
  205. ) { stream in
  206. try await stream.outbound.write(.metadata([:]))
  207. stream.outbound.finish()
  208. let parts = try await stream.inbound.collect()
  209. XCTAssertStatus(parts.first) { status, _ in
  210. XCTAssertEqual(status.code, .unimplemented)
  211. }
  212. }
  213. }
  214. XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 0)
  215. }
  216. func testNoNewRPCsAfterServerStopListening() async throws {
  217. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  218. // Run an RPC so we know the server is up.
  219. try await self.doEchoGet(using: client)
  220. // New streams should fail immediately after this.
  221. server.stopListening()
  222. // RPC should fail now.
  223. await XCTAssertThrowsRPCErrorAsync {
  224. try await client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  225. XCTFail("Stream shouldn't be opened")
  226. }
  227. } errorHandler: { error in
  228. XCTAssertEqual(error.code, .failedPrecondition)
  229. }
  230. }
  231. }
  232. func testInFlightRPCsCanContinueAfterServerStopListening() async throws {
  233. try await withInProcessClientConnectedToServer(services: [BinaryEcho()]) { client, server in
  234. try await client.withStream(descriptor: BinaryEcho.Methods.update) { stream in
  235. try await stream.outbound.write(.metadata([:]))
  236. var iterator = stream.inbound.makeAsyncIterator()
  237. // Don't need to validate the response, just that the server is running.
  238. let metadata = try await iterator.next()
  239. XCTAssertMetadata(metadata)
  240. // New streams should fail immediately after this.
  241. server.stopListening()
  242. try await stream.outbound.write(.message([0]))
  243. stream.outbound.finish()
  244. let message = try await iterator.next()
  245. XCTAssertMessage(message) { XCTAssertEqual($0, [0]) }
  246. let status = try await iterator.next()
  247. XCTAssertStatus(status)
  248. }
  249. }
  250. }
  251. func testCancelRunningServer() async throws {
  252. let inProcess = InProcessTransport.makePair()
  253. let task = Task {
  254. let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()])
  255. try await server.run()
  256. }
  257. try await withThrowingTaskGroup(of: Void.self) { group in
  258. group.addTask {
  259. try? await inProcess.client.connect(lazily: true)
  260. }
  261. try await self.doEchoGet(using: inProcess.client)
  262. // The server must be running at this point as an RPC has completed.
  263. task.cancel()
  264. try await task.value
  265. group.cancelAll()
  266. }
  267. }
  268. func testTestRunServerWithNoTransport() async throws {
  269. let server = GRPCServer(transports: [], services: [])
  270. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  271. try await server.run()
  272. } errorHandler: { error in
  273. XCTAssertEqual(error.code, .noTransportsConfigured)
  274. }
  275. }
  276. func testTestRunStoppedServer() async throws {
  277. let server = GRPCServer(transports: [InProcessServerTransport()], services: [])
  278. // Run the server.
  279. let task = Task { try await server.run() }
  280. task.cancel()
  281. try await task.value
  282. // Server is stopped, should throw an error.
  283. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  284. try await server.run()
  285. } errorHandler: { error in
  286. XCTAssertEqual(error.code, .serverIsStopped)
  287. }
  288. }
  289. func testRunServerWhenTransportThrows() async throws {
  290. let server = GRPCServer(transports: [ThrowOnRunServerTransport()], services: [])
  291. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  292. try await server.run()
  293. } errorHandler: { error in
  294. XCTAssertEqual(error.code, .failedToStartTransport)
  295. }
  296. }
  297. func testRunServerDrainsRunningTransportsWhenOneFailsToStart() async throws {
  298. // Register the in process transport first and allow it to come up.
  299. let inProcess = InProcessTransport.makePair()
  300. // Register a transport waits for a signal before throwing.
  301. let signal = AsyncStream.makeStream(of: Void.self)
  302. let server = GRPCServer(
  303. transports: [
  304. inProcess.server,
  305. ThrowOnSignalServerTransport(signal: signal.stream),
  306. ],
  307. services: []
  308. )
  309. // Connect the in process client and start an RPC. When the stream is opened signal the
  310. // other transport to throw. This stream should be failed by the server.
  311. await withThrowingTaskGroup(of: Void.self) { group in
  312. group.addTask {
  313. try await inProcess.client.connect(lazily: true)
  314. }
  315. group.addTask {
  316. try await inProcess.client.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  317. // The stream is open to the in-process transport. Let the other transport start.
  318. signal.continuation.finish()
  319. try await stream.outbound.write(.metadata([:]))
  320. stream.outbound.finish()
  321. let parts = try await stream.inbound.collect()
  322. XCTAssertStatus(parts.first) { status, _ in
  323. XCTAssertEqual(status.code, .unavailable)
  324. }
  325. }
  326. }
  327. await XCTAssertThrowsErrorAsync(ofType: ServerError.self) {
  328. try await server.run()
  329. } errorHandler: { error in
  330. XCTAssertEqual(error.code, .failedToStartTransport)
  331. }
  332. group.cancelAll()
  333. }
  334. }
  335. private func doEchoGet(using transport: some ClientTransport) async throws {
  336. try await transport.withStream(descriptor: BinaryEcho.Methods.get) { stream in
  337. try await stream.outbound.write(.metadata([:]))
  338. try await stream.outbound.write(.message([0]))
  339. stream.outbound.finish()
  340. // Don't need to validate the response, just that the server is running.
  341. let parts = try await stream.inbound.collect()
  342. XCTAssertEqual(parts.count, 3)
  343. }
  344. }
  345. }