GRPCAsyncClientCallTests.swift 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.5)
  17. import EchoImplementation
  18. import EchoModel
  19. @testable import GRPC
  20. import NIOHPACK
  21. import NIOPosix
  22. import XCTest
  23. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  24. class GRPCAsyncClientCallTests: GRPCTestCase {
  25. private var group: MultiThreadedEventLoopGroup?
  26. private var server: Server?
  27. private var channel: ClientConnection?
  28. private static let OKInitialMetadata = HPACKHeaders([
  29. (":status", "200"),
  30. ("content-type", "application/grpc"),
  31. ])
  32. private static let OKTrailingMetadata = HPACKHeaders([
  33. ("grpc-status", "0"),
  34. ])
  35. private func setUpServerAndChannel() throws -> ClientConnection {
  36. let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  37. self.group = group
  38. let server = try Server.insecure(group: group)
  39. .withServiceProviders([EchoProvider()])
  40. .withLogger(self.serverLogger)
  41. .bind(host: "127.0.0.1", port: 0)
  42. .wait()
  43. self.server = server
  44. let channel = ClientConnection.insecure(group: group)
  45. .withBackgroundActivityLogger(self.clientLogger)
  46. .connect(host: "127.0.0.1", port: server.channel.localAddress!.port!)
  47. self.channel = channel
  48. return channel
  49. }
  50. override func tearDown() {
  51. if let channel = self.channel {
  52. XCTAssertNoThrow(try channel.close().wait())
  53. }
  54. if let server = self.server {
  55. XCTAssertNoThrow(try server.close().wait())
  56. }
  57. if let group = self.group {
  58. XCTAssertNoThrow(try group.syncShutdownGracefully())
  59. }
  60. super.tearDown()
  61. }
  62. func testAsyncUnaryCall() throws { XCTAsyncTest {
  63. let channel = try self.setUpServerAndChannel()
  64. let get: GRPCAsyncUnaryCall<Echo_EchoRequest, Echo_EchoResponse> = channel.makeAsyncUnaryCall(
  65. path: "/echo.Echo/Get",
  66. request: .with { $0.text = "holt" },
  67. callOptions: .init()
  68. )
  69. await assertThat(try await get.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  70. await assertThat(try await get.response, .doesNotThrow())
  71. await assertThat(try await get.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  72. await assertThat(await get.status, .hasCode(.ok))
  73. print(try await get.trailingMetadata)
  74. } }
  75. func testAsyncClientStreamingCall() throws { XCTAsyncTest {
  76. let channel = try self.setUpServerAndChannel()
  77. let collect: GRPCAsyncClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
  78. .makeAsyncClientStreamingCall(
  79. path: "/echo.Echo/Collect",
  80. callOptions: .init()
  81. )
  82. for word in ["boyle", "jeffers", "holt"] {
  83. try await collect.sendMessage(.with { $0.text = word })
  84. }
  85. try await collect.sendEnd()
  86. await assertThat(try await collect.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  87. await assertThat(try await collect.response, .doesNotThrow())
  88. await assertThat(try await collect.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  89. await assertThat(await collect.status, .hasCode(.ok))
  90. } }
  91. func testAsyncServerStreamingCall() throws { XCTAsyncTest {
  92. let channel = try self.setUpServerAndChannel()
  93. let expand: GRPCAsyncServerStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
  94. .makeAsyncServerStreamingCall(
  95. path: "/echo.Echo/Expand",
  96. request: .with { $0.text = "boyle jeffers holt" },
  97. callOptions: .init()
  98. )
  99. await assertThat(try await expand.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  100. let numResponses = try await expand.responses.map { _ in 1 }.reduce(0, +)
  101. await assertThat(numResponses, .is(.equalTo(3)))
  102. await assertThat(try await expand.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  103. await assertThat(await expand.status, .hasCode(.ok))
  104. } }
  105. func testAsyncBidirectionalStreamingCall() throws { XCTAsyncTest {
  106. let channel = try self.setUpServerAndChannel()
  107. let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
  108. .makeAsyncBidirectionalStreamingCall(
  109. path: "/echo.Echo/Update",
  110. callOptions: .init()
  111. )
  112. for word in ["boyle", "jeffers", "holt"] {
  113. try await update.sendMessage(.with { $0.text = word })
  114. }
  115. try await update.sendEnd()
  116. let numResponses = try await update.responses.map { _ in 1 }.reduce(0, +)
  117. await assertThat(numResponses, .is(.equalTo(3)))
  118. await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  119. await assertThat(await update.status, .hasCode(.ok))
  120. } }
  121. func testAsyncBidirectionalStreamingCall_InterleavedRequestsAndResponses() throws { XCTAsyncTest {
  122. let channel = try self.setUpServerAndChannel()
  123. let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
  124. .makeAsyncBidirectionalStreamingCall(
  125. path: "/echo.Echo/Update",
  126. callOptions: .init()
  127. )
  128. await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  129. var responseStreamIterator = update.responses.makeAsyncIterator()
  130. for word in ["boyle", "jeffers", "holt"] {
  131. try await update.sendMessage(.with { $0.text = word })
  132. await assertThat(try await responseStreamIterator.next(), .is(.notNil()))
  133. }
  134. try await update.sendEnd()
  135. await assertThat(try await responseStreamIterator.next(), .is(.nil()))
  136. await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  137. await assertThat(await update.status, .hasCode(.ok))
  138. } }
  139. func testAsyncBidirectionalStreamingCall_ConcurrentTasks() throws { XCTAsyncTest {
  140. let channel = try self.setUpServerAndChannel()
  141. let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
  142. .makeAsyncBidirectionalStreamingCall(
  143. path: "/echo.Echo/Update",
  144. callOptions: .init()
  145. )
  146. await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
  147. let counter = RequestResponseCounter()
  148. // Send the requests and get responses in separate concurrent tasks and await the group.
  149. _ = await withThrowingTaskGroup(of: Void.self) { taskGroup in
  150. // Send requests, then end, in a task.
  151. taskGroup.addTask {
  152. for word in ["boyle", "jeffers", "holt"] {
  153. try await update.sendMessage(.with { $0.text = word })
  154. await counter.incrementRequests()
  155. }
  156. try await update.sendEnd()
  157. }
  158. // Get responses in a separate task.
  159. taskGroup.addTask {
  160. for try await _ in update.responses {
  161. await counter.incrementResponses()
  162. }
  163. }
  164. }
  165. await assertThat(await counter.numRequests, .is(.equalTo(3)))
  166. await assertThat(await counter.numResponses, .is(.equalTo(3)))
  167. await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
  168. await assertThat(await update.status, .hasCode(.ok))
  169. } }
  170. }
  171. // Workaround https://bugs.swift.org/browse/SR-15070 (compiler crashes when defining a class/actor
  172. // in an async context).
  173. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  174. fileprivate actor RequestResponseCounter {
  175. var numResponses = 0
  176. var numRequests = 0
  177. func incrementResponses() async {
  178. self.numResponses += 1
  179. }
  180. func incrementRequests() async {
  181. self.numRequests += 1
  182. }
  183. }
  184. #endif