InProcessServerTransportTest.swift 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import XCTest
  17. @testable import GRPCCore
  /// Tests for `InProcessServerTransport`: streams handed to the transport with
  /// `acceptStream(_:)` are expected to surface on the sequence returned by `listen()`,
  /// and `stopListening()` is expected to end that sequence and reject further streams.
  final class InProcessServerTransportTest: XCTestCase {
    /// The concrete stream type handed to the transport throughout these tests.
    private typealias TestStream = RPCStream<
      RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable
    >

    /// A stream accepted while listening is yielded by the listening sequence with its
    /// inbound parts intact.
    func testStartListening() async throws {
      let transport = InProcessServerTransport()
      let stream = TestStream(
        descriptor: .init(service: "testService", method: "testMethod"),
        inbound: .elements([.message([42])]),
        outbound: .init(
          wrapping: BufferedStream.Source(
            storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
          )
        )
      )

      let streamSequence = transport.listen()
      var streamIterator = streamSequence.makeAsyncIterator()

      try transport.acceptStream(stream)

      // `XCTUnwrap` takes a non-async autoclosure, so the element is awaited first and
      // unwrapped afterwards. A missing stream now fails here with a clear message
      // instead of surfacing later as a confusing "nil != [...]" comparison.
      let yieldedStream = try await streamIterator.next()
      let testStream = try XCTUnwrap(
        yieldedStream,
        "Expected the accepted stream to be yielded by the listener"
      )

      let messages = try await testStream.inbound.collect()
      XCTAssertEqual(messages, [.message([42])])
    }

    /// After `stopListening()`, accepting another stream throws an RPC error with code
    /// `.failedPrecondition` and the listening sequence finishes (yields `nil`).
    func testStopListening() async throws {
      let transport = InProcessServerTransport()
      let firstStream = TestStream(
        descriptor: .init(service: "testService1", method: "testMethod1"),
        inbound: .elements([.message([42])]),
        outbound: .init(
          wrapping: BufferedStream.Source(
            storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
          )
        )
      )

      let streamSequence = transport.listen()
      var streamIterator = streamSequence.makeAsyncIterator()

      // While listening, the first stream must be delivered as usual.
      try transport.acceptStream(firstStream)

      let yieldedStream = try await streamIterator.next()
      let firstTestStream = try XCTUnwrap(
        yieldedStream,
        "Expected the first stream to be yielded before listening stopped"
      )

      let firstStreamMessages = try await firstTestStream.inbound.collect()
      XCTAssertEqual(firstStreamMessages, [.message([42])])

      transport.stopListening()

      let secondStream = TestStream(
        descriptor: .init(service: "testService1", method: "testMethod1"),
        inbound: .elements([.message([42])]),
        outbound: .init(
          wrapping: BufferedStream.Source(
            storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
          )
        )
      )

      // Once listening has stopped the transport must refuse new streams...
      XCTAssertThrowsRPCError(try transport.acceptStream(secondStream)) { error in
        XCTAssertEqual(error.code, .failedPrecondition)
      }

      // ...and the listening sequence must have finished.
      let secondTestStream = try await streamIterator.next()
      XCTAssertNil(secondTestStream)
    }
  }