ServerOnCloseTests.swift 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import GRPC
  19. import NIO
  20. import NIOConcurrencyHelpers
  21. import XCTest
  /// Exercises the `onClose` callback handed to `OnCloseEchoProvider` for each of the four Echo
  /// RPCs (`get`, `collect`, `expand` and `update`).
  ///
  /// Every test starts a local server and client, runs (or abandons) a single RPC and then waits
  /// for the promise completed by the `onClose` callback. A test fails if that promise is failed.
  final class ServerOnCloseTests: GRPCTestCase {
    private var group: EventLoopGroup?
    private var server: Server?
    private var client: ClientConnection?
    private var echo: Echo_EchoClient!

    /// An event loop from the group created in `setUp()`.
    private var eventLoop: EventLoop {
      return self.group!.next()
    }

    // MARK: - Lifecycle

    override func setUp() {
      super.setUp()
      self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
    }

    override func tearDown() {
      // Some tests shut down the client/server so we tolerate errors here.
      try? self.client?.close().wait()
      try? self.server?.close().wait()
      XCTAssertNoThrow(try self.group?.syncShutdownGracefully())

      // Everything has been shut down: drop the references so that the closed connection, server
      // and event loop group are not kept alive by this test case instance.
      self.echo = nil
      self.client = nil
      self.server = nil
      self.group = nil

      super.tearDown()
    }

    // MARK: - Helpers

    /// Binds an insecure server hosting `provider` on "localhost" (port 0, the bound port is read
    /// back from the server channel) and points an insecure client and `self.echo` at it.
    ///
    /// - Parameter provider: The service provider the server should host.
    /// - Throws: Any error thrown while waiting for the server to bind.
    private func setUp(provider: Echo_EchoProvider) throws {
      self.server = try Server.insecure(group: self.group!)
        .withLogger(self.serverLogger)
        .withServiceProviders([provider])
        .bind(host: "localhost", port: 0)
        .wait()

      self.client = ClientConnection.insecure(group: self.group!)
        .withBackgroundActivityLogger(self.clientLogger)
        .connect(host: "localhost", port: self.server!.channel.localAddress!.port!)

      self.echo = Echo_EchoClient(
        channel: self.client!,
        defaultCallOptions: CallOptions(logger: self.clientLogger)
      )
    }

    /// Wraps `echoDelegate` and `onClose` in an `OnCloseEchoProvider` and starts the server and
    /// client with it. A failure to start is recorded as a test failure.
    ///
    /// - Parameters:
    ///   - echoDelegate: The provider passed to `OnCloseEchoProvider` as its delegate.
    ///   - onClose: The callback passed to `OnCloseEchoProvider`.
    private func startServer(
      echoDelegate: Echo_EchoProvider,
      onClose: @escaping (Result<Void, Error>) -> Void
    ) {
      let provider = OnCloseEchoProvider(delegate: echoDelegate, onClose: onClose)
      XCTAssertNoThrow(try self.setUp(provider: provider))
    }

    /// Starts the server with `echoDelegate`, runs `body` and then asserts that the `onClose`
    /// callback was invoked with a successful result.
    ///
    /// - Parameters:
    ///   - echoDelegate: The provider passed to `startServer(echoDelegate:onClose:)`.
    ///   - body: The client side of the test; run after the server and client have been started.
    private func assertOnCloseSucceeds(
      echoDelegate: Echo_EchoProvider,
      during body: () -> Void
    ) {
      let promise = self.eventLoop.makePromise(of: Void.self)
      self.startServer(echoDelegate: echoDelegate) { result in
        promise.completeWith(result)
      }

      body()

      XCTAssertNoThrow(try promise.futureResult.wait())
    }

    /// Installs an interceptor which observes the write of the request `.end` part, starts an RPC
    /// by calling `startRPC`, waits for that write to complete and then closes the client.
    ///
    /// - Parameter startRPC: Starts the RPC whose request stream should be observed.
    private func closeClientOnceRequestEndIsSent(afterStarting startRPC: () -> Void) {
      // We want to wait until the client has sent the request parts before closing. We'll grab the
      // promise for sending end.
      let endSent = self.client!.eventLoop.makePromise(of: Void.self)
      self.echo.interceptors = DelegatingEchoClientInterceptorFactory { part, promise, context in
        switch part {
        case .metadata, .message:
          context.send(part, promise: promise)
        case .end:
          // Send with our own promise, forwarding its result to the caller's promise.
          endSent.futureResult.cascade(to: promise)
          context.send(part, promise: endSent)
        }
      }

      startRPC()

      // Make sure end has been sent before closing the connection.
      XCTAssertNoThrow(try endSent.futureResult.wait())
      XCTAssertNoThrow(try self.client!.close().wait())
    }

    // MARK: - Unary

    /// Runs a unary `get` RPC and asserts on its status code and on `onClose` having succeeded.
    private func doTestUnary(
      echoProvider: Echo_EchoProvider,
      completesWithStatus code: GRPCStatus.Code
    ) {
      self.assertOnCloseSucceeds(echoDelegate: echoProvider) {
        let get = self.echo.get(.with { $0.text = "" })
        assertThat(try get.status.wait(), .hasCode(code))
      }
    }

    func testUnaryOnCloseHappyPath() throws {
      self.doTestUnary(echoProvider: EchoProvider(), completesWithStatus: .ok)
    }

    func testUnaryOnCloseAfterUserFunctionFails() throws {
      self.doTestUnary(echoProvider: FailingEchoProvider(), completesWithStatus: .internalError)
    }

    func testUnaryOnCloseAfterClientKilled() throws {
      self.assertOnCloseSucceeds(echoDelegate: NeverResolvingEchoProvider()) {
        self.closeClientOnceRequestEndIsSent {
          _ = self.echo.get(.with { $0.text = "" })
        }
      }
    }

    // MARK: - Client streaming

    /// Runs a client streaming `collect` RPC, ending the request stream immediately, and asserts
    /// on its status code and on `onClose` having succeeded.
    private func doTestClientStreaming(
      echoProvider: Echo_EchoProvider,
      completesWithStatus code: GRPCStatus.Code
    ) {
      self.assertOnCloseSucceeds(echoDelegate: echoProvider) {
        let collect = self.echo.collect()
        // We don't know if we'll send successfully or not.
        try? collect.sendEnd().wait()
        assertThat(try collect.status.wait(), .hasCode(code))
      }
    }

    func testClientStreamingOnCloseHappyPath() throws {
      self.doTestClientStreaming(echoProvider: EchoProvider(), completesWithStatus: .ok)
    }

    func testClientStreamingOnCloseAfterUserFunctionFails() throws {
      self.doTestClientStreaming(
        echoProvider: FailingEchoProvider(),
        completesWithStatus: .internalError
      )
    }

    func testClientStreamingOnCloseAfterClientKilled() throws {
      self.assertOnCloseSucceeds(echoDelegate: NeverResolvingEchoProvider()) {
        let collect = self.echo.collect()
        // Wait for one message to be written before closing the connection.
        XCTAssertNoThrow(try collect.sendMessage(.with { $0.text = "" }).wait())
        XCTAssertNoThrow(try self.client!.close().wait())
      }
    }

    // MARK: - Server streaming

    /// Runs a server streaming `expand` RPC, ignoring the responses, and asserts on its status
    /// code and on `onClose` having succeeded.
    private func doTestServerStreaming(
      echoProvider: Echo_EchoProvider,
      completesWithStatus code: GRPCStatus.Code
    ) {
      self.assertOnCloseSucceeds(echoDelegate: echoProvider) {
        let expand = self.echo.expand(.with { $0.text = "1 2 3" }) { _ in /* ignore responses */ }
        assertThat(try expand.status.wait(), .hasCode(code))
      }
    }

    func testServerStreamingOnCloseHappyPath() throws {
      self.doTestServerStreaming(echoProvider: EchoProvider(), completesWithStatus: .ok)
    }

    func testServerStreamingOnCloseAfterUserFunctionFails() throws {
      self.doTestServerStreaming(
        echoProvider: FailingEchoProvider(),
        completesWithStatus: .internalError
      )
    }

    func testServerStreamingOnCloseAfterClientKilled() throws {
      self.assertOnCloseSucceeds(echoDelegate: NeverResolvingEchoProvider()) {
        self.closeClientOnceRequestEndIsSent {
          _ = self.echo.expand(.with { $0.text = "1 2 3" }) { _ in /* ignore responses */ }
        }
      }
    }

    // MARK: - Bidirectional streaming

    /// Runs a bidirectional streaming `update` RPC, ending the request stream immediately, and
    /// asserts on its status code and on `onClose` having succeeded.
    private func doTestBidirectionalStreaming(
      echoProvider: Echo_EchoProvider,
      completesWithStatus code: GRPCStatus.Code
    ) {
      self.assertOnCloseSucceeds(echoDelegate: echoProvider) {
        let update = self.echo.update { _ in /* ignored */ }
        // We don't know if we'll send successfully or not.
        try? update.sendEnd().wait()
        assertThat(try update.status.wait(), .hasCode(code))
      }
    }

    func testBidirectionalStreamingOnCloseHappyPath() throws {
      self.doTestBidirectionalStreaming(echoProvider: EchoProvider(), completesWithStatus: .ok)
    }

    func testBidirectionalStreamingOnCloseAfterUserFunctionFails() throws {
      // TODO: https://github.com/grpc/grpc-swift/issues/1215
      // self.doTestBidirectionalStreaming(
      //   echoProvider: FailingEchoProvider(),
      //   completesWithStatus: .internalError
      // )
    }

    func testBidirectionalStreamingOnCloseAfterClientKilled() throws {
      self.assertOnCloseSucceeds(echoDelegate: NeverResolvingEchoProvider()) {
        let update = self.echo.update { _ in /* ignored */ }
        // Wait for one message to be written before closing the connection.
        XCTAssertNoThrow(try update.sendMessage(.with { $0.text = "" }).wait())
        XCTAssertNoThrow(try self.client!.close().wait())
      }
    }
  }