GRPCAsyncClientCallTests.swift 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. @testable import GRPC
  19. import NIOHPACK
  20. import NIOPosix
  21. import XCTest
  /// Exercises the async/await client call types (unary, client streaming, server streaming and
  /// bidirectional streaming) against a locally bound server hosting `EchoProvider`.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  class GRPCAsyncClientCallTests: GRPCTestCase {
    // Populated by `setUpServerAndChannel()` and shut down again in `tearDown()`.
    private var group: MultiThreadedEventLoopGroup?
    private var server: Server?
    private var channel: ClientConnection?

    /// Initial metadata the tests expect to receive for a successful call.
    private static let OKInitialMetadata = HPACKHeaders([
      (":status", "200"),
      ("content-type", "application/grpc"),
    ])

    /// Trailing metadata the tests expect to receive for a successful call.
    private static let OKTrailingMetadata = HPACKHeaders([
      ("grpc-status", "0"),
    ])

    /// Starts an insecure server on `127.0.0.1` and returns a client connection targeting it.
    ///
    /// The event loop group, server and connection are stored on the test case so that
    /// `tearDown()` can close them.
    ///
    /// - Returns: A connection to the freshly started server.
    /// - Throws: If the server fails to bind, or if the port it was bound to cannot be read back.
    private func setUpServerAndChannel() throws -> ClientConnection {
      let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
      // Store the group first so it is shut down in `tearDown()` even if binding throws below.
      self.group = group

      let server = try Server.insecure(group: group)
        .withServiceProviders([EchoProvider()])
        .withLogger(self.serverLogger)
        .bind(host: "127.0.0.1", port: 0)
        .wait()
      self.server = server

      // The server was bound with port 0, so the actual port is read back from its channel.
      // `XCTUnwrap` turns a missing address/port into a test failure rather than a process crash.
      let port = try XCTUnwrap(server.channel.localAddress?.port)

      let channel = ClientConnection.insecure(group: group)
        .withBackgroundActivityLogger(self.clientLogger)
        .connect(host: "127.0.0.1", port: port)
      self.channel = channel

      return channel
    }

    /// Closes whatever `setUpServerAndChannel()` created, in reverse order of creation.
    override func tearDown() {
      if let channel = self.channel {
        XCTAssertNoThrow(try channel.close().wait())
      }
      if let server = self.server {
        XCTAssertNoThrow(try server.close().wait())
      }
      if let group = self.group {
        XCTAssertNoThrow(try group.syncShutdownGracefully())
      }
      super.tearDown()
    }

    /// A unary call yields the expected metadata, a response and an OK status.
    func testAsyncUnaryCall() async throws {
      let channel = try self.setUpServerAndChannel()
      let get: GRPCAsyncUnaryCall<Echo_EchoRequest, Echo_EchoResponse> = channel.makeAsyncUnaryCall(
        path: "/echo.Echo/Get",
        request: .with { $0.text = "holt" },
        callOptions: .init()
      )

      await assertThat(try await get.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
      await assertThat(try await get.response, .doesNotThrow())
      await assertThat(try await get.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
      await assertThat(await get.status, .hasCode(.ok))
    }

    /// A client streaming call accepts several requests and then yields a single response.
    func testAsyncClientStreamingCall() async throws {
      let channel = try self.setUpServerAndChannel()
      let collect: GRPCAsyncClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
        .makeAsyncClientStreamingCall(
          path: "/echo.Echo/Collect",
          callOptions: .init()
        )

      for word in ["boyle", "jeffers", "holt"] {
        try await collect.requestStream.send(.with { $0.text = word })
      }
      collect.requestStream.finish()

      await assertThat(try await collect.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))
      await assertThat(try await collect.response, .doesNotThrow())
      await assertThat(try await collect.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
      await assertThat(await collect.status, .hasCode(.ok))
    }

    /// A server streaming call with a three-word request is expected to yield three responses.
    func testAsyncServerStreamingCall() async throws {
      let channel = try self.setUpServerAndChannel()
      let expand: GRPCAsyncServerStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
        .makeAsyncServerStreamingCall(
          path: "/echo.Echo/Expand",
          request: .with { $0.text = "boyle jeffers holt" },
          callOptions: .init()
        )

      await assertThat(try await expand.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))

      // Count the responses by mapping each one to 1 and summing.
      let numResponses = try await expand.responseStream.map { _ in 1 }.reduce(0, +)
      await assertThat(numResponses, .is(.equalTo(3)))

      await assertThat(try await expand.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
      await assertThat(await expand.status, .hasCode(.ok))
    }

    /// A bidirectional call where all requests are sent before any response is consumed.
    func testAsyncBidirectionalStreamingCall() async throws {
      let channel = try self.setUpServerAndChannel()
      let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
        .makeAsyncBidirectionalStreamingCall(
          path: "/echo.Echo/Update",
          callOptions: .init()
        )

      let requests = ["boyle", "jeffers", "holt"]
        .map { word in Echo_EchoRequest.with { $0.text = word } }

      // Send the three requests one at a time, then the same three again as a sequence: six
      // requests in total, for which six responses are expected below.
      for request in requests {
        try await update.requestStream.send(request)
      }
      try await update.requestStream.send(requests)
      update.requestStream.finish()

      let numResponses = try await update.responseStream.map { _ in 1 }.reduce(0, +)
      await assertThat(numResponses, .is(.equalTo(6)))

      await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
      await assertThat(await update.status, .hasCode(.ok))
    }

    /// A bidirectional call where each request is followed by reading one response.
    func testAsyncBidirectionalStreamingCall_InterleavedRequestsAndResponses() async throws {
      let channel = try self.setUpServerAndChannel()
      let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
        .makeAsyncBidirectionalStreamingCall(
          path: "/echo.Echo/Update",
          callOptions: .init()
        )

      await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))

      var responseStreamIterator = update.responseStream.makeAsyncIterator()
      for word in ["boyle", "jeffers", "holt"] {
        try await update.requestStream.send(.with { $0.text = word })
        await assertThat(try await responseStreamIterator.next(), .is(.notNil()))
      }
      update.requestStream.finish()

      // Once the request stream has been finished the response stream is expected to end too.
      await assertThat(try await responseStreamIterator.next(), .is(.nil()))

      await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
      await assertThat(await update.status, .hasCode(.ok))
    }

    /// A bidirectional call where requests are sent and responses are consumed in separate,
    /// concurrently running child tasks.
    func testAsyncBidirectionalStreamingCall_ConcurrentTasks() async throws {
      let channel = try self.setUpServerAndChannel()
      let update: GRPCAsyncBidirectionalStreamingCall<Echo_EchoRequest, Echo_EchoResponse> = channel
        .makeAsyncBidirectionalStreamingCall(
          path: "/echo.Echo/Update",
          callOptions: .init()
        )

      await assertThat(try await update.initialMetadata, .is(.equalTo(Self.OKInitialMetadata)))

      let counter = RequestResponseCounter()

      // Send the requests and get responses in separate concurrent tasks and await the group.
      try await withThrowingTaskGroup(of: Void.self) { taskGroup in
        // Send requests, then end, in a task.
        taskGroup.addTask {
          for word in ["boyle", "jeffers", "holt"] {
            try await update.requestStream.send(.with { $0.text = word })
            await counter.incrementRequests()
          }
          update.requestStream.finish()
        }

        // Get responses in a separate task.
        taskGroup.addTask {
          for try await _ in update.responseStream {
            await counter.incrementResponses()
          }
        }

        // Errors thrown by child tasks are only surfaced when the group is awaited explicitly;
        // without this, a failed send or receive would be dropped silently.
        try await taskGroup.waitForAll()
      }

      await assertThat(await counter.numRequests, .is(.equalTo(3)))
      await assertThat(await counter.numResponses, .is(.equalTo(3)))
      await assertThat(try await update.trailingMetadata, .is(.equalTo(Self.OKTrailingMetadata)))
      await assertThat(await update.status, .hasCode(.ok))
    }
  }
  // Declared at file scope as a workaround for https://bugs.swift.org/browse/SR-15070 (the
  // compiler crashes when a class/actor is defined in an async context).

  /// A pair of actor-isolated tallies: one for requests, one for responses.
  ///
  /// Both tallies start at zero and each only changes through its `increment…` method.
  @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  private actor RequestResponseCounter {
    /// How many times `incrementRequests()` has been called.
    var numRequests: Int = 0

    /// How many times `incrementResponses()` has been called.
    var numResponses: Int = 0

    /// Adds one to `numRequests`.
    func incrementRequests() async {
      numRequests += 1
    }

    /// Adds one to `numResponses`.
    func incrementResponses() async {
      numResponses += 1
    }
  }