ChannelTransportTests.swift 16 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import EchoModel
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  21. class ChannelTransportTests: GRPCTestCase {
  22. typealias Request = Echo_EchoRequest
  23. typealias RequestPart = _GRPCClientRequestPart<Request>
  24. typealias Response = Echo_EchoResponse
  25. typealias ResponsePart = _GRPCClientResponsePart<Response>
  26. private func makeEmbeddedTransport(
  27. channel: EmbeddedChannel,
  28. container: ResponsePartContainer<Response>,
  29. deadline: NIODeadline = .distantFuture
  30. ) -> ChannelTransport<Request, Response> {
  31. let transport = ChannelTransport<Request, Response>(
  32. eventLoop: channel.eventLoop,
  33. responseContainer: container,
  34. timeLimit: .deadline(deadline),
  35. errorDelegate: nil,
  36. logger: self.logger
  37. ) { call, promise in
  38. channel.pipeline.addHandler(GRPCClientCallHandler(call: call)).whenComplete { result in
  39. switch result {
  40. case .success:
  41. promise.succeed(channel)
  42. case .failure(let error):
  43. promise.fail(error)
  44. }
  45. }
  46. }
  47. return transport
  48. }
  49. private func makeRequestHead() -> _GRPCRequestHead {
  50. return _GRPCRequestHead(
  51. method: "POST",
  52. scheme: "http",
  53. path: "/foo/bar",
  54. host: "localhost",
  55. deadline: .distantFuture,
  56. customMetadata: [:],
  57. encoding: .disabled
  58. )
  59. }
  60. private func makeRequest(_ text: String) -> _MessageContext<Request> {
  61. return _MessageContext(Request.with { $0.text = text }, compressed: false)
  62. }
  63. private func makeResponse(_ text: String) -> _MessageContext<Response> {
  64. return _MessageContext(Response.with { $0.text = text }, compressed: false)
  65. }
  66. // MARK: - Happy path
  67. func testUnaryHappyPath() throws {
  68. let channel = EmbeddedChannel()
  69. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  70. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop, unaryResponsePromise: responsePromise)
  71. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  72. // Okay, let's send a unary request.
  73. transport.sendUnary(self.makeRequestHead(), request: .with { $0.text = "hello" }, compressed: false)
  74. // We haven't activated yet so the transport should buffer the message.
  75. XCTAssertNil(try channel.readOutbound(as: _GRPCClientRequestPart<Request>.self))
  76. // Activate the channel.
  77. channel.pipeline.fireChannelActive()
  78. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
  79. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  80. XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)
  81. transport.receiveResponse(.initialMetadata([:]))
  82. transport.receiveResponse(.message(.init(.with { $0.text = "Hello!" }, compressed: false)))
  83. transport.receiveResponse(.trailingMetadata([:]))
  84. transport.receiveResponse(.status(.ok))
  85. XCTAssertNoThrow(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  86. XCTAssertNoThrow(try responsePromise.futureResult.wait())
  87. XCTAssertNoThrow(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  88. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  89. }
  90. func testBidirectionalHappyPath() throws {
  91. let channel = EmbeddedChannel()
  92. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  93. XCTFail("No response expected but got: \(response)")
  94. }
  95. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  96. // Okay, send the request. We'll do it before activating.
  97. transport.sendRequests([
  98. .head(self.makeRequestHead()),
  99. .message(self.makeRequest("1")),
  100. .message(self.makeRequest("2")),
  101. .message(self.makeRequest("3")),
  102. .end
  103. ], promise: nil)
  104. // We haven't activated yet so the transport should buffer the messages.
  105. XCTAssertNil(try channel.readOutbound(as: _GRPCClientRequestPart<Request>.self))
  106. // Activate the channel.
  107. channel.pipeline.fireChannelActive()
  108. // Read the parts.
  109. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
  110. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  111. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  112. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  113. XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)
  114. // Write some responses.
  115. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
  116. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.trailingMetadata([:])))
  117. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.status(.ok)))
  118. // Check the responses.
  119. XCTAssertNoThrow(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  120. XCTAssertNoThrow(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  121. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  122. }
  123. // MARK: - Timeout
  124. func testTimeoutBeforeActivating() throws {
  125. let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
  126. let channel = EmbeddedChannel()
  127. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  128. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop, unaryResponsePromise: responsePromise)
  129. let transport = self.makeEmbeddedTransport(channel: channel, container: container, deadline: deadline)
  130. // Advance time beyond the timeout.
  131. channel.embeddedEventLoop.advanceTime(by: .minutes(42))
  132. XCTAssertThrowsError(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  133. XCTAssertThrowsError(try responsePromise.futureResult.wait())
  134. XCTAssertThrowsError(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  135. XCTAssertEqual(try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(), .deadlineExceeded)
  136. // Writing should fail.
  137. let sendPromise = channel.eventLoop.makePromise(of: Void.self)
  138. transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
  139. XCTAssertThrowsError(try sendPromise.futureResult.wait())
  140. }
  141. func testTimeoutAfterActivating() throws {
  142. let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
  143. let channel = EmbeddedChannel()
  144. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  145. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop, unaryResponsePromise: responsePromise)
  146. let transport = self.makeEmbeddedTransport(channel: channel, container: container, deadline: deadline)
  147. // Activate the channel.
  148. channel.pipeline.fireChannelActive()
  149. // Advance time beyond the timeout.
  150. channel.embeddedEventLoop.advanceTime(by: .minutes(42))
  151. XCTAssertThrowsError(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  152. XCTAssertThrowsError(try responsePromise.futureResult.wait())
  153. XCTAssertThrowsError(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  154. XCTAssertEqual(try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(), .deadlineExceeded)
  155. // Writing should fail.
  156. let sendPromise = channel.eventLoop.makePromise(of: Void.self)
  157. transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
  158. XCTAssertThrowsError(try sendPromise.futureResult.wait())
  159. }
  160. func testTimeoutMidRPC() throws {
  161. let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
  162. let channel = EmbeddedChannel()
  163. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  164. XCTFail("No response expected but got: \(response)")
  165. }
  166. let transport = self.makeEmbeddedTransport(channel: channel, container: container, deadline: deadline)
  167. // Activate the channel.
  168. channel.pipeline.fireChannelActive()
  169. // Okay, send some requests.
  170. transport.sendRequests([
  171. .head(self.makeRequestHead()),
  172. .message(self.makeRequest("1"))
  173. ], promise: nil)
  174. // Read the parts.
  175. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
  176. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  177. // We'll send back the initial metadata.
  178. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
  179. XCTAssertNoThrow(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  180. // Advance time beyond the timeout.
  181. channel.embeddedEventLoop.advanceTime(by: .minutes(42))
  182. // Check the remaining response parts.
  183. XCTAssertThrowsError(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  184. XCTAssertEqual(try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(), .deadlineExceeded)
  185. }
  186. // MARK: - Channel errors
  187. func testChannelBecomesInactive() throws {
  188. let channel = EmbeddedChannel()
  189. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  190. XCTFail("No response expected but got: \(response)")
  191. }
  192. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  193. // Activate and deactivate the channel.
  194. channel.pipeline.fireChannelActive()
  195. channel.pipeline.fireChannelInactive()
  196. // Everything should fail.
  197. XCTAssertThrowsError(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  198. XCTAssertThrowsError(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  199. // Except the status, that will never fail.
  200. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  201. }
  202. func testChannelError() throws {
  203. let channel = EmbeddedChannel()
  204. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  205. XCTFail("No response expected but got: \(response)")
  206. }
  207. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  208. // Activate the channel.
  209. channel.pipeline.fireChannelActive()
  210. // Fire an error.
  211. channel.pipeline.fireErrorCaught(GRPCStatus.processingError)
  212. // Everything should fail.
  213. XCTAssertThrowsError(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait())
  214. XCTAssertThrowsError(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait())
  215. // Except the status, that will never fail.
  216. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  217. }
  218. // MARK: - Test Transport after Shutdown
  219. func testOutboundMethodsAfterShutdown() throws {
  220. let channel = EmbeddedChannel()
  221. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  222. XCTFail("No response expected but got: \(response)")
  223. }
  224. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  225. // Close the channel.
  226. XCTAssertNoThrow(try channel.close().wait())
  227. // Sending should fail.
  228. let sendRequestPromise = channel.eventLoop.makePromise(of: Void.self)
  229. transport.sendRequest(.head(self.makeRequestHead()), promise: sendRequestPromise)
  230. XCTAssertThrowsError(try sendRequestPromise.futureResult.wait()) { error in
  231. XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
  232. }
  233. // Sending many should fail.
  234. let sendRequestsPromise = channel.eventLoop.makePromise(of: Void.self)
  235. transport.sendRequests([.end], promise: sendRequestsPromise)
  236. XCTAssertThrowsError(try sendRequestsPromise.futureResult.wait()) { error in
  237. XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
  238. }
  239. // Cancelling should fail.
  240. let cancelPromise = channel.eventLoop.makePromise(of: Void.self)
  241. transport.cancel(promise: cancelPromise)
  242. XCTAssertThrowsError(try cancelPromise.futureResult.wait()) { error in
  243. XCTAssertEqual(error as? ChannelError, ChannelError.alreadyClosed)
  244. }
  245. let channelFuture = transport.streamChannel()
  246. XCTAssertThrowsError(try channelFuture.wait()) { error in
  247. XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
  248. }
  249. }
  250. func testInboundMethodsAfterShutdown() throws {
  251. let channel = EmbeddedChannel()
  252. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  253. XCTFail("No response expected but got: \(response)")
  254. }
  255. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  256. // Close the channel.
  257. XCTAssertNoThrow(try channel.close().wait())
  258. // We'll fail the handler in the container if this one is received.
  259. transport.receiveResponse(.message(self.makeResponse("ignored!")))
  260. transport.receiveError(GRPCStatus.processingError)
  261. }
  262. func testBufferedWritesAreFailedOnClose() throws {
  263. let channel = EmbeddedChannel()
  264. let container = ResponsePartContainer<Response>(eventLoop: channel.eventLoop) { (response: Response) in
  265. XCTFail("No response expected but got: \(response)")
  266. }
  267. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  268. let requestHeadPromise = channel.eventLoop.makePromise(of: Void.self)
  269. transport.sendRequest(.head(self.makeRequestHead()), promise: requestHeadPromise)
  270. // Close the channel.
  271. XCTAssertNoThrow(try channel.close().wait())
  272. // Promise should fail.
  273. XCTAssertThrowsError(try requestHeadPromise.futureResult.wait())
  274. }
  275. func testErrorsAreNotAlwaysStatus() throws {
  276. let channel = EmbeddedChannel()
  277. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  278. let container = ResponsePartContainer<Response>(
  279. eventLoop: channel.eventLoop,
  280. unaryResponsePromise: responsePromise
  281. )
  282. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  283. transport.activate(stream: channel)
  284. // Send an error
  285. transport.receiveError(GRPCError.RPCCancelledByClient())
  286. XCTAssertThrowsError(try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult().wait()) { error in
  287. XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
  288. }
  289. XCTAssertThrowsError(try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult().wait()) { error in
  290. XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
  291. }
  292. XCTAssertThrowsError(try responsePromise.futureResult.wait()) { error in
  293. XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
  294. }
  295. // Status never fails.
  296. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  297. }
  298. }
  299. extension _GRPCClientRequestPart {
  300. var requestHead: _GRPCRequestHead? {
  301. switch self {
  302. case .head(let head):
  303. return head
  304. case .message, .end:
  305. return nil
  306. }
  307. }
  308. var message: Request? {
  309. switch self {
  310. case .message(let message):
  311. return message.message
  312. case .head, .end:
  313. return nil
  314. }
  315. }
  316. var isEnd: Bool {
  317. switch self {
  318. case .end:
  319. return true
  320. case .head, .message:
  321. return false
  322. }
  323. }
  324. }