InProcessServerTransportTests.swift 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import XCTest
  17. @testable import GRPCCore
  18. @testable import GRPCInProcessTransport
  19. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
  20. final class InProcessServerTransportTests: XCTestCase {
  21. func testStartListening() async throws {
  22. let transport = InProcessServerTransport()
  23. let stream = RPCStream<
  24. RPCAsyncSequence<RPCRequestPart, any Error>,
  25. RPCWriter<RPCResponsePart>.Closable
  26. >(
  27. descriptor: .init(service: "testService", method: "testMethod"),
  28. inbound: RPCAsyncSequence<RPCRequestPart, any Error>(
  29. wrapping: AsyncThrowingStream {
  30. $0.yield(.message([42]))
  31. $0.finish()
  32. }
  33. ),
  34. outbound: .init(
  35. wrapping: BufferedStream.Source(
  36. storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
  37. )
  38. )
  39. )
  40. let messages = LockedValueBox<[RPCRequestPart]?>(nil)
  41. try await withThrowingTaskGroup(of: Void.self) { group in
  42. group.addTask {
  43. try await transport.listen { stream in
  44. let partValue = try? await stream.inbound.reduce(into: []) { $0.append($1) }
  45. messages.withLockedValue { $0 = partValue }
  46. transport.stopListening()
  47. }
  48. }
  49. try transport.acceptStream(stream)
  50. }
  51. XCTAssertEqual(messages.withLockedValue { $0 }, [.message([42])])
  52. }
  53. func testStopListening() async throws {
  54. let transport = InProcessServerTransport()
  55. let firstStream = RPCStream<
  56. RPCAsyncSequence<RPCRequestPart, any Error>, RPCWriter<RPCResponsePart>.Closable
  57. >(
  58. descriptor: .init(service: "testService1", method: "testMethod1"),
  59. inbound: RPCAsyncSequence(
  60. wrapping: AsyncThrowingStream {
  61. $0.yield(.message([42]))
  62. $0.finish()
  63. }
  64. ),
  65. outbound: .init(
  66. wrapping: BufferedStream.Source(
  67. storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
  68. )
  69. )
  70. )
  71. try transport.acceptStream(firstStream)
  72. try await transport.listen { stream in
  73. let firstStreamMessages = try? await stream.inbound.reduce(into: []) {
  74. $0.append($1)
  75. }
  76. XCTAssertEqual(firstStreamMessages, [.message([42])])
  77. transport.stopListening()
  78. let secondStream = RPCStream<
  79. RPCAsyncSequence<RPCRequestPart, any Error>, RPCWriter<RPCResponsePart>.Closable
  80. >(
  81. descriptor: .init(service: "testService1", method: "testMethod1"),
  82. inbound: RPCAsyncSequence(
  83. wrapping: AsyncThrowingStream {
  84. $0.yield(.message([42]))
  85. $0.finish()
  86. }
  87. ),
  88. outbound: .init(
  89. wrapping: BufferedStream.Source(
  90. storage: .init(backPressureStrategy: .watermark(.init(low: 1, high: 1)))
  91. )
  92. )
  93. )
  94. XCTAssertThrowsError(ofType: RPCError.self) {
  95. try transport.acceptStream(secondStream)
  96. } errorHandler: { error in
  97. XCTAssertEqual(error.code, .failedPrecondition)
  98. XCTAssertEqual(error.message, "The server transport is closed.")
  99. }
  100. }
  101. }
  102. }