ServerOnCloseTests.swift 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIOConcurrencyHelpers
  20. import NIOCore
  21. import NIOPosix
  22. import XCTest
  23. final class ServerOnCloseTests: GRPCTestCase {
  24. private var group: EventLoopGroup?
  25. private var server: Server?
  26. private var client: ClientConnection?
  27. private var echo: Echo_EchoNIOClient!
  28. private var eventLoop: EventLoop {
  29. return self.group!.next()
  30. }
  31. override func tearDown() {
  32. // Some tests shut down the client/server so we tolerate errors here.
  33. try? self.client?.close().wait()
  34. try? self.server?.close().wait()
  35. XCTAssertNoThrow(try self.group?.syncShutdownGracefully())
  36. super.tearDown()
  37. }
  38. override func setUp() {
  39. super.setUp()
  40. self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
  41. }
  42. private func setUp(provider: Echo_EchoProvider) throws {
  43. self.server = try Server.insecure(group: self.group!)
  44. .withLogger(self.serverLogger)
  45. .withServiceProviders([provider])
  46. .bind(host: "localhost", port: 0)
  47. .wait()
  48. self.client = ClientConnection.insecure(group: self.group!)
  49. .withBackgroundActivityLogger(self.clientLogger)
  50. .connect(host: "localhost", port: self.server!.channel.localAddress!.port!)
  51. self.echo = Echo_EchoNIOClient(
  52. channel: self.client!,
  53. defaultCallOptions: CallOptions(logger: self.clientLogger)
  54. )
  55. }
  56. private func startServer(
  57. echoDelegate: Echo_EchoProvider,
  58. onClose: @escaping (Result<Void, Error>) -> Void
  59. ) {
  60. let provider = OnCloseEchoProvider(delegate: echoDelegate, onClose: onClose)
  61. XCTAssertNoThrow(try self.setUp(provider: provider))
  62. }
  63. private func doTestUnary(
  64. echoProvider: Echo_EchoProvider,
  65. completesWithStatus code: GRPCStatus.Code
  66. ) {
  67. let promise = self.eventLoop.makePromise(of: Void.self)
  68. self.startServer(echoDelegate: echoProvider) { result in
  69. promise.completeWith(result)
  70. }
  71. let get = self.echo.get(.with { $0.text = "" })
  72. assertThat(try get.status.wait(), .hasCode(code))
  73. XCTAssertNoThrow(try promise.futureResult.wait())
  74. }
  75. func testUnaryOnCloseHappyPath() throws {
  76. self.doTestUnary(echoProvider: EchoProvider(), completesWithStatus: .ok)
  77. }
  78. func testUnaryOnCloseAfterUserFunctionFails() throws {
  79. self.doTestUnary(echoProvider: FailingEchoProvider(), completesWithStatus: .internalError)
  80. }
  81. func testUnaryOnCloseAfterClientKilled() throws {
  82. let promise = self.eventLoop.makePromise(of: Void.self)
  83. self.startServer(echoDelegate: NeverResolvingEchoProvider()) { result in
  84. promise.completeWith(result)
  85. }
  86. // We want to wait until the client has sent the request parts before closing. We'll grab the
  87. // promise for sending end.
  88. let endSent = self.client!.eventLoop.makePromise(of: Void.self)
  89. self.echo.interceptors = DelegatingEchoClientInterceptorFactory { part, promise, context in
  90. switch part {
  91. case .metadata, .message:
  92. context.send(part, promise: promise)
  93. case .end:
  94. endSent.futureResult.cascade(to: promise)
  95. context.send(part, promise: endSent)
  96. }
  97. }
  98. _ = self.echo.get(.with { $0.text = "" })
  99. // Make sure end has been sent before closing the connection.
  100. XCTAssertNoThrow(try endSent.futureResult.wait())
  101. XCTAssertNoThrow(try self.client!.close().wait())
  102. XCTAssertNoThrow(try promise.futureResult.wait())
  103. }
  104. private func doTestClientStreaming(
  105. echoProvider: Echo_EchoProvider,
  106. completesWithStatus code: GRPCStatus.Code
  107. ) {
  108. let promise = self.eventLoop.makePromise(of: Void.self)
  109. self.startServer(echoDelegate: echoProvider) { result in
  110. promise.completeWith(result)
  111. }
  112. let collect = self.echo.collect()
  113. // We don't know if we'll send successfully or not.
  114. try? collect.sendEnd().wait()
  115. assertThat(try collect.status.wait(), .hasCode(code))
  116. XCTAssertNoThrow(try promise.futureResult.wait())
  117. }
  118. func testClientStreamingOnCloseHappyPath() throws {
  119. self.doTestClientStreaming(echoProvider: EchoProvider(), completesWithStatus: .ok)
  120. }
  121. func testClientStreamingOnCloseAfterUserFunctionFails() throws {
  122. self.doTestClientStreaming(
  123. echoProvider: FailingEchoProvider(),
  124. completesWithStatus: .internalError
  125. )
  126. }
  127. func testClientStreamingOnCloseAfterClientKilled() throws {
  128. let promise = self.eventLoop.makePromise(of: Void.self)
  129. self.startServer(echoDelegate: NeverResolvingEchoProvider()) { error in
  130. promise.completeWith(error)
  131. }
  132. let collect = self.echo.collect()
  133. XCTAssertNoThrow(try collect.sendMessage(.with { $0.text = "" }).wait())
  134. XCTAssertNoThrow(try self.client!.close().wait())
  135. XCTAssertNoThrow(try promise.futureResult.wait())
  136. }
  137. private func doTestServerStreaming(
  138. echoProvider: Echo_EchoProvider,
  139. completesWithStatus code: GRPCStatus.Code
  140. ) {
  141. let promise = self.eventLoop.makePromise(of: Void.self)
  142. self.startServer(echoDelegate: echoProvider) { result in
  143. promise.completeWith(result)
  144. }
  145. let expand = self.echo.expand(.with { $0.text = "1 2 3" }) { _ in /* ignore responses */ }
  146. assertThat(try expand.status.wait(), .hasCode(code))
  147. XCTAssertNoThrow(try promise.futureResult.wait())
  148. }
  149. func testServerStreamingOnCloseHappyPath() throws {
  150. self.doTestServerStreaming(echoProvider: EchoProvider(), completesWithStatus: .ok)
  151. }
  152. func testServerStreamingOnCloseAfterUserFunctionFails() throws {
  153. self.doTestServerStreaming(
  154. echoProvider: FailingEchoProvider(),
  155. completesWithStatus: .internalError
  156. )
  157. }
  158. func testServerStreamingOnCloseAfterClientKilled() throws {
  159. let promise = self.eventLoop.makePromise(of: Void.self)
  160. self.startServer(echoDelegate: NeverResolvingEchoProvider()) { result in
  161. promise.completeWith(result)
  162. }
  163. // We want to wait until the client has sent the request parts before closing. We'll grab the
  164. // promise for sending end.
  165. let endSent = self.client!.eventLoop.makePromise(of: Void.self)
  166. self.echo.interceptors = DelegatingEchoClientInterceptorFactory { part, promise, context in
  167. switch part {
  168. case .metadata, .message:
  169. context.send(part, promise: promise)
  170. case .end:
  171. endSent.futureResult.cascade(to: promise)
  172. context.send(part, promise: endSent)
  173. }
  174. }
  175. _ = self.echo.expand(.with { $0.text = "1 2 3" }) { _ in /* ignore responses */ }
  176. // Make sure end has been sent before closing the connection.
  177. XCTAssertNoThrow(try endSent.futureResult.wait())
  178. XCTAssertNoThrow(try self.client!.close().wait())
  179. XCTAssertNoThrow(try promise.futureResult.wait())
  180. }
  181. private func doTestBidirectionalStreaming(
  182. echoProvider: Echo_EchoProvider,
  183. completesWithStatus code: GRPCStatus.Code
  184. ) {
  185. let promise = self.eventLoop.makePromise(of: Void.self)
  186. self.startServer(echoDelegate: echoProvider) { result in
  187. promise.completeWith(result)
  188. }
  189. let update = self.echo.update { _ in /* ignored */ }
  190. // We don't know if we'll send successfully or not.
  191. try? update.sendEnd().wait()
  192. assertThat(try update.status.wait(), .hasCode(code))
  193. XCTAssertNoThrow(try promise.futureResult.wait())
  194. }
  195. func testBidirectionalStreamingOnCloseHappyPath() throws {
  196. self.doTestBidirectionalStreaming(echoProvider: EchoProvider(), completesWithStatus: .ok)
  197. }
  198. func testBidirectionalStreamingOnCloseAfterUserFunctionFails() throws {
  199. // TODO: https://github.com/grpc/grpc-swift/issues/1215
  200. // self.doTestBidirectionalStreaming(
  201. // echoProvider: FailingEchoProvider(),
  202. // completesWithStatus: .internalError
  203. // )
  204. }
  205. func testBidirectionalStreamingOnCloseAfterClientKilled() throws {
  206. let promise = self.eventLoop.makePromise(of: Void.self)
  207. self.startServer(echoDelegate: NeverResolvingEchoProvider()) { result in
  208. promise.completeWith(result)
  209. }
  210. let update = self.echo.update { _ in /* ignored */ }
  211. XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "" }).wait())
  212. XCTAssertNoThrow(try self.client!.close().wait())
  213. XCTAssertNoThrow(try promise.futureResult.wait())
  214. }
  215. }