ChannelTransportTests.swift 18 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  /// Tests for `ChannelTransport` which drive the transport through an `EmbeddedChannel`.
  ///
  /// The tests send request parts through the transport, feed response parts or errors back in
  /// (either directly via the transport or via the channel pipeline) and then check how the
  /// promises held by the transport's `responseContainer` were completed.
  class ChannelTransportTests: GRPCTestCase {
    typealias Request = Echo_EchoRequest
    typealias RequestPart = _GRPCClientRequestPart<Request>

    typealias Response = Echo_EchoResponse
    typealias ResponsePart = _GRPCClientResponsePart<Response>

    // MARK: - Helpers

    /// Makes a transport whose channel provider installs a `GRPCClientCallHandler` on `channel`
    /// and then completes the channel promise with that same `channel`.
    ///
    /// - Parameters:
    ///   - channel: The embedded channel backing the transport; its event loop is used as the
    ///     transport's event loop.
    ///   - container: The container which receives the response parts.
    ///   - deadline: The deadline for the call, defaults to `.distantFuture`.
    /// - Returns: The newly created transport.
    private func makeEmbeddedTransport(
      channel: EmbeddedChannel,
      container: ResponsePartContainer<Response>,
      deadline: NIODeadline = .distantFuture
    ) -> ChannelTransport<Request, Response> {
      // Delegate to `makeTransport` so that there is a single place which knows how to
      // construct a `ChannelTransport` for these tests.
      return self.makeTransport(
        on: channel.eventLoop,
        container: container,
        deadline: deadline
      ) { call, promise in
        channel.pipeline.addHandler(GRPCClientCallHandler(call: call)).whenComplete { result in
          switch result {
          case .success:
            promise.succeed(channel)
          case let .failure(error):
            promise.fail(error)
          }
        }
      }
    }

    /// Makes a transport with a caller supplied channel provider.
    ///
    /// The transport is created without an error delegate and with the test case's logger.
    ///
    /// - Parameters:
    ///   - eventLoop: The event loop for the transport.
    ///   - container: The container which receives the response parts.
    ///   - deadline: The deadline for the call, defaults to `.distantFuture`.
    ///   - channelProvider: Called with the transport and a promise for the channel to use.
    /// - Returns: The newly created transport.
    private func makeTransport(
      on eventLoop: EventLoop,
      container: ResponsePartContainer<Response>,
      deadline: NIODeadline = .distantFuture,
      channelProvider: @escaping (ChannelTransport<Request, Response>, EventLoopPromise<Channel>)
        -> Void
    ) -> ChannelTransport<Request, Response> {
      return ChannelTransport<Request, Response>(
        eventLoop: eventLoop,
        responseContainer: container,
        timeLimit: .deadline(deadline),
        errorDelegate: nil,
        logger: self.logger,
        channelProvider: channelProvider
      )
    }

    /// Makes a response container whose response handler fails the running test if it is
    /// ever invoked.
    ///
    /// - Parameter eventLoop: The event loop for the container.
    private func makeNoResponseContainer(
      on eventLoop: EventLoop
    ) -> ResponsePartContainer<Response> {
      return ResponsePartContainer<Response>(eventLoop: eventLoop) { (response: Response) in
        XCTFail("No response expected but got: \(response)")
      }
    }

    /// Makes a fixed request head: a `POST` to `/foo/bar` on `localhost` with no custom
    /// metadata and encoding disabled.
    private func makeRequestHead() -> _GRPCRequestHead {
      return _GRPCRequestHead(
        method: "POST",
        scheme: "http",
        path: "/foo/bar",
        host: "localhost",
        deadline: .distantFuture,
        customMetadata: [:],
        encoding: .disabled
      )
    }

    /// Wraps a request carrying `text` in an uncompressed message context.
    private func makeRequest(_ text: String) -> _MessageContext<Request> {
      return _MessageContext(Request.with { $0.text = text }, compressed: false)
    }

    /// Wraps a response carrying `text` in an uncompressed message context.
    private func makeResponse(_ text: String) -> _MessageContext<Response> {
      return _MessageContext(Response.with { $0.text = text }, compressed: false)
    }

    /// Shared body of the unary timeout tests.
    ///
    /// Creates a transport with a deadline 42 minutes after uptime zero, optionally activates
    /// the channel, advances the embedded event loop by 42 minutes and then asserts that the
    /// initial metadata, response and trailing metadata all fail, that the status code is
    /// `.deadlineExceeded` and that a subsequent write fails.
    ///
    /// - Parameter activateChannelFirst: Whether `channelActive` is fired before time is
    ///   advanced.
    private func assertUnaryRPCTimesOut(activateChannelFirst: Bool) {
      let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
      let channel = EmbeddedChannel()
      let responsePromise = channel.eventLoop.makePromise(of: Response.self)
      let container = ResponsePartContainer<Response>(
        eventLoop: channel.eventLoop,
        unaryResponsePromise: responsePromise
      )
      let transport = self.makeEmbeddedTransport(
        channel: channel,
        container: container,
        deadline: deadline
      )

      if activateChannelFirst {
        // Activate the channel.
        channel.pipeline.fireChannelActive()
      }

      // Advance the embedded event loop's clock by the length of the timeout.
      channel.embeddedEventLoop.advanceTime(by: .minutes(42))

      XCTAssertThrowsError(
        try transport.responseContainer.lazyInitialMetadataPromise
          .getFutureResult().wait()
      )
      XCTAssertThrowsError(try responsePromise.futureResult.wait())
      XCTAssertThrowsError(
        try transport.responseContainer.lazyTrailingMetadataPromise
          .getFutureResult().wait()
      )
      XCTAssertEqual(
        try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
        .deadlineExceeded
      )

      // Writing should fail.
      let sendPromise = channel.eventLoop.makePromise(of: Void.self)
      transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
      XCTAssertThrowsError(try sendPromise.futureResult.wait())
    }

    // MARK: - Happy path

    /// A unary call: the request is buffered until the channel activates, then written as
    /// head, message and end; all four response promises succeed.
    func testUnaryHappyPath() throws {
      let channel = EmbeddedChannel()
      let responsePromise = channel.eventLoop.makePromise(of: Response.self)
      let container = ResponsePartContainer<Response>(
        eventLoop: channel.eventLoop,
        unaryResponsePromise: responsePromise
      )
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      // Okay, let's send a unary request.
      transport.sendUnary(
        self.makeRequestHead(),
        request: .with { $0.text = "hello" },
        compressed: false
      )

      // We haven't activated yet so the transport should buffer the message.
      XCTAssertNil(try channel.readOutbound(as: RequestPart.self))

      // Activate the channel.
      channel.pipeline.fireChannelActive()

      XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
      XCTAssertEqual(try channel.readOutbound(as: RequestPart.self)?.message?.text, "hello")
      XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)

      transport.receiveResponse(.initialMetadata([:]))
      transport.receiveResponse(.message(self.makeResponse("Hello!")))
      transport.receiveResponse(.trailingMetadata([:]))
      transport.receiveResponse(.status(.ok))

      XCTAssertNoThrow(
        try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
          .wait()
      )
      // The response we fed in should be the one the caller sees.
      XCTAssertEqual(try responsePromise.futureResult.map { $0.text }.wait(), "Hello!")
      XCTAssertNoThrow(
        try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
          .wait()
      )
      XCTAssertEqual(
        try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
        .ok
      )
    }

    /// A streaming call: all request parts are buffered until the channel activates and are
    /// then written in the order they were sent; the response promises succeed.
    func testBidirectionalHappyPath() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      // Okay, send the request. We'll do it before activating.
      transport.sendRequests([
        .head(self.makeRequestHead()),
        .message(self.makeRequest("1")),
        .message(self.makeRequest("2")),
        .message(self.makeRequest("3")),
        .end,
      ], promise: nil)

      // We haven't activated yet so the transport should buffer the messages.
      XCTAssertNil(try channel.readOutbound(as: RequestPart.self))

      // Activate the channel.
      channel.pipeline.fireChannelActive()

      // Read the parts; the messages should come out in the order they were sent.
      XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
      for expectedText in ["1", "2", "3"] {
        XCTAssertEqual(try channel.readOutbound(as: RequestPart.self)?.message?.text, expectedText)
      }
      XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)

      // Write some responses.
      XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
      XCTAssertNoThrow(try channel.writeInbound(ResponsePart.trailingMetadata([:])))
      XCTAssertNoThrow(try channel.writeInbound(ResponsePart.status(.ok)))

      // Check the responses.
      XCTAssertNoThrow(
        try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
          .wait()
      )
      XCTAssertNoThrow(
        try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
          .wait()
      )
      XCTAssertEqual(
        try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
        .ok
      )
    }

    // MARK: - Timeout

    /// The deadline passes before the channel ever becomes active.
    func testTimeoutBeforeActivating() throws {
      self.assertUnaryRPCTimesOut(activateChannelFirst: false)
    }

    /// The deadline passes after the channel became active but before anything was sent.
    func testTimeoutAfterActivating() throws {
      self.assertUnaryRPCTimesOut(activateChannelFirst: true)
    }

    /// The deadline passes after the initial metadata was received: the initial metadata
    /// succeeds, the trailing metadata fails and the status code is `.deadlineExceeded`.
    func testTimeoutMidRPC() throws {
      let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(
        channel: channel,
        container: container,
        deadline: deadline
      )

      // Activate the channel.
      channel.pipeline.fireChannelActive()

      // Okay, send some requests.
      transport.sendRequests([
        .head(self.makeRequestHead()),
        .message(self.makeRequest("1")),
      ], promise: nil)

      // Read the parts.
      XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
      XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)

      // We'll send back the initial metadata.
      XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
      XCTAssertNoThrow(
        try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
          .wait()
      )

      // Advance the embedded event loop's clock by the length of the timeout.
      channel.embeddedEventLoop.advanceTime(by: .minutes(42))

      // Check the remaining response parts.
      XCTAssertThrowsError(
        try transport.responseContainer.lazyTrailingMetadataPromise
          .getFutureResult().wait()
      )
      XCTAssertEqual(
        try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
        .deadlineExceeded
      )
    }

    // MARK: - Channel errors

    /// The channel becomes inactive straight after activating.
    func testChannelBecomesInactive() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      // Activate and deactivate the channel.
      channel.pipeline.fireChannelActive()
      channel.pipeline.fireChannelInactive()

      // Everything should fail.
      XCTAssertThrowsError(
        try transport.responseContainer.lazyInitialMetadataPromise
          .getFutureResult().wait()
      )
      XCTAssertThrowsError(
        try transport.responseContainer.lazyTrailingMetadataPromise
          .getFutureResult().wait()
      )

      // Except the status, that will never fail.
      XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
    }

    /// An error is fired through the pipeline of an active channel.
    func testChannelError() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      // Activate the channel.
      channel.pipeline.fireChannelActive()

      // Fire an error.
      channel.pipeline.fireErrorCaught(GRPCStatus.processingError)

      // Everything should fail.
      XCTAssertThrowsError(
        try transport.responseContainer.lazyInitialMetadataPromise
          .getFutureResult().wait()
      )
      XCTAssertThrowsError(
        try transport.responseContainer.lazyTrailingMetadataPromise
          .getFutureResult().wait()
      )

      // Except the status, that will never fail.
      XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
    }

    /// The channel provider fails the channel promise with a status; that status should be
    /// the one the call completes with.
    func testChannelFutureError() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)

      struct DoomedChannelError: Error {}
      // Build the status once so the value we fail with and the value we expect can't drift.
      let expectedStatus = GRPCStatus(code: .unavailable, message: "\(DoomedChannelError())")

      let transport = self.makeTransport(on: channel.eventLoop, container: container) { _, promise in
        promise.fail(expectedStatus)
      }

      let status = try transport.responseContainer.lazyStatusPromise.getFutureResult().wait()
      XCTAssertEqual(status, expectedStatus)
    }

    // MARK: - Test Transport after Shutdown

    /// Sending, cancelling and asking for the stream channel all fail once the channel
    /// has been closed.
    func testOutboundMethodsAfterShutdown() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      // Close the channel.
      XCTAssertNoThrow(try channel.close().wait())

      // Sending should fail.
      let sendRequestPromise = channel.eventLoop.makePromise(of: Void.self)
      transport.sendRequest(.head(self.makeRequestHead()), promise: sendRequestPromise)
      XCTAssertThrowsError(try sendRequestPromise.futureResult.wait()) { error in
        XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
      }

      // Sending many should fail.
      let sendRequestsPromise = channel.eventLoop.makePromise(of: Void.self)
      transport.sendRequests([.end], promise: sendRequestsPromise)
      XCTAssertThrowsError(try sendRequestsPromise.futureResult.wait()) { error in
        XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
      }

      // Cancelling should fail.
      let cancelPromise = channel.eventLoop.makePromise(of: Void.self)
      transport.cancel(promise: cancelPromise)
      XCTAssertThrowsError(try cancelPromise.futureResult.wait()) { error in
        XCTAssertEqual(error as? ChannelError, ChannelError.alreadyClosed)
      }

      // Asking for the stream channel should fail too.
      let channelFuture = transport.streamChannel()
      XCTAssertThrowsError(try channelFuture.wait()) { error in
        XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
      }
    }

    /// Response parts and errors delivered after the channel has been closed must not reach
    /// the container's response handler.
    func testInboundMethodsAfterShutdown() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      // Close the channel.
      XCTAssertNoThrow(try channel.close().wait())

      // We'll fail the handler in the container if this one is received.
      transport.receiveResponse(.message(self.makeResponse("ignored!")))
      transport.receiveError(GRPCStatus.processingError)
    }

    /// A write made before activation has its promise failed when the channel is closed.
    func testBufferedWritesAreFailedOnClose() throws {
      let channel = EmbeddedChannel()
      let container = self.makeNoResponseContainer(on: channel.eventLoop)
      let transport = self.makeEmbeddedTransport(channel: channel, container: container)

      let requestHeadPromise = channel.eventLoop.makePromise(of: Void.self)
      transport.sendRequest(.head(self.makeRequestHead()), promise: requestHeadPromise)

      // Close the channel.
      XCTAssertNoThrow(try channel.close().wait())

      // Promise should fail.
      XCTAssertThrowsError(try requestHeadPromise.futureResult.wait())
    }

    /// A non-status error given to the transport is surfaced as-is on the metadata and
    /// response promises rather than being replaced by a `GRPCStatus`.
    func testErrorsAreNotAlwaysStatus() throws {
      let channel = EmbeddedChannel()
      let responsePromise = channel.eventLoop.makePromise(of: Response.self)
      let container = ResponsePartContainer<Response>(
        eventLoop: channel.eventLoop,
        unaryResponsePromise: responsePromise
      )

      let transport = self.makeEmbeddedTransport(channel: channel, container: container)
      transport.activate(stream: channel)

      // Send an error
      transport.receiveError(GRPCError.RPCCancelledByClient())

      XCTAssertThrowsError(
        try transport.responseContainer.lazyInitialMetadataPromise
          .getFutureResult().wait()
      ) { error in
        XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
      }
      XCTAssertThrowsError(
        try transport.responseContainer.lazyTrailingMetadataPromise
          .getFutureResult().wait()
      ) { error in
        XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
      }
      XCTAssertThrowsError(try responsePromise.futureResult.wait()) { error in
        XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
      }

      // Status never fails.
      XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
    }
  }
  /// Convenience accessors used by the tests to pick apart a `_GRPCClientRequestPart`.
  extension _GRPCClientRequestPart {
    /// The request head if this part is `.head`, `nil` for any other part.
    var requestHead: _GRPCRequestHead? {
      guard case let .head(head) = self else {
        return nil
      }
      return head
    }

    /// The request message if this part is `.message`, `nil` for any other part.
    var message: Request? {
      guard case let .message(context) = self else {
        return nil
      }
      return context.message
    }

    /// `true` if this part is `.end`, `false` for any other part.
    var isEnd: Bool {
      if case .end = self {
        return true
      }
      return false
    }
  }
  456. }