ChannelTransportTests.swift 16 KB


  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoModel
  17. @testable import GRPC
  18. import NIO
  19. import NIOHTTP2
  20. import XCTest
  21. class ChannelTransportTests: GRPCTestCase {
  22. typealias Request = Echo_EchoRequest
  23. typealias RequestPart = _GRPCClientRequestPart<Request>
  24. typealias Response = Echo_EchoResponse
  25. typealias ResponsePart = _GRPCClientResponsePart<Response>
  26. private func makeEmbeddedTransport(
  27. channel: EmbeddedChannel,
  28. container: ResponsePartContainer<Response>,
  29. deadline: NIODeadline = .distantFuture
  30. ) -> ChannelTransport<Request, Response> {
  31. let transport = ChannelTransport<Request, Response>(
  32. eventLoop: channel.eventLoop,
  33. responseContainer: container,
  34. timeLimit: .deadline(deadline),
  35. errorDelegate: nil,
  36. logger: self.logger
  37. ) { call, promise in
  38. channel.pipeline.addHandler(GRPCClientCallHandler(call: call)).whenComplete { result in
  39. switch result {
  40. case .success:
  41. promise.succeed(channel)
  42. case let .failure(error):
  43. promise.fail(error)
  44. }
  45. }
  46. }
  47. return transport
  48. }
  49. private func makeRequestHead() -> _GRPCRequestHead {
  50. return _GRPCRequestHead(
  51. method: "POST",
  52. scheme: "http",
  53. path: "/foo/bar",
  54. host: "localhost",
  55. deadline: .distantFuture,
  56. customMetadata: [:],
  57. encoding: .disabled
  58. )
  59. }
  60. private func makeRequest(_ text: String) -> _MessageContext<Request> {
  61. return _MessageContext(Request.with { $0.text = text }, compressed: false)
  62. }
  63. private func makeResponse(_ text: String) -> _MessageContext<Response> {
  64. return _MessageContext(Response.with { $0.text = text }, compressed: false)
  65. }
  66. // MARK: - Happy path
  67. func testUnaryHappyPath() throws {
  68. let channel = EmbeddedChannel()
  69. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  70. let container = ResponsePartContainer<Response>(
  71. eventLoop: channel.eventLoop,
  72. unaryResponsePromise: responsePromise
  73. )
  74. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  75. // Okay, let's send a unary request.
  76. transport.sendUnary(
  77. self.makeRequestHead(),
  78. request: .with { $0.text = "hello" },
  79. compressed: false
  80. )
  81. // We haven't activated yet so the transport should buffer the message.
  82. XCTAssertNil(try channel.readOutbound(as: _GRPCClientRequestPart<Request>.self))
  83. // Activate the channel.
  84. channel.pipeline.fireChannelActive()
  85. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
  86. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  87. XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)
  88. transport.receiveResponse(.initialMetadata([:]))
  89. transport.receiveResponse(.message(.init(.with { $0.text = "Hello!" }, compressed: false)))
  90. transport.receiveResponse(.trailingMetadata([:]))
  91. transport.receiveResponse(.status(.ok))
  92. XCTAssertNoThrow(
  93. try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
  94. .wait()
  95. )
  96. XCTAssertNoThrow(try responsePromise.futureResult.wait())
  97. XCTAssertNoThrow(
  98. try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
  99. .wait()
  100. )
  101. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  102. }
  103. func testBidirectionalHappyPath() throws {
  104. let channel = EmbeddedChannel()
  105. let container = ResponsePartContainer<Response>(
  106. eventLoop: channel
  107. .eventLoop
  108. ) { (response: Response) in
  109. XCTFail("No response expected but got: \(response)")
  110. }
  111. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  112. // Okay, send the request. We'll do it before activating.
  113. transport.sendRequests([
  114. .head(self.makeRequestHead()),
  115. .message(self.makeRequest("1")),
  116. .message(self.makeRequest("2")),
  117. .message(self.makeRequest("3")),
  118. .end,
  119. ], promise: nil)
  120. // We haven't activated yet so the transport should buffer the messages.
  121. XCTAssertNil(try channel.readOutbound(as: _GRPCClientRequestPart<Request>.self))
  122. // Activate the channel.
  123. channel.pipeline.fireChannelActive()
  124. // Read the parts.
  125. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
  126. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  127. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  128. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  129. XCTAssertTrue(try channel.readOutbound(as: RequestPart.self)?.isEnd ?? false)
  130. // Write some responses.
  131. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
  132. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.trailingMetadata([:])))
  133. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.status(.ok)))
  134. // Check the responses.
  135. XCTAssertNoThrow(
  136. try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
  137. .wait()
  138. )
  139. XCTAssertNoThrow(
  140. try transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
  141. .wait()
  142. )
  143. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  144. }
  145. // MARK: - Timeout
  146. func testTimeoutBeforeActivating() throws {
  147. let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
  148. let channel = EmbeddedChannel()
  149. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  150. let container = ResponsePartContainer<Response>(
  151. eventLoop: channel.eventLoop,
  152. unaryResponsePromise: responsePromise
  153. )
  154. let transport = self.makeEmbeddedTransport(
  155. channel: channel,
  156. container: container,
  157. deadline: deadline
  158. )
  159. // Advance time beyond the timeout.
  160. channel.embeddedEventLoop.advanceTime(by: .minutes(42))
  161. XCTAssertThrowsError(
  162. try transport.responseContainer.lazyInitialMetadataPromise
  163. .getFutureResult().wait()
  164. )
  165. XCTAssertThrowsError(try responsePromise.futureResult.wait())
  166. XCTAssertThrowsError(
  167. try transport.responseContainer.lazyTrailingMetadataPromise
  168. .getFutureResult().wait()
  169. )
  170. XCTAssertEqual(
  171. try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
  172. .deadlineExceeded
  173. )
  174. // Writing should fail.
  175. let sendPromise = channel.eventLoop.makePromise(of: Void.self)
  176. transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
  177. XCTAssertThrowsError(try sendPromise.futureResult.wait())
  178. }
  179. func testTimeoutAfterActivating() throws {
  180. let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
  181. let channel = EmbeddedChannel()
  182. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  183. let container = ResponsePartContainer<Response>(
  184. eventLoop: channel.eventLoop,
  185. unaryResponsePromise: responsePromise
  186. )
  187. let transport = self.makeEmbeddedTransport(
  188. channel: channel,
  189. container: container,
  190. deadline: deadline
  191. )
  192. // Activate the channel.
  193. channel.pipeline.fireChannelActive()
  194. // Advance time beyond the timeout.
  195. channel.embeddedEventLoop.advanceTime(by: .minutes(42))
  196. XCTAssertThrowsError(
  197. try transport.responseContainer.lazyInitialMetadataPromise
  198. .getFutureResult().wait()
  199. )
  200. XCTAssertThrowsError(try responsePromise.futureResult.wait())
  201. XCTAssertThrowsError(
  202. try transport.responseContainer.lazyTrailingMetadataPromise
  203. .getFutureResult().wait()
  204. )
  205. XCTAssertEqual(
  206. try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
  207. .deadlineExceeded
  208. )
  209. // Writing should fail.
  210. let sendPromise = channel.eventLoop.makePromise(of: Void.self)
  211. transport.sendRequest(.head(self.makeRequestHead()), promise: sendPromise)
  212. XCTAssertThrowsError(try sendPromise.futureResult.wait())
  213. }
  214. func testTimeoutMidRPC() throws {
  215. let deadline = NIODeadline.uptimeNanoseconds(0) + .minutes(42)
  216. let channel = EmbeddedChannel()
  217. let container = ResponsePartContainer<Response>(
  218. eventLoop: channel
  219. .eventLoop
  220. ) { (response: Response) in
  221. XCTFail("No response expected but got: \(response)")
  222. }
  223. let transport = self.makeEmbeddedTransport(
  224. channel: channel,
  225. container: container,
  226. deadline: deadline
  227. )
  228. // Activate the channel.
  229. channel.pipeline.fireChannelActive()
  230. // Okay, send some requests.
  231. transport.sendRequests([
  232. .head(self.makeRequestHead()),
  233. .message(self.makeRequest("1")),
  234. ], promise: nil)
  235. // Read the parts.
  236. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.requestHead)
  237. XCTAssertNotNil(try channel.readOutbound(as: RequestPart.self)?.message)
  238. // We'll send back the initial metadata.
  239. XCTAssertNoThrow(try channel.writeInbound(ResponsePart.initialMetadata([:])))
  240. XCTAssertNoThrow(
  241. try transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
  242. .wait()
  243. )
  244. // Advance time beyond the timeout.
  245. channel.embeddedEventLoop.advanceTime(by: .minutes(42))
  246. // Check the remaining response parts.
  247. XCTAssertThrowsError(
  248. try transport.responseContainer.lazyTrailingMetadataPromise
  249. .getFutureResult().wait()
  250. )
  251. XCTAssertEqual(
  252. try transport.responseContainer.lazyStatusPromise.getFutureResult().map { $0.code }.wait(),
  253. .deadlineExceeded
  254. )
  255. }
  256. // MARK: - Channel errors
  257. func testChannelBecomesInactive() throws {
  258. let channel = EmbeddedChannel()
  259. let container = ResponsePartContainer<Response>(
  260. eventLoop: channel
  261. .eventLoop
  262. ) { (response: Response) in
  263. XCTFail("No response expected but got: \(response)")
  264. }
  265. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  266. // Activate and deactivate the channel.
  267. channel.pipeline.fireChannelActive()
  268. channel.pipeline.fireChannelInactive()
  269. // Everything should fail.
  270. XCTAssertThrowsError(
  271. try transport.responseContainer.lazyInitialMetadataPromise
  272. .getFutureResult().wait()
  273. )
  274. XCTAssertThrowsError(
  275. try transport.responseContainer.lazyTrailingMetadataPromise
  276. .getFutureResult().wait()
  277. )
  278. // Except the status, that will never fail.
  279. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  280. }
  281. func testChannelError() throws {
  282. let channel = EmbeddedChannel()
  283. let container = ResponsePartContainer<Response>(
  284. eventLoop: channel
  285. .eventLoop
  286. ) { (response: Response) in
  287. XCTFail("No response expected but got: \(response)")
  288. }
  289. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  290. // Activate the channel.
  291. channel.pipeline.fireChannelActive()
  292. // Fire an error.
  293. channel.pipeline.fireErrorCaught(GRPCStatus.processingError)
  294. // Everything should fail.
  295. XCTAssertThrowsError(
  296. try transport.responseContainer.lazyInitialMetadataPromise
  297. .getFutureResult().wait()
  298. )
  299. XCTAssertThrowsError(
  300. try transport.responseContainer.lazyTrailingMetadataPromise
  301. .getFutureResult().wait()
  302. )
  303. // Except the status, that will never fail.
  304. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  305. }
  306. // MARK: - Test Transport after Shutdown
  307. func testOutboundMethodsAfterShutdown() throws {
  308. let channel = EmbeddedChannel()
  309. let container = ResponsePartContainer<Response>(
  310. eventLoop: channel
  311. .eventLoop
  312. ) { (response: Response) in
  313. XCTFail("No response expected but got: \(response)")
  314. }
  315. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  316. // Close the channel.
  317. XCTAssertNoThrow(try channel.close().wait())
  318. // Sending should fail.
  319. let sendRequestPromise = channel.eventLoop.makePromise(of: Void.self)
  320. transport.sendRequest(.head(self.makeRequestHead()), promise: sendRequestPromise)
  321. XCTAssertThrowsError(try sendRequestPromise.futureResult.wait()) { error in
  322. XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
  323. }
  324. // Sending many should fail.
  325. let sendRequestsPromise = channel.eventLoop.makePromise(of: Void.self)
  326. transport.sendRequests([.end], promise: sendRequestsPromise)
  327. XCTAssertThrowsError(try sendRequestsPromise.futureResult.wait()) { error in
  328. XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
  329. }
  330. // Cancelling should fail.
  331. let cancelPromise = channel.eventLoop.makePromise(of: Void.self)
  332. transport.cancel(promise: cancelPromise)
  333. XCTAssertThrowsError(try cancelPromise.futureResult.wait()) { error in
  334. XCTAssertEqual(error as? ChannelError, ChannelError.alreadyClosed)
  335. }
  336. let channelFuture = transport.streamChannel()
  337. XCTAssertThrowsError(try channelFuture.wait()) { error in
  338. XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
  339. }
  340. }
  341. func testInboundMethodsAfterShutdown() throws {
  342. let channel = EmbeddedChannel()
  343. let container = ResponsePartContainer<Response>(
  344. eventLoop: channel
  345. .eventLoop
  346. ) { (response: Response) in
  347. XCTFail("No response expected but got: \(response)")
  348. }
  349. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  350. // Close the channel.
  351. XCTAssertNoThrow(try channel.close().wait())
  352. // We'll fail the handler in the container if this one is received.
  353. transport.receiveResponse(.message(self.makeResponse("ignored!")))
  354. transport.receiveError(GRPCStatus.processingError)
  355. }
  356. func testBufferedWritesAreFailedOnClose() throws {
  357. let channel = EmbeddedChannel()
  358. let container = ResponsePartContainer<Response>(
  359. eventLoop: channel
  360. .eventLoop
  361. ) { (response: Response) in
  362. XCTFail("No response expected but got: \(response)")
  363. }
  364. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  365. let requestHeadPromise = channel.eventLoop.makePromise(of: Void.self)
  366. transport.sendRequest(.head(self.makeRequestHead()), promise: requestHeadPromise)
  367. // Close the channel.
  368. XCTAssertNoThrow(try channel.close().wait())
  369. // Promise should fail.
  370. XCTAssertThrowsError(try requestHeadPromise.futureResult.wait())
  371. }
  372. func testErrorsAreNotAlwaysStatus() throws {
  373. let channel = EmbeddedChannel()
  374. let responsePromise = channel.eventLoop.makePromise(of: Response.self)
  375. let container = ResponsePartContainer<Response>(
  376. eventLoop: channel.eventLoop,
  377. unaryResponsePromise: responsePromise
  378. )
  379. let transport = self.makeEmbeddedTransport(channel: channel, container: container)
  380. transport.activate(stream: channel)
  381. // Send an error
  382. transport.receiveError(GRPCError.RPCCancelledByClient())
  383. XCTAssertThrowsError(
  384. try transport.responseContainer.lazyInitialMetadataPromise
  385. .getFutureResult().wait()
  386. ) { error in
  387. XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
  388. }
  389. XCTAssertThrowsError(
  390. try transport.responseContainer.lazyTrailingMetadataPromise
  391. .getFutureResult().wait()
  392. ) { error in
  393. XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
  394. }
  395. XCTAssertThrowsError(try responsePromise.futureResult.wait()) { error in
  396. XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
  397. }
  398. // Status never fails.
  399. XCTAssertNoThrow(try transport.responseContainer.lazyStatusPromise.getFutureResult().wait())
  400. }
  401. }
  402. extension _GRPCClientRequestPart {
  403. var requestHead: _GRPCRequestHead? {
  404. switch self {
  405. case let .head(head):
  406. return head
  407. case .message, .end:
  408. return nil
  409. }
  410. }
  411. var message: Request? {
  412. switch self {
  413. case let .message(message):
  414. return message.message
  415. case .head, .end:
  416. return nil
  417. }
  418. }
  419. var isEnd: Bool {
  420. switch self {
  421. case .end:
  422. return true
  423. case .head, .message:
  424. return false
  425. }
  426. }
  427. }