GRPCAsyncServerHandlerTests.swift 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOEmbedded
  18. import NIOHPACK
  19. import NIOPosix
  20. import XCTest
  21. @testable import GRPC
  22. // MARK: - Tests
  23. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  24. class AsyncServerHandlerTests: GRPCTestCase {
  25. private let recorder = AsyncResponseStream()
  26. private var group: EventLoopGroup!
  27. private var loop: EventLoop!
  28. override func setUp() {
  29. super.setUp()
  30. self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  31. self.loop = self.group.next()
  32. }
  33. override func tearDown() {
  34. XCTAssertNoThrow(try self.group.syncShutdownGracefully())
  35. super.tearDown()
  36. }
  37. func makeCallHandlerContext(
  38. encoding: ServerMessageEncoding = .disabled
  39. ) -> CallHandlerContext {
  40. let closeFuture = self.loop.makeSucceededVoidFuture()
  41. return CallHandlerContext(
  42. errorDelegate: nil,
  43. logger: self.logger,
  44. encoding: encoding,
  45. eventLoop: self.loop,
  46. path: "/ignored",
  47. remoteAddress: nil,
  48. responseWriter: self.recorder,
  49. allocator: ByteBufferAllocator(),
  50. closeFuture: closeFuture
  51. )
  52. }
  53. private func makeHandler(
  54. encoding: ServerMessageEncoding = .disabled,
  55. callType: GRPCCallType = .bidirectionalStreaming,
  56. observer: @escaping @Sendable (
  57. GRPCAsyncRequestStream<String>,
  58. GRPCAsyncResponseStreamWriter<String>,
  59. GRPCAsyncServerCallContext
  60. ) async throws -> Void
  61. ) -> AsyncServerHandler<StringSerializer, StringDeserializer, String, String> {
  62. return AsyncServerHandler(
  63. context: self.makeCallHandlerContext(encoding: encoding),
  64. requestDeserializer: StringDeserializer(),
  65. responseSerializer: StringSerializer(),
  66. callType: callType,
  67. interceptors: [],
  68. userHandler: observer
  69. )
  70. }
  71. @Sendable
  72. private static func echo(
  73. requests: GRPCAsyncRequestStream<String>,
  74. responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
  75. context: GRPCAsyncServerCallContext
  76. ) async throws {
  77. for try await message in requests {
  78. try await responseStreamWriter.send(message)
  79. }
  80. }
  81. @Sendable
  82. private static func neverReceivesMessage(
  83. requests: GRPCAsyncRequestStream<String>,
  84. responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
  85. context: GRPCAsyncServerCallContext
  86. ) async throws {
  87. for try await message in requests {
  88. XCTFail("Unexpected message: '\(message)'")
  89. }
  90. }
  91. @Sendable
  92. private static func neverCalled(
  93. requests: GRPCAsyncRequestStream<String>,
  94. responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
  95. context: GRPCAsyncServerCallContext
  96. ) async throws {
  97. XCTFail("This observer should never be called")
  98. }
  99. func testHappyPath() async throws {
  100. let handler = self.makeHandler(
  101. observer: Self.echo(requests:responseStreamWriter:context:)
  102. )
  103. defer {
  104. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  105. }
  106. self.loop.execute {
  107. handler.receiveMetadata([:])
  108. handler.receiveMessage(ByteBuffer(string: "1"))
  109. handler.receiveMessage(ByteBuffer(string: "2"))
  110. handler.receiveMessage(ByteBuffer(string: "3"))
  111. handler.receiveEnd()
  112. }
  113. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  114. await responseStream.next().assertMetadata()
  115. for expected in ["1", "2", "3"] {
  116. await responseStream.next().assertMessage { buffer, metadata in
  117. XCTAssertEqual(buffer, .init(string: expected))
  118. XCTAssertFalse(metadata.compress)
  119. }
  120. }
  121. await responseStream.next().assertStatus { status, _ in
  122. XCTAssertEqual(status.code, .ok)
  123. }
  124. await responseStream.next().assertNil()
  125. }
  126. func testHappyPathWithCompressionEnabled() async throws {
  127. let handler = self.makeHandler(
  128. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  129. observer: Self.echo(requests:responseStreamWriter:context:)
  130. )
  131. defer {
  132. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  133. }
  134. self.loop.execute {
  135. handler.receiveMetadata([:])
  136. handler.receiveMessage(ByteBuffer(string: "1"))
  137. handler.receiveMessage(ByteBuffer(string: "2"))
  138. handler.receiveMessage(ByteBuffer(string: "3"))
  139. handler.receiveEnd()
  140. }
  141. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  142. await responseStream.next().assertMetadata()
  143. for expected in ["1", "2", "3"] {
  144. await responseStream.next().assertMessage { buffer, metadata in
  145. XCTAssertEqual(buffer, .init(string: expected))
  146. XCTAssertTrue(metadata.compress)
  147. }
  148. }
  149. await responseStream.next().assertStatus()
  150. await responseStream.next().assertNil()
  151. }
  152. func testHappyPathWithCompressionEnabledButDisabledByCaller() async throws {
  153. let handler = self.makeHandler(
  154. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  155. ) { requests, responseStreamWriter, context in
  156. try await context.response.compressResponses(false)
  157. return try await Self.echo(
  158. requests: requests,
  159. responseStreamWriter: responseStreamWriter,
  160. context: context
  161. )
  162. }
  163. defer {
  164. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  165. }
  166. self.loop.execute {
  167. handler.receiveMetadata([:])
  168. handler.receiveMessage(ByteBuffer(string: "1"))
  169. handler.receiveMessage(ByteBuffer(string: "2"))
  170. handler.receiveMessage(ByteBuffer(string: "3"))
  171. handler.receiveEnd()
  172. }
  173. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  174. await responseStream.next().assertMetadata()
  175. for expected in ["1", "2", "3"] {
  176. await responseStream.next().assertMessage { buffer, metadata in
  177. XCTAssertEqual(buffer, .init(string: expected))
  178. XCTAssertFalse(metadata.compress)
  179. }
  180. }
  181. await responseStream.next().assertStatus()
  182. await responseStream.next().assertNil()
  183. }
  184. func testResponseHeadersAndTrailersSentFromContext() async throws {
  185. let handler = self.makeHandler { _, responseStreamWriter, context in
  186. try await context.response.setHeaders(["pontiac": "bandit"])
  187. try await responseStreamWriter.send("1")
  188. try await context.response.setTrailers(["disco": "strangler"])
  189. }
  190. defer {
  191. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  192. }
  193. self.loop.execute {
  194. handler.receiveMetadata([:])
  195. handler.receiveEnd()
  196. }
  197. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  198. await responseStream.next().assertMetadata { headers in
  199. XCTAssertEqual(headers, ["pontiac": "bandit"])
  200. }
  201. await responseStream.next().assertMessage()
  202. await responseStream.next().assertStatus { _, trailers in
  203. XCTAssertEqual(trailers, ["disco": "strangler"])
  204. }
  205. await responseStream.next().assertNil()
  206. }
  207. func testResponseSequence() async throws {
  208. let handler = self.makeHandler { _, responseStreamWriter, _ in
  209. try await responseStreamWriter.send(contentsOf: ["1", "2", "3"])
  210. }
  211. defer {
  212. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  213. }
  214. self.loop.execute {
  215. handler.receiveMetadata([:])
  216. handler.receiveEnd()
  217. }
  218. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  219. await responseStream.next().assertMetadata { _ in }
  220. await responseStream.next().assertMessage()
  221. await responseStream.next().assertMessage()
  222. await responseStream.next().assertMessage()
  223. await responseStream.next().assertStatus { _, _ in }
  224. await responseStream.next().assertNil()
  225. }
  226. func testThrowingDeserializer() async throws {
  227. let handler = AsyncServerHandler(
  228. context: self.makeCallHandlerContext(),
  229. requestDeserializer: ThrowingStringDeserializer(),
  230. responseSerializer: StringSerializer(),
  231. callType: .bidirectionalStreaming,
  232. interceptors: [],
  233. userHandler: Self.neverReceivesMessage(requests:responseStreamWriter:context:)
  234. )
  235. defer {
  236. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  237. }
  238. self.loop.execute {
  239. handler.receiveMetadata([:])
  240. handler.receiveMessage(ByteBuffer(string: "hello"))
  241. }
  242. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  243. await responseStream.next().assertStatus { status, _ in
  244. XCTAssertEqual(status.code, .internalError)
  245. }
  246. await responseStream.next().assertNil()
  247. }
  248. func testThrowingSerializer() async throws {
  249. let handler = AsyncServerHandler(
  250. context: self.makeCallHandlerContext(),
  251. requestDeserializer: StringDeserializer(),
  252. responseSerializer: ThrowingStringSerializer(),
  253. callType: .bidirectionalStreaming,
  254. interceptors: [],
  255. userHandler: Self.echo(requests:responseStreamWriter:context:)
  256. )
  257. defer {
  258. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  259. }
  260. self.loop.execute {
  261. handler.receiveMetadata([:])
  262. handler.receiveMessage(ByteBuffer(string: "hello"))
  263. }
  264. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  265. await responseStream.next().assertMetadata()
  266. await responseStream.next().assertStatus { status, _ in
  267. XCTAssertEqual(status.code, .internalError)
  268. }
  269. await responseStream.next().assertNil()
  270. }
  271. func testReceiveMessageBeforeHeaders() async throws {
  272. let handler = self.makeHandler(
  273. observer: Self.neverCalled(requests:responseStreamWriter:context:)
  274. )
  275. defer {
  276. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  277. }
  278. self.loop.execute {
  279. handler.receiveMessage(ByteBuffer(string: "foo"))
  280. }
  281. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  282. await responseStream.next().assertStatus { status, _ in
  283. XCTAssertEqual(status.code, .internalError)
  284. }
  285. await responseStream.next().assertNil()
  286. }
  287. func testReceiveMultipleHeaders() async throws {
  288. let handler = self.makeHandler(
  289. observer: Self.neverReceivesMessage(requests:responseStreamWriter:context:)
  290. )
  291. defer {
  292. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  293. }
  294. self.loop.execute {
  295. handler.receiveMetadata([:])
  296. handler.receiveMetadata([:])
  297. }
  298. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  299. await responseStream.next().assertStatus { status, _ in
  300. XCTAssertEqual(status.code, .internalError)
  301. }
  302. await responseStream.next().assertNil()
  303. }
  304. func testFinishBeforeStarting() async throws {
  305. let handler = self.makeHandler(
  306. observer: Self.neverCalled(requests:responseStreamWriter:context:)
  307. )
  308. self.loop.execute {
  309. handler.finish()
  310. }
  311. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  312. await responseStream.next().assertStatus()
  313. await responseStream.next().assertNil()
  314. }
  315. func testFinishAfterHeaders() async throws {
  316. let handler = self.makeHandler(
  317. observer: Self.neverReceivesMessage(requests:responseStreamWriter:context:)
  318. )
  319. self.loop.execute {
  320. handler.receiveMetadata([:])
  321. handler.finish()
  322. }
  323. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  324. await responseStream.next().assertStatus()
  325. await responseStream.next().assertNil()
  326. }
  327. func testFinishAfterMessage() async throws {
  328. let handler = self.makeHandler(observer: Self.echo(requests:responseStreamWriter:context:))
  329. self.loop.execute {
  330. handler.receiveMetadata([:])
  331. handler.receiveMessage(ByteBuffer(string: "hello"))
  332. }
  333. // Await the metadata and message so we know the user function is running.
  334. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  335. await responseStream.next().assertMetadata()
  336. await responseStream.next().assertMessage()
  337. // Finish, i.e. terminate early.
  338. self.loop.execute {
  339. handler.finish()
  340. }
  341. await responseStream.next().assertStatus { status, _ in
  342. XCTAssertEqual(status.code, .internalError)
  343. }
  344. await responseStream.next().assertNil()
  345. }
  346. func testErrorAfterHeaders() async throws {
  347. let handler = self.makeHandler(observer: Self.echo(requests:responseStreamWriter:context:))
  348. self.loop.execute {
  349. handler.receiveMetadata([:])
  350. handler.receiveError(CancellationError())
  351. }
  352. // We don't send a message so we don't expect any responses. As metadata is sent lazily on the
  353. // first message we don't expect to get metadata back either.
  354. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  355. await responseStream.next().assertStatus { status, _ in
  356. XCTAssertEqual(status.code, .unavailable)
  357. }
  358. await responseStream.next().assertNil()
  359. }
  360. func testErrorAfterMessage() async throws {
  361. let handler = self.makeHandler(observer: Self.echo(requests:responseStreamWriter:context:))
  362. self.loop.execute {
  363. handler.receiveMetadata([:])
  364. handler.receiveMessage(ByteBuffer(string: "hello"))
  365. }
  366. // Wait the metadata and message; i.e. for function to have been invoked.
  367. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  368. await responseStream.next().assertMetadata()
  369. await responseStream.next().assertMessage()
  370. // Throw in an error.
  371. self.loop.execute {
  372. handler.receiveError(CancellationError())
  373. }
  374. // The RPC should end.
  375. await responseStream.next().assertStatus { status, _ in
  376. XCTAssertEqual(status.code, .unavailable)
  377. }
  378. await responseStream.next().assertNil()
  379. }
  380. func testHandlerThrowsGRPCStatusOKResultsInUnknownStatus() async throws {
  381. // Create a user function that immediately throws GRPCStatus.ok.
  382. let handler = self.makeHandler { _, _, _ in
  383. throw GRPCStatus.ok
  384. }
  385. // Send some metadata to trigger the creation of the async task with the user function.
  386. self.loop.execute {
  387. handler.receiveMetadata([:])
  388. }
  389. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  390. await responseStream.next().assertStatus { status, _ in
  391. XCTAssertEqual(status.code, .unknown)
  392. }
  393. await responseStream.next().assertNil()
  394. }
  395. func testUnaryHandlerReceivingMultipleMessages() async throws {
  396. @Sendable
  397. func neverCalled(_: String, _: GRPCAsyncServerCallContext) async throws -> String {
  398. XCTFail("Should not be called")
  399. return ""
  400. }
  401. let handler = GRPCAsyncServerHandler(
  402. context: self.makeCallHandlerContext(),
  403. requestDeserializer: StringDeserializer(),
  404. responseSerializer: StringSerializer(),
  405. interceptors: [],
  406. wrapping: neverCalled(_:_:)
  407. )
  408. defer {
  409. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  410. }
  411. self.loop.execute {
  412. handler.receiveMetadata([:])
  413. handler.receiveMessage(ByteBuffer(string: "1"))
  414. handler.receiveMessage(ByteBuffer(string: "2"))
  415. }
  416. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  417. await responseStream.next().assertStatus { status, _ in
  418. XCTAssertEqual(status.code, .internalError)
  419. }
  420. }
  421. func testServerStreamingHandlerReceivingMultipleMessages() async throws {
  422. @Sendable
  423. func neverCalled(
  424. _: String,
  425. _: GRPCAsyncResponseStreamWriter<String>,
  426. _: GRPCAsyncServerCallContext
  427. ) async throws {
  428. XCTFail("Should not be called")
  429. }
  430. let handler = GRPCAsyncServerHandler(
  431. context: self.makeCallHandlerContext(),
  432. requestDeserializer: StringDeserializer(),
  433. responseSerializer: StringSerializer(),
  434. interceptors: [],
  435. wrapping: neverCalled(_:_:_:)
  436. )
  437. defer {
  438. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  439. }
  440. self.loop.execute {
  441. handler.receiveMetadata([:])
  442. handler.receiveMessage(ByteBuffer(string: "1"))
  443. handler.receiveMessage(ByteBuffer(string: "2"))
  444. }
  445. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  446. await responseStream.next().assertStatus { status, _ in
  447. XCTAssertEqual(status.code, .internalError)
  448. }
  449. }
  450. }
  451. internal final class AsyncResponseStream: GRPCServerResponseWriter {
  452. private let source:
  453. NIOAsyncSequenceProducer<
  454. GRPCServerResponsePart<ByteBuffer>,
  455. NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
  456. GRPCAsyncSequenceProducerDelegate
  457. >.Source
  458. internal var responseSequence:
  459. NIOAsyncSequenceProducer<
  460. GRPCServerResponsePart<ByteBuffer>,
  461. NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
  462. GRPCAsyncSequenceProducerDelegate
  463. >
  464. init() {
  465. let backpressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
  466. lowWatermark: 10,
  467. highWatermark: 50
  468. )
  469. let sequence = NIOAsyncSequenceProducer.makeSequence(
  470. elementType: GRPCServerResponsePart<ByteBuffer>.self,
  471. backPressureStrategy: backpressureStrategy,
  472. delegate: GRPCAsyncSequenceProducerDelegate()
  473. )
  474. self.source = sequence.source
  475. self.responseSequence = sequence.sequence
  476. }
  477. func sendMetadata(
  478. _ metadata: HPACKHeaders,
  479. flush: Bool,
  480. promise: EventLoopPromise<Void>?
  481. ) {
  482. _ = self.source.yield(.metadata(metadata))
  483. promise?.succeed(())
  484. }
  485. func sendMessage(
  486. _ bytes: ByteBuffer,
  487. metadata: MessageMetadata,
  488. promise: EventLoopPromise<Void>?
  489. ) {
  490. _ = self.source.yield(.message(bytes, metadata))
  491. promise?.succeed(())
  492. }
  493. func sendEnd(
  494. status: GRPCStatus,
  495. trailers: HPACKHeaders,
  496. promise: EventLoopPromise<Void>?
  497. ) {
  498. _ = self.source.yield(.end(status, trailers))
  499. self.source.finish()
  500. promise?.succeed(())
  501. }
  502. func stopRecording() {
  503. self.source.finish()
  504. }
  505. }
  506. extension Optional where Wrapped == GRPCServerResponsePart<ByteBuffer> {
  507. func assertNil() {
  508. XCTAssertNil(self)
  509. }
  510. func assertMetadata(_ verify: (HPACKHeaders) -> Void = { _ in }) {
  511. switch self {
  512. case let .some(.metadata(headers)):
  513. verify(headers)
  514. default:
  515. XCTFail("Expected metadata but value was \(String(describing: self))")
  516. }
  517. }
  518. func assertMessage(_ verify: (ByteBuffer, MessageMetadata) -> Void = { _, _ in }) {
  519. switch self {
  520. case let .some(.message(buffer, metadata)):
  521. verify(buffer, metadata)
  522. default:
  523. XCTFail("Expected message but value was \(String(describing: self))")
  524. }
  525. }
  526. func assertStatus(_ verify: (GRPCStatus, HPACKHeaders) -> Void = { _, _ in }) {
  527. switch self {
  528. case let .some(.end(status, trailers)):
  529. verify(status, trailers)
  530. default:
  531. XCTFail("Expected status but value was \(String(describing: self))")
  532. }
  533. }
  534. }