ClientTransportTests.swift 11 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import XCTest
  19. class ClientTransportTests: GRPCTestCase {
  20. override func setUp() {
  21. super.setUp()
  22. self.channel = EmbeddedChannel()
  23. }
  24. // MARK: - Setup Helpers
  25. private func makeDetails(type: GRPCCallType = .unary) -> CallDetails {
  26. return CallDetails(
  27. type: type,
  28. path: "/echo.Echo/Get",
  29. authority: "localhost",
  30. scheme: "https",
  31. options: .init(logger: self.logger)
  32. )
  33. }
  34. private var channel: EmbeddedChannel!
  35. private var transport: ClientTransport<String, String>!
  36. private var eventLoop: EventLoop {
  37. return self.channel.eventLoop
  38. }
  39. private func setUpTransport(
  40. details: CallDetails? = nil,
  41. interceptors: [ClientInterceptor<String, String>] = [],
  42. onError: @escaping (Error) -> Void = { _ in },
  43. onResponsePart: @escaping (GRPCClientResponsePart<String>) -> Void = { _ in }
  44. ) {
  45. self.transport = .init(
  46. details: details ?? self.makeDetails(),
  47. eventLoop: self.eventLoop,
  48. interceptors: interceptors,
  49. errorDelegate: nil,
  50. onError: onError,
  51. onResponsePart: onResponsePart
  52. )
  53. }
  54. private func configureTransport(additionalHandlers handlers: [ChannelHandler] = []) {
  55. self.transport.configure {
  56. var handlers = handlers
  57. handlers.append($0)
  58. return self.channel.pipeline.addHandlers(handlers)
  59. }
  60. }
  61. private func configureTransport(_ body: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
  62. self.transport.configure(body)
  63. }
  64. private func connect(file: StaticString = #file, line: UInt = #line) throws {
  65. let address = try assertNoThrow(SocketAddress(unixDomainSocketPath: "/whatever"))
  66. assertThat(
  67. try self.channel.connect(to: address).wait(),
  68. .doesNotThrow(),
  69. file: file,
  70. line: line
  71. )
  72. }
  73. private func sendRequest(
  74. _ part: GRPCClientRequestPart<String>,
  75. promise: EventLoopPromise<Void>? = nil
  76. ) {
  77. self.transport.send(part, promise: promise)
  78. }
  79. private func cancel(promise: EventLoopPromise<Void>? = nil) {
  80. self.transport.cancel(promise: promise)
  81. }
  82. private func sendResponse(
  83. _ part: _GRPCClientResponsePart<String>,
  84. file: StaticString = #file,
  85. line: UInt = #line
  86. ) throws {
  87. assertThat(try self.channel.writeInbound(part), .doesNotThrow(), file: file, line: line)
  88. }
  89. }
  90. // MARK: - Tests
  91. extension ClientTransportTests {
  92. func testUnaryFlow() throws {
  93. let recorder = WriteRecorder<_GRPCClientRequestPart<String>>()
  94. let recorderInterceptor = RecordingInterceptor<String, String>()
  95. self.setUpTransport(interceptors: [recorderInterceptor])
  96. // Buffer up some parts.
  97. self.sendRequest(.metadata([:]))
  98. self.sendRequest(.message("0", .init(compress: false, flush: false)))
  99. // Configure the transport and connect. This will unbuffer the parts.
  100. self.configureTransport(additionalHandlers: [recorder])
  101. try self.connect()
  102. // Send the end, this shouldn't require buffering.
  103. self.sendRequest(.end)
  104. // We should have recorded 3 parts in the 'Channel' now.
  105. assertThat(recorder.writes, .hasCount(3))
  106. // Write some responses.
  107. try self.sendResponse(.initialMetadata([:]))
  108. try self.sendResponse(.message(.init("1", compressed: false)))
  109. try self.sendResponse(.trailingMetadata([:]))
  110. try self.sendResponse(.status(.ok))
  111. // The recording interceptor should now have three parts.
  112. assertThat(recorderInterceptor.responseParts, .hasCount(3))
  113. }
  114. func testCancelWhenIdle() throws {
  115. // Set up the transport, configure it and connect.
  116. self.setUpTransport(onError: { error in
  117. assertThat(error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
  118. })
  119. // Cancellation should succeed.
  120. let promise = self.eventLoop.makePromise(of: Void.self)
  121. self.cancel(promise: promise)
  122. assertThat(try promise.futureResult.wait(), .doesNotThrow())
  123. }
  124. func testCancelWhenAwaitingTransport() throws {
  125. // Set up the transport, configure it and connect.
  126. self.setUpTransport(onError: { error in
  127. assertThat(error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
  128. })
  129. // Start configuring the transport.
  130. let transportActivatedPromise = self.eventLoop.makePromise(of: Void.self)
  131. // Let's not leak this.
  132. defer {
  133. transportActivatedPromise.succeed(())
  134. }
  135. self.configureTransport { handler in
  136. self.channel.pipeline.addHandler(handler).flatMap {
  137. transportActivatedPromise.futureResult
  138. }
  139. }
  140. // Write a request.
  141. let p1 = self.eventLoop.makePromise(of: Void.self)
  142. self.sendRequest(.metadata([:]), promise: p1)
  143. let p2 = self.eventLoop.makePromise(of: Void.self)
  144. self.cancel(promise: p2)
  145. // Cancellation should succeed, and fail the write as a result.
  146. assertThat(try p2.futureResult.wait(), .doesNotThrow())
  147. assertThat(
  148. try p1.futureResult.wait(),
  149. .throws(.instanceOf(GRPCError.RPCCancelledByClient.self))
  150. )
  151. }
  152. func testCancelWhenActivating() throws {
  153. // Set up the transport, configure it and connect.
  154. // We use bidirectional streaming here so that we also flush after writing the metadata.
  155. self.setUpTransport(
  156. details: self.makeDetails(type: .bidirectionalStreaming),
  157. onError: { error in
  158. assertThat(error, .is(.instanceOf(GRPCError.RPCCancelledByClient.self)))
  159. }
  160. )
  161. // Write a request. This will buffer.
  162. let writePromise1 = self.eventLoop.makePromise(of: Void.self)
  163. self.sendRequest(.metadata([:]), promise: writePromise1)
  164. // Chain a cancel from the first write promise.
  165. let cancelPromise = self.eventLoop.makePromise(of: Void.self)
  166. writePromise1.futureResult.whenSuccess {
  167. self.cancel(promise: cancelPromise)
  168. }
  169. // Enqueue a second write.
  170. let writePromise2 = self.eventLoop.makePromise(of: Void.self)
  171. self.sendRequest(.message("foo", .init(compress: false, flush: false)), promise: writePromise2)
  172. // Now we can configure and connect to trigger the unbuffering.
  173. // We don't actually want to record writes, by the recorder will fulfill promises as we catch
  174. // them; and we need that.
  175. self.configureTransport(additionalHandlers: [WriteRecorder<_GRPCClientRequestPart<String>>()])
  176. try self.connect()
  177. // The first write should succeed.
  178. assertThat(try writePromise1.futureResult.wait(), .doesNotThrow())
  179. // As should the cancellation.
  180. assertThat(try cancelPromise.futureResult.wait(), .doesNotThrow())
  181. // The second write should fail: the cancellation happened first.
  182. assertThat(
  183. try writePromise2.futureResult.wait(),
  184. .throws(.instanceOf(GRPCError.RPCCancelledByClient.self))
  185. )
  186. }
  187. func testCancelWhenActive() throws {
  188. // Set up the transport, configure it and connect. We'll record request parts in the `Channel`.
  189. let recorder = WriteRecorder<_GRPCClientRequestPart<String>>()
  190. self.setUpTransport()
  191. self.configureTransport(additionalHandlers: [recorder])
  192. try self.connect()
  193. // We should have an active transport now.
  194. self.sendRequest(.metadata([:]))
  195. self.sendRequest(.message("0", .init(compress: false, flush: false)))
  196. // We should have picked these parts up in the recorder.
  197. assertThat(recorder.writes, .hasCount(2))
  198. // Let's cancel now.
  199. let promise = self.eventLoop.makePromise(of: Void.self)
  200. self.cancel(promise: promise)
  201. // Cancellation should succeed.
  202. assertThat(try promise.futureResult.wait(), .doesNotThrow())
  203. }
  204. func testCancelWhenClosing() throws {
  205. self.setUpTransport()
  206. // Hold the configuration until we succeed the promise.
  207. let configuredPromise = self.eventLoop.makePromise(of: Void.self)
  208. self.configureTransport { handler in
  209. self.channel.pipeline.addHandler(handler).flatMap {
  210. configuredPromise.futureResult
  211. }
  212. }
  213. }
  214. func testCancelWhenClosed() throws {
  215. // Setup and close immediately.
  216. self.setUpTransport()
  217. self.configureTransport()
  218. try self.connect()
  219. assertThat(try self.channel.close().wait(), .doesNotThrow())
  220. // Let's cancel now.
  221. let promise = self.eventLoop.makePromise(of: Void.self)
  222. self.cancel(promise: promise)
  223. // Cancellation should fail, we're already closed.
  224. assertThat(
  225. try promise.futureResult.wait(),
  226. .throws(.instanceOf(GRPCError.AlreadyComplete.self))
  227. )
  228. }
  229. func testErrorWhenActive() throws {
  230. // Setup the transport, we only expect an error back.
  231. self.setUpTransport(onError: { error in
  232. assertThat(error, .is(.instanceOf(DummyError.self)))
  233. })
  234. // Configure and activate.
  235. self.configureTransport()
  236. try self.connect()
  237. // Send a request.
  238. let p1 = self.eventLoop.makePromise(of: Void.self)
  239. self.sendRequest(.metadata([:]), promise: p1)
  240. // The transport is for a unary call, so we need to send '.end' to emit a flush and for the
  241. // promise to be completed.
  242. self.sendRequest(.end, promise: nil)
  243. assertThat(try p1.futureResult.wait(), .doesNotThrow())
  244. // Fire an error back. (We'll see an error on the response handler.)
  245. self.channel.pipeline.fireErrorCaught(DummyError())
  246. // Writes should now fail, we're closed.
  247. let p2 = self.eventLoop.makePromise(of: Void.self)
  248. self.sendRequest(.end, promise: p2)
  249. assertThat(try p2.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
  250. }
  251. func testConfigurationFails() throws {
  252. self.setUpTransport()
  253. let p1 = self.eventLoop.makePromise(of: Void.self)
  254. self.sendRequest(.metadata([:]), promise: p1)
  255. let p2 = self.eventLoop.makePromise(of: Void.self)
  256. self.sendRequest(.message("0", .init(compress: false, flush: false)), promise: p2)
  257. // Fail to configure the transport. Our promises should fail.
  258. self.configureTransport { _ in
  259. self.eventLoop.makeFailedFuture(DummyError())
  260. }
  261. // The promises should fail.
  262. assertThat(try p1.futureResult.wait(), .throws())
  263. assertThat(try p2.futureResult.wait(), .throws())
  264. // Cancellation should also fail because we're already closed.
  265. let p3 = self.eventLoop.makePromise(of: Void.self)
  266. self.transport.cancel(promise: p3)
  267. assertThat(try p3.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
  268. }
  269. }
  270. // MARK: - Helper Objects
  271. class WriteRecorder<Write>: ChannelOutboundHandler {
  272. typealias OutboundIn = Write
  273. var writes: [Write] = []
  274. func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
  275. self.writes.append(self.unwrapOutboundIn(data))
  276. promise?.succeed(())
  277. }
  278. }
  279. private struct DummyError: Error {}