ClientTransportTests.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import XCTest
  19. class ClientTransportTests: GRPCTestCase {
  20. override func setUp() {
  21. super.setUp()
  22. self.channel = EmbeddedChannel()
  23. }
  24. // MARK: - Setup Helpers
  25. private func makeDetails(type: GRPCCallType = .unary) -> CallDetails {
  26. return CallDetails(
  27. type: type,
  28. path: "/echo.Echo/Get",
  29. authority: "localhost",
  30. scheme: "https",
  31. options: .init(logger: self.logger)
  32. )
  33. }
  34. private var channel: EmbeddedChannel!
  35. private var transport: ClientTransport<String, String>!
  36. private var eventLoop: EventLoop {
  37. return self.channel.eventLoop
  38. }
  39. private func setUpTransport(
  40. details: CallDetails? = nil,
  41. interceptors: [ClientInterceptor<String, String>] = [],
  42. onResponsePart: @escaping (ClientResponsePart<String>) -> Void = { _ in }
  43. ) {
  44. self.transport = .init(
  45. details: details ?? self.makeDetails(),
  46. eventLoop: self.eventLoop,
  47. interceptors: interceptors,
  48. errorDelegate: nil,
  49. onResponsePart
  50. )
  51. }
  52. private func configureTransport(additionalHandlers handlers: [ChannelHandler] = []) {
  53. self.transport.configure {
  54. var handlers = handlers
  55. handlers.append($0)
  56. return self.channel.pipeline.addHandlers(handlers)
  57. }
  58. }
  59. private func configureTransport(_ body: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  60. self.transport.configure(body)
  61. }
  62. private func connect(file: StaticString = #file, line: UInt = #line) throws {
  63. let address = try assertNoThrow(SocketAddress(unixDomainSocketPath: "/whatever"))
  64. assertThat(
  65. try self.channel.connect(to: address).wait(),
  66. .doesNotThrow(),
  67. file: file,
  68. line: line
  69. )
  70. }
  71. private func sendRequest(
  72. _ part: ClientRequestPart<String>,
  73. promise: EventLoopPromise<Void>? = nil
  74. ) {
  75. self.transport.send(part, promise: promise)
  76. }
  77. private func cancel(promise: EventLoopPromise<Void>? = nil) {
  78. self.transport.cancel(promise: promise)
  79. }
  80. private func sendResponse(
  81. _ part: _GRPCClientResponsePart<String>,
  82. file: StaticString = #file,
  83. line: UInt = #line
  84. ) throws {
  85. assertThat(try self.channel.writeInbound(part), .doesNotThrow(), file: file, line: line)
  86. }
  87. }
  88. // MARK: - Tests
  89. extension ClientTransportTests {
  90. func testUnaryFlow() throws {
  91. let recorder = WriteRecorder<_GRPCClientRequestPart<String>>()
  92. let recorderInterceptor = RecordingInterceptor<String, String>()
  93. self.setUpTransport(interceptors: [recorderInterceptor])
  94. // Buffer up some parts.
  95. self.sendRequest(.metadata([:]))
  96. self.sendRequest(.message("0", .init(compress: false, flush: false)))
  97. // Configure the transport and connect. This will unbuffer the parts.
  98. self.configureTransport(additionalHandlers: [recorder])
  99. try self.connect()
  100. // Send the end, this shouldn't require buffering.
  101. self.sendRequest(.end)
  102. // We should have recorded 3 parts in the 'Channel' now.
  103. assertThat(recorder.writes, .hasCount(3))
  104. // Write some responses.
  105. try self.sendResponse(.initialMetadata([:]))
  106. try self.sendResponse(.message(.init("1", compressed: false)))
  107. try self.sendResponse(.trailingMetadata([:]))
  108. try self.sendResponse(.status(.ok))
  109. // The recording interceptor should now have three parts.
  110. assertThat(recorderInterceptor.responseParts, .hasCount(3))
  111. }
  112. func testCancelWhenIdle() throws {
  113. // Set up the transport, configure it and connect.
  114. self.setUpTransport { part in
  115. assertThat(part.error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
  116. }
  117. // Cancellation should succeed.
  118. let promise = self.eventLoop.makePromise(of: Void.self)
  119. self.cancel(promise: promise)
  120. assertThat(try promise.futureResult.wait(), .doesNotThrow())
  121. }
  122. func testCancelWhenAwaitingTransport() throws {
  123. // Set up the transport, configure it and connect.
  124. self.setUpTransport { part in
  125. assertThat(part.error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
  126. }
  127. // Start configuring the transport.
  128. let transportActivatedPromise = self.eventLoop.makePromise(of: Void.self)
  129. // Let's not leak this.
  130. defer {
  131. transportActivatedPromise.succeed(())
  132. }
  133. self.configureTransport { handler in
  134. self.channel.pipeline.addHandler(handler).flatMap {
  135. transportActivatedPromise.futureResult
  136. }
  137. }
  138. // Write a request.
  139. let p1 = self.eventLoop.makePromise(of: Void.self)
  140. self.sendRequest(.metadata([:]), promise: p1)
  141. let p2 = self.eventLoop.makePromise(of: Void.self)
  142. self.cancel(promise: p2)
  143. // Cancellation should succeed, and fail the write as a result.
  144. assertThat(try p2.futureResult.wait(), .doesNotThrow())
  145. assertThat(
  146. try p1.futureResult.wait(),
  147. .throws(.instanceOf(GRPCError.RPCCancelledByClient.self))
  148. )
  149. }
  150. func testCancelWhenActivating() throws {
  151. // Set up the transport, configure it and connect.
  152. // We use bidirectional streaming here so that we also flush after writing the metadata.
  153. self.setUpTransport(details: self.makeDetails(type: .bidirectionalStreaming)) { part in
  154. assertThat(part.error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
  155. }
  156. // Write a request. This will buffer.
  157. let writePromise1 = self.eventLoop.makePromise(of: Void.self)
  158. self.sendRequest(.metadata([:]), promise: writePromise1)
  159. // Chain a cancel from the first write promise.
  160. let cancelPromise = self.eventLoop.makePromise(of: Void.self)
  161. writePromise1.futureResult.whenSuccess {
  162. self.cancel(promise: cancelPromise)
  163. }
  164. // Enqueue a second write.
  165. let writePromise2 = self.eventLoop.makePromise(of: Void.self)
  166. self.sendRequest(.message("foo", .init(compress: false, flush: false)), promise: writePromise2)
  167. // Now we can configure and connect to trigger the unbuffering.
  168. // We don't actually want to record writes, by the recorder will fulfill promises as we catch
  169. // them; and we need that.
  170. self.configureTransport(additionalHandlers: [WriteRecorder<_GRPCClientRequestPart<String>>()])
  171. try self.connect()
  172. // The first write should succeed.
  173. assertThat(try writePromise1.futureResult.wait(), .doesNotThrow())
  174. // As should the cancellation.
  175. assertThat(try cancelPromise.futureResult.wait(), .doesNotThrow())
  176. // The second write should fail: the cancellation happened first.
  177. assertThat(
  178. try writePromise2.futureResult.wait(),
  179. .throws(.instanceOf(GRPCError.RPCCancelledByClient.self))
  180. )
  181. }
  182. func testCancelWhenActive() throws {
  183. // Set up the transport, configure it and connect. We'll record request parts in the `Channel`.
  184. let recorder = WriteRecorder<_GRPCClientRequestPart<String>>()
  185. self.setUpTransport()
  186. self.configureTransport(additionalHandlers: [recorder])
  187. try self.connect()
  188. // We should have an active transport now.
  189. self.sendRequest(.metadata([:]))
  190. self.sendRequest(.message("0", .init(compress: false, flush: false)))
  191. // We should have picked these parts up in the recorder.
  192. assertThat(recorder.writes, .hasCount(2))
  193. // Let's cancel now.
  194. let promise = self.eventLoop.makePromise(of: Void.self)
  195. self.cancel(promise: promise)
  196. // Cancellation should succeed.
  197. assertThat(try promise.futureResult.wait(), .doesNotThrow())
  198. }
  199. func testCancelWhenClosing() throws {
  200. self.setUpTransport()
  201. // Hold the configuration until we succeed the promise.
  202. let configuredPromise = self.eventLoop.makePromise(of: Void.self)
  203. self.configureTransport { handler in
  204. self.channel.pipeline.addHandler(handler).flatMap {
  205. configuredPromise.futureResult
  206. }
  207. }
  208. }
  209. func testCancelWhenClosed() throws {
  210. // Setup and close immediately.
  211. self.setUpTransport()
  212. self.configureTransport()
  213. try self.connect()
  214. assertThat(try self.channel.close().wait(), .doesNotThrow())
  215. // Let's cancel now.
  216. let promise = self.eventLoop.makePromise(of: Void.self)
  217. self.cancel(promise: promise)
  218. // Cancellation should fail, we're already closed.
  219. assertThat(
  220. try promise.futureResult.wait(),
  221. .throws(.instanceOf(GRPCError.AlreadyComplete.self))
  222. )
  223. }
  224. func testErrorWhenActive() throws {
  225. // Setup the transport, we only expect an error back.
  226. self.setUpTransport { part in
  227. assertThat(part.error, .is(.instanceOf(DummyError.self)))
  228. }
  229. // Configure and activate.
  230. self.configureTransport()
  231. try self.connect()
  232. // Send a request.
  233. let p1 = self.eventLoop.makePromise(of: Void.self)
  234. self.sendRequest(.metadata([:]), promise: p1)
  235. // The transport is for a unary call, so we need to send '.end' to emit a flush and for the
  236. // promise to be completed.
  237. self.sendRequest(.end, promise: nil)
  238. assertThat(try p1.futureResult.wait(), .doesNotThrow())
  239. // Fire an error back. (We'll see an error on the response handler.)
  240. self.channel.pipeline.fireErrorCaught(DummyError())
  241. // Writes should now fail, we're closed.
  242. let p2 = self.eventLoop.makePromise(of: Void.self)
  243. self.sendRequest(.end, promise: p2)
  244. assertThat(try p2.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
  245. }
  246. func testConfigurationFails() throws {
  247. self.setUpTransport { part in
  248. // We wrap the configuration error up, so we can't assert on its type.
  249. assertThat(part.error, .is(.notNil()))
  250. }
  251. let p1 = self.eventLoop.makePromise(of: Void.self)
  252. self.sendRequest(.metadata([:]), promise: p1)
  253. let p2 = self.eventLoop.makePromise(of: Void.self)
  254. self.sendRequest(.message("0", .init(compress: false, flush: false)), promise: p2)
  255. // Fail to configure the transport. Our promises should fail.
  256. self.configureTransport { _ in
  257. self.eventLoop.makeFailedFuture(DummyError())
  258. }
  259. // The promises should fail.
  260. assertThat(try p1.futureResult.wait(), .throws())
  261. assertThat(try p2.futureResult.wait(), .throws())
  262. // Cancellation should also fail because we're already closed.
  263. let p3 = self.eventLoop.makePromise(of: Void.self)
  264. self.transport.cancel(promise: p3)
  265. assertThat(try p3.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
  266. }
  267. }
  268. // MARK: - Helper Objects
  269. class WriteRecorder<Write>: ChannelOutboundHandler {
  270. typealias OutboundIn = Write
  271. var writes: [Write] = []
  272. func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  273. self.writes.append(self.unwrapOutboundIn(data))
  274. promise?.succeed(())
  275. }
  276. }
  277. private struct DummyError: Error {}