GRPCAsyncClientCallTests.swift 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import NIOHPACK
  19. import NIOPosix
  20. import XCTest
  21. @testable import GRPC
  22. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  23. class GRPCAsyncClientCallTests: GRPCTestCase {
  24. private var group: MultiThreadedEventLoopGroup?
  25. private var server: Server?
  26. private var channel: ClientConnection?
  27. private static let OKInitialMetadata = HPACKHeaders([
  28. (":status", "200"),
  29. ("content-type", "application/grpc"),
  30. ])
  31. private static let OKTrailingMetadata = HPACKHeaders([
  32. ("grpc-status", "0")
  33. ])
  34. private func setUpServerAndChannel(
  35. service: CallHandlerProvider = EchoProvider()
  36. ) throws -> ClientConnection {
  37. let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  38. self.group = group
  39. let server = try Server.insecure(group: group)
  40. .withServiceProviders([service])
  41. .withLogger(self.serverLogger)
  42. .bind(host: "127.0.0.1", port: 0)
  43. .wait()
  44. self.server = server
  45. let channel = ClientConnection.insecure(group: group)
  46. .withBackgroundActivityLogger(self.clientLogger)
  47. .connect(host: "127.0.0.1", port: server.channel.localAddress!.port!)
  48. self.channel = channel
  49. return channel
  50. }
  51. override func tearDown() {
  52. if let channel = self.channel {
  53. XCTAssertNoThrow(try channel.close().wait())
  54. }
  55. if let server = self.server {
  56. XCTAssertNoThrow(try server.close().wait())
  57. }
  58. if let group = self.group {
  59. XCTAssertNoThrow(try group.syncShutdownGracefully())
  60. }
  61. super.tearDown()
  62. }
  63. func testAsyncUnaryCall() async throws {
  64. let channel = try self.setUpServerAndChannel()
  65. let get: GRPCAsyncUnaryCall<Echo_EchoRequest, Echo_EchoResponse> = channel.makeAsyncUnaryCall(
  66. path: "/echo.Echo/Get",
  67. request: .with { $0.text = "holt" },
  68. callOptions: .init()
  69. )
  70. await assertThat(try await get.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  71. await assertThat(try await get.response, .doesNotThrow())
  72. await assertThat(try await get.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  73. await assertThat(await get.status, .hasCode(.ok))
  74. print(try await get.trailingMetadata)
  75. }
  76. func testAsyncClientStreamingCall() async throws {
  77. let channel = try self.setUpServerAndChannel()
  78. let collect: GRPCAsyncClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse> =
  79. channel
  80. .makeAsyncClientStreamingCall(
  81. path: "/echo.Echo/Collect",
  82. callOptions: .init()
  83. )
  84. for word in ["boyle", "jeffers", "holt"] {
  85. try await collect.requestStream.send(.with { $0.text = word })
  86. }
  87. collect.requestStream.finish()
  88. await assertThat(try await collect.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  89. await assertThat(try await collect.response, .doesNotThrow())
  90. await assertThat(try await collect.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  91. await assertThat(await collect.status, .hasCode(.ok))
  92. }
  93. func testAsyncServerStreamingCall() async throws {
  94. let channel = try self.setUpServerAndChannel()
  95. let expand: GRPCAsyncServerStreamingCall<Echo_EchoRequest, Echo_EchoResponse> =
  96. channel
  97. .makeAsyncServerStreamingCall(
  98. path: "/echo.Echo/Expand",
  99. request: .with { $0.text = "boyle jeffers holt" },
  100. callOptions: .init()
  101. )
  102. await assertThat(try await expand.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  103. let numResponses = try await expand.responseStream.map { _ in 1 }.reduce(0, +)
  104. await assertThat(numResponses, .is(.equalTo(3)))
  105. await assertThat(try await expand.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  106. await assertThat(await expand.status, .hasCode(.ok))
  107. }
  108. func testAsyncBidirectionalStreamingCall() async throws {
  109. let channel = try self.setUpServerAndChannel()
  110. let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> =
  111. channel
  112. .makeAsyncBidirectionalStreamingCall(
  113. path: "/echo.Echo/Update",
  114. callOptions: .init()
  115. )
  116. let requests = ["boyle", "jeffers", "holt"]
  117. .map { word in Echo_EchoRequest.with { $0.text = word } }
  118. for request in requests {
  119. try await update.requestStream.send(request)
  120. }
  121. try await update.requestStream.send(requests)
  122. update.requestStream.finish()
  123. let numResponses = try await update.responseStream.map { _ in 1 }.reduce(0, +)
  124. await assertThat(numResponses, .is(.equalTo(6)))
  125. await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  126. await assertThat(await update.status, .hasCode(.ok))
  127. }
  128. func testAsyncBidirectionalStreamingCall_InterleavedRequestsAndResponses() async throws {
  129. let channel = try self.setUpServerAndChannel()
  130. let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> =
  131. channel
  132. .makeAsyncBidirectionalStreamingCall(
  133. path: "/echo.Echo/Update",
  134. callOptions: .init()
  135. )
  136. await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  137. var responseStreamIterator = update.responseStream.makeAsyncIterator()
  138. for word in ["boyle", "jeffers", "holt"] {
  139. try await update.requestStream.send(.with { $0.text = word })
  140. await assertThat(try await responseStreamIterator.next(), .is(.some()))
  141. }
  142. update.requestStream.finish()
  143. await assertThat(try await responseStreamIterator.next(), .is(.none()))
  144. await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  145. await assertThat(await update.status, .hasCode(.ok))
  146. }
  147. func testAsyncBidirectionalStreamingCall_ConcurrentTasks() async throws {
  148. let channel = try self.setUpServerAndChannel()
  149. let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> =
  150. channel
  151. .makeAsyncBidirectionalStreamingCall(
  152. path: "/echo.Echo/Update",
  153. callOptions: .init()
  154. )
  155. await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  156. let counter = RequestResponseCounter()
  157. // Send the requests and get responses in separate concurrent tasks and await the group.
  158. _ = await withThrowingTaskGroup(of: Void.self) { taskGroup in
  159. // Send requests, then end, in a task.
  160. taskGroup.addTask {
  161. for word in ["boyle", "jeffers", "holt"] {
  162. try await update.requestStream.send(.with { $0.text = word })
  163. await counter.incrementRequests()
  164. }
  165. update.requestStream.finish()
  166. }
  167. // Get responses in a separate task.
  168. taskGroup.addTask {
  169. for try await _ in update.responseStream {
  170. await counter.incrementResponses()
  171. }
  172. }
  173. }
  174. await assertThat(await counter.numRequests, .is(.equalTo(3)))
  175. await assertThat(await counter.numResponses, .is(.equalTo(3)))
  176. await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  177. await assertThat(await update.status, .hasCode(.ok))
  178. }
  179. func testExplicitAcceptUnary(twice: Bool, function: String = #function) async throws {
  180. let headers: HPACKHeaders = ["fn": function]
  181. let channel = try self.setUpServerAndChannel(
  182. service: AsyncEchoProvider(headers: headers, sendTwice: twice)
  183. )
  184. let echo = Echo_EchoAsyncClient(channel: channel)
  185. let call = echo.makeGetCall(.with { $0.text = "" })
  186. let responseHeaders = try await call.initialMetadata
  187. XCTAssertEqual(responseHeaders.first(name: "fn"), function)
  188. let status = await call.status
  189. XCTAssertEqual(status.code, .ok)
  190. }
  191. func testExplicitAcceptUnary() async throws {
  192. try await self.testExplicitAcceptUnary(twice: false)
  193. }
  194. func testExplicitAcceptTwiceUnary() async throws {
  195. try await self.testExplicitAcceptUnary(twice: true)
  196. }
  197. func testExplicitAcceptClientStreaming(twice: Bool, function: String = #function) async throws {
  198. let headers: HPACKHeaders = ["fn": function]
  199. let channel = try self.setUpServerAndChannel(
  200. service: AsyncEchoProvider(headers: headers, sendTwice: twice)
  201. )
  202. let echo = Echo_EchoAsyncClient(channel: channel)
  203. let call = echo.makeCollectCall()
  204. let responseHeaders = try await call.initialMetadata
  205. XCTAssertEqual(responseHeaders.first(name: "fn"), function)
  206. // Close request stream; the response should be empty.
  207. call.requestStream.finish()
  208. let response = try await call.response
  209. XCTAssertEqual(response.text, "")
  210. let status = await call.status
  211. XCTAssertEqual(status.code, .ok)
  212. }
  213. func testExplicitAcceptClientStreaming() async throws {
  214. try await self.testExplicitAcceptClientStreaming(twice: false)
  215. }
  216. func testExplicitAcceptTwiceClientStreaming() async throws {
  217. try await self.testExplicitAcceptClientStreaming(twice: true)
  218. }
  219. func testExplicitAcceptServerStreaming(twice: Bool, function: String = #function) async throws {
  220. let headers: HPACKHeaders = ["fn": #function]
  221. let channel = try self.setUpServerAndChannel(
  222. service: AsyncEchoProvider(headers: headers, sendTwice: twice)
  223. )
  224. let echo = Echo_EchoAsyncClient(channel: channel)
  225. let call = echo.makeExpandCall(.with { $0.text = "foo bar baz" })
  226. let responseHeaders = try await call.initialMetadata
  227. XCTAssertEqual(responseHeaders.first(name: "fn"), #function)
  228. // Close request stream; the response should be empty.
  229. let responses = try await call.responseStream.collect()
  230. XCTAssertEqual(responses.count, 3)
  231. let status = await call.status
  232. XCTAssertEqual(status.code, .ok)
  233. }
  234. func testExplicitAcceptServerStreaming() async throws {
  235. try await self.testExplicitAcceptServerStreaming(twice: false)
  236. }
  237. func testExplicitAcceptTwiceServerStreaming() async throws {
  238. try await self.testExplicitAcceptServerStreaming(twice: true)
  239. }
  240. func testExplicitAcceptBidirectionalStreaming(
  241. twice: Bool,
  242. function: String = #function
  243. ) async throws {
  244. let headers: HPACKHeaders = ["fn": function]
  245. let channel = try self.setUpServerAndChannel(
  246. service: AsyncEchoProvider(headers: headers, sendTwice: twice)
  247. )
  248. let echo = Echo_EchoAsyncClient(channel: channel)
  249. let call = echo.makeUpdateCall()
  250. let responseHeaders = try await call.initialMetadata
  251. XCTAssertEqual(responseHeaders.first(name: "fn"), function)
  252. // Close request stream; there should be no responses.
  253. call.requestStream.finish()
  254. let responses = try await call.responseStream.collect()
  255. XCTAssertEqual(responses.count, 0)
  256. let status = await call.status
  257. XCTAssertEqual(status.code, .ok)
  258. }
  259. func testExplicitAcceptBidirectionalStreaming() async throws {
  260. try await self.testExplicitAcceptBidirectionalStreaming(twice: false)
  261. }
  262. func testExplicitAcceptTwiceBidirectionalStreaming() async throws {
  263. try await self.testExplicitAcceptBidirectionalStreaming(twice: true)
  264. }
  265. }
  266. // Workaround https://bugs.swift.org/browse/SR-15070 (compiler crashes when defining a class/actor
  267. // in an async context).
  268. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  269. private actor RequestResponseCounter {
  270. var numResponses = 0
  271. var numRequests = 0
  272. func incrementResponses() async {
  273. self.numResponses += 1
  274. }
  275. func incrementRequests() async {
  276. self.numRequests += 1
  277. }
  278. }
  279. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  280. private final class AsyncEchoProvider: Echo_EchoAsyncProvider {
  281. let headers: HPACKHeaders
  282. let sendTwice: Bool
  283. init(headers: HPACKHeaders, sendTwice: Bool = false) {
  284. self.headers = headers
  285. self.sendTwice = sendTwice
  286. }
  287. private func accept(context: GRPCAsyncServerCallContext) async {
  288. await context.acceptRPC(headers: self.headers)
  289. if self.sendTwice {
  290. await context.acceptRPC(headers: self.headers) // Should be a no-op.
  291. }
  292. }
  293. func get(
  294. request: Echo_EchoRequest,
  295. context: GRPCAsyncServerCallContext
  296. ) async throws -> Echo_EchoResponse {
  297. await self.accept(context: context)
  298. return Echo_EchoResponse.with { $0.text = request.text }
  299. }
  300. func expand(
  301. request: Echo_EchoRequest,
  302. responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
  303. context: GRPCAsyncServerCallContext
  304. ) async throws {
  305. await self.accept(context: context)
  306. for part in request.text.components(separatedBy: " ") {
  307. let response = Echo_EchoResponse.with {
  308. $0.text = part
  309. }
  310. try await responseStream.send(response)
  311. }
  312. }
  313. func collect(
  314. requestStream: GRPCAsyncRequestStream<Echo_EchoRequest>,
  315. context: GRPCAsyncServerCallContext
  316. ) async throws -> Echo_EchoResponse {
  317. await self.accept(context: context)
  318. let collected = try await requestStream.map { $0.text }.collect().joined(separator: " ")
  319. return Echo_EchoResponse.with { $0.text = collected }
  320. }
  321. func update(
  322. requestStream: GRPCAsyncRequestStream<Echo_EchoRequest>,
  323. responseStream: GRPCAsyncResponseStreamWriter<Echo_EchoResponse>,
  324. context: GRPCAsyncServerCallContext
  325. ) async throws {
  326. await self.accept(context: context)
  327. for try await request in requestStream {
  328. let response = Echo_EchoResponse.with { $0.text = request.text }
  329. try await responseStream.send(response)
  330. }
  331. }
  332. }